Files
2025-04-30 08:48:49 -05:00

431 lines
16 KiB
Python

# uncompyle6 version 3.9.2
# Python bytecode version base 3.7.0 (3394)
# Decompiled from: Python 3.8.19 (default, Mar 20 2024, 15:27:52)
# [Clang 14.0.6 ]
# Embedded file name: /var/user/app/device_supervisorbak/device_supervisor/lib/h2/utilities.py
# Compiled at: 2024-04-18 03:12:55
# Size of source mod 2**32: 20128 bytes
"""
h2/utilities
~~~~~~~~~~~~
Utility functions that do not belong in a separate module.
"""
import collections, re
from string import whitespace
import sys
from hpack import HeaderTuple, NeverIndexedHeaderTuple
from .exceptions import ProtocolError, FlowControlError
UPPER_RE = re.compile(b'[A-Z]')
CONNECTION_HEADERS = frozenset([
b'connection', 'connection',
b'proxy-connection', 'proxy-connection',
b'keep-alive',
'keep-alive',
b'transfer-encoding', 'transfer-encoding',
b'upgrade',
'upgrade'])
_ALLOWED_PSEUDO_HEADER_FIELDS = frozenset([
b':method', ':method',
b':scheme', ':scheme',
b':authority', ':authority',
b':path',
':path',
b':status', ':status'])
_SECURE_HEADERS = frozenset([
b'authorization', "authorization",
b'proxy-authorization', "proxy-authorization"])
_REQUEST_ONLY_HEADERS = frozenset([
b':scheme', ':scheme',
b':path', ':path',
b':authority', ':authority',
b':method',
':method'])
_RESPONSE_ONLY_HEADERS = frozenset([b':status', ":status"])
if sys.version_info[0] == 2:
_WHITESPACE = frozenset(whitespace)
else:
_WHITESPACE = frozenset(map(ord, whitespace))
def _secure_headers(headers, hdr_validation_flags):
"""
Certain headers are at risk of being attacked during the header compression
phase, and so need to be kept out of header compression contexts. This
function automatically transforms certain specific headers into HPACK
never-indexed fields to ensure they don't get added to header compression
contexts.
This function currently implements two rules:
- 'authorization' and 'proxy-authorization' fields are automatically made
never-indexed.
- Any 'cookie' header field shorter than 20 bytes long is made
never-indexed.
These fields are the most at-risk. These rules are inspired by Firefox
and nghttp2.
"""
for header in headers:
if header[0] in _SECURE_HEADERS:
yield NeverIndexedHeaderTuple(*header)
elif header[0] in (b'cookie', 'cookie') and len(header[1]) < 20:
yield NeverIndexedHeaderTuple(*header)
else:
yield header
def extract_method_header(headers):
"""
Extracts the request method from the headers list.
"""
for k, v in headers:
if k in (b':method', ':method'):
if not isinstance(v, bytes):
return v.encode("utf-8")
return v
def is_informational_response(headers):
"""
Searches a header block for a :status header to confirm that a given
collection of headers are an informational response. Assumes the header
block is well formed: that is, that the HTTP/2 special headers are first
in the block, and so that it can stop looking when it finds the first
header field whose name does not begin with a colon.
:param headers: The HTTP/2 header block.
:returns: A boolean indicating if this is an informational response.
"""
for n, v in headers:
if isinstance(n, bytes):
sigil = b':'
status = b':status'
informational_start = b'1'
else:
sigil = ":"
status = ":status"
informational_start = "1"
if not n.startswith(sigil):
return False
if n != status:
continue
return v.startswith(informational_start)
def guard_increment_window(current, increment):
"""
Increments a flow control window, guarding against that window becoming too
large.
:param current: The current value of the flow control window.
:param increment: The increment to apply to that window.
:returns: The new value of the window.
:raises: ``FlowControlError``
"""
LARGEST_FLOW_CONTROL_WINDOW = 2147483647
new_size = current + increment
if new_size > LARGEST_FLOW_CONTROL_WINDOW:
raise FlowControlError("May not increment flow control window past %d" % LARGEST_FLOW_CONTROL_WINDOW)
return new_size
def authority_from_headers(headers):
"""
Given a header set, searches for the authority header and returns the
value.
Note that this doesn't terminate early, so should only be called if the
headers are for a client request. Otherwise, will loop over the entire
header set, which is potentially unwise.
:param headers: The HTTP header set.
:returns: The value of the authority header, or ``None``.
:rtype: ``bytes`` or ``None``.
"""
for n, v in headers:
if n in (b':authority', ':authority'):
if not isinstance(v, bytes):
return v.encode("utf-8")
return v
HeaderValidationFlags = collections.namedtuple("HeaderValidationFlags", [
"is_client", "is_trailer", "is_response_header", "is_push_promise"])
def validate_headers(headers, hdr_validation_flags):
"""
Validates a header sequence against a set of constraints from RFC 7540.
:param headers: The HTTP header set.
:param hdr_validation_flags: An instance of HeaderValidationFlags.
"""
headers = _reject_uppercase_header_fields(headers, hdr_validation_flags)
headers = _reject_surrounding_whitespace(headers, hdr_validation_flags)
headers = _reject_te(headers, hdr_validation_flags)
headers = _reject_connection_header(headers, hdr_validation_flags)
headers = _reject_pseudo_header_fields(headers, hdr_validation_flags)
headers = _check_host_authority_header(headers, hdr_validation_flags)
headers = _check_path_header(headers, hdr_validation_flags)
return list(headers)
def _reject_uppercase_header_fields(headers, hdr_validation_flags):
"""
Raises a ProtocolError if any uppercase character is found in a header
block.
"""
for header in headers:
if UPPER_RE.search(header[0]):
raise ProtocolError("Received uppercase header name %s." % header[0])
yield header
def _reject_surrounding_whitespace(headers, hdr_validation_flags):
"""
Raises a ProtocolError if any header name or value is surrounded by
whitespace characters.
"""
for header in headers:
if not header[0][0] in _WHITESPACE:
if header[0][-1] in _WHITESPACE:
raise ProtocolError("Received header name surrounded by whitespace %r" % header[0])
if header[1]:
if header[1][0] in _WHITESPACE or header[1][-1] in _WHITESPACE:
raise ProtocolError("Received header value surrounded by whitespace %r" % header[1])
yield header
def _reject_te(headers, hdr_validation_flags):
"""
Raises a ProtocolError if the TE header is present in a header block and
its value is anything other than "trailers".
"""
for header in headers:
if header[0] in (b'te', 'te'):
if header[1].lower() not in (b'trailers', 'trailers'):
raise ProtocolError("Invalid value for Transfer-Encoding header: %s" % header[1])
yield header
def _reject_connection_header(headers, hdr_validation_flags):
"""
Raises a ProtocolError if the Connection header is present in a header
block.
"""
for header in headers:
if header[0] in CONNECTION_HEADERS:
raise ProtocolError("Connection-specific header field present: %s." % header[0])
yield header
def _custom_startswith(test_string, bytes_prefix, unicode_prefix):
"""
Given a string that might be a bytestring or a Unicode string,
return True if it starts with the appropriate prefix.
"""
if isinstance(test_string, bytes):
return test_string.startswith(bytes_prefix)
return test_string.startswith(unicode_prefix)
def _assert_header_in_set(string_header, bytes_header, header_set):
"""
Given a set of header names, checks whether the string or byte version of
the header name is present. Raises a Protocol error with the appropriate
error if it's missing.
"""
if not string_header in header_set:
if not bytes_header in header_set:
raise ProtocolError("Header block missing mandatory %s header" % string_header)
def _reject_pseudo_header_fields(headers, hdr_validation_flags):
"""
Raises a ProtocolError if duplicate pseudo-header fields are found in a
header block or if a pseudo-header field appears in a block after an
ordinary header field.
Raises a ProtocolError if pseudo-header fields are found in trailers.
"""
seen_pseudo_header_fields = set()
seen_regular_header = False
for header in headers:
if _custom_startswith(header[0], b':', ":"):
if header[0] in seen_pseudo_header_fields:
raise ProtocolError("Received duplicate pseudo-header field %s" % header[0])
seen_pseudo_header_fields.add(header[0])
if seen_regular_header:
raise ProtocolError("Received pseudo-header field out of sequence: %s" % header[0])
if header[0] not in _ALLOWED_PSEUDO_HEADER_FIELDS:
raise ProtocolError("Received custom pseudo-header field %s" % header[0])
else:
seen_regular_header = True
yield header
_check_pseudo_header_field_acceptability(seen_pseudo_header_fields, hdr_validation_flags)
def _check_pseudo_header_field_acceptability(pseudo_headers, hdr_validation_flags):
"""
Given the set of pseudo-headers present in a header block and the
validation flags, confirms that RFC 7540 allows them.
"""
if hdr_validation_flags.is_trailer:
if pseudo_headers:
raise ProtocolError("Received pseudo-header in trailer %s" % pseudo_headers)
elif hdr_validation_flags.is_response_header:
_assert_header_in_set(":status", b':status', pseudo_headers)
invalid_response_headers = pseudo_headers & _REQUEST_ONLY_HEADERS
if invalid_response_headers:
raise ProtocolError("Encountered request-only headers %s" % invalid_response_headers)
elif not hdr_validation_flags.is_response_header:
if not hdr_validation_flags.is_trailer:
_assert_header_in_set(":path", b':path', pseudo_headers)
_assert_header_in_set(":method", b':method', pseudo_headers)
_assert_header_in_set(":scheme", b':scheme', pseudo_headers)
invalid_request_headers = pseudo_headers & _RESPONSE_ONLY_HEADERS
if invalid_request_headers:
raise ProtocolError("Encountered response-only headers %s" % invalid_request_headers)
def _validate_host_authority_header(headers):
"""
Given the :authority and Host headers from a request block that isn't
a trailer, check that:
1. At least one of these headers is set.
2. If both headers are set, they match.
:param headers: The HTTP header set.
:raises: ``ProtocolError``
"""
authority_header_val = None
host_header_val = None
for header in headers:
if header[0] in (b':authority', ':authority'):
authority_header_val = header[1]
else:
if header[0] in (b'host', 'host'):
host_header_val = header[1]
yield header
authority_present = authority_header_val is not None
host_present = host_header_val is not None
if not authority_present:
if not host_present:
raise ProtocolError("Request header block does not have an :authority or Host header.")
if authority_present:
if host_present:
if authority_header_val != host_header_val:
raise ProtocolError("Request header block has mismatched :authority and Host headers: %r / %r" % (
authority_header_val, host_header_val))
def _check_host_authority_header(headers, hdr_validation_flags):
"""
Raises a ProtocolError if a header block arrives that does not contain an
:authority or a Host header, or if a header block contains both fields,
but their values do not match.
"""
skip_validation = hdr_validation_flags.is_response_header or hdr_validation_flags.is_trailer
if skip_validation:
return headers
return _validate_host_authority_header(headers)
def _check_path_header(headers, hdr_validation_flags):
"""
Raise a ProtocolError if a header block arrives or is sent that contains an
empty :path header.
"""
def inner():
for header in headers:
if header[0] in (b':path', ':path'):
if not header[1]:
raise ProtocolError("An empty :path header is forbidden")
yield header
skip_validation = hdr_validation_flags.is_response_header or hdr_validation_flags.is_trailer
if skip_validation:
return headers
return inner()
def _lowercase_header_names(headers, hdr_validation_flags):
"""
Given an iterable of header two-tuples, rebuilds that iterable with the
header names lowercased. This generator produces tuples that preserve the
original type of the header tuple for tuple and any ``HeaderTuple``.
"""
for header in headers:
if isinstance(header, HeaderTuple):
yield header.__class__(header[0].lower(), header[1])
else:
yield (
header[0].lower(), header[1])
def _strip_surrounding_whitespace(headers, hdr_validation_flags):
"""
Given an iterable of header two-tuples, strip both leading and trailing
whitespace from both header names and header values. This generator
produces tuples that preserve the original type of the header tuple for
tuple and any ``HeaderTuple``.
"""
for header in headers:
if isinstance(header, HeaderTuple):
yield header.__class__(header[0].strip(), header[1].strip())
else:
yield (
header[0].strip(), header[1].strip())
def _strip_connection_headers(headers, hdr_validation_flags):
"""
Strip any connection headers as per RFC7540 § 8.1.2.2.
"""
for header in headers:
if header[0] not in CONNECTION_HEADERS:
yield header
def _check_sent_host_authority_header(headers, hdr_validation_flags):
"""
Raises an InvalidHeaderBlockError if we try to send a header block
that does not contain an :authority or a Host header, or if
the header block contains both fields, but their values do not match.
"""
skip_validation = hdr_validation_flags.is_response_header or hdr_validation_flags.is_trailer
if skip_validation:
return headers
return _validate_host_authority_header(headers)
def normalize_outbound_headers(headers, hdr_validation_flags):
"""
Normalizes a header sequence that we are about to send.
:param headers: The HTTP header set.
:param hdr_validation_flags: An instance of HeaderValidationFlags.
"""
headers = _lowercase_header_names(headers, hdr_validation_flags)
headers = _strip_surrounding_whitespace(headers, hdr_validation_flags)
headers = _strip_connection_headers(headers, hdr_validation_flags)
headers = _secure_headers(headers, hdr_validation_flags)
return headers
def validate_outbound_headers(headers, hdr_validation_flags):
"""
Validates and normalizes a header sequence that we are about to send.
:param headers: The HTTP header set.
:param hdr_validation_flags: An instance of HeaderValidationFlags.
"""
headers = _reject_te(headers, hdr_validation_flags)
headers = _reject_connection_header(headers, hdr_validation_flags)
headers = _reject_pseudo_header_fields(headers, hdr_validation_flags)
headers = _check_sent_host_authority_header(headers, hdr_validation_flags)
headers = _check_path_header(headers, hdr_validation_flags)
return headers