# Listing metadata: 431 lines, 16 KiB, Python
# uncompyle6 version 3.9.2
# Python bytecode version base 3.7.0 (3394)
# Decompiled from: Python 3.8.19 (default, Mar 20 2024, 15:27:52)
# [Clang 14.0.6 ]
# Embedded file name: /var/user/app/device_supervisorbak/device_supervisor/lib/h2/utilities.py
# Compiled at: 2024-04-18 03:12:55
# Size of source mod 2**32: 20128 bytes
|
|
"""
|
|
h2/utilities
|
|
~~~~~~~~~~~~
|
|
|
|
Utility functions that do not belong in a separate module.
|
|
"""
|
|
import collections, re
|
|
from string import whitespace
|
|
import sys
|
|
from hpack import HeaderTuple, NeverIndexedHeaderTuple
|
|
from .exceptions import ProtocolError, FlowControlError
|
|
UPPER_RE = re.compile(b'[A-Z]')
|
|
CONNECTION_HEADERS = frozenset([
|
|
b'connection', 'connection',
|
|
b'proxy-connection', 'proxy-connection',
|
|
b'keep-alive',
|
|
'keep-alive',
|
|
b'transfer-encoding', 'transfer-encoding',
|
|
b'upgrade',
|
|
'upgrade'])
|
|
_ALLOWED_PSEUDO_HEADER_FIELDS = frozenset([
|
|
b':method', ':method',
|
|
b':scheme', ':scheme',
|
|
b':authority', ':authority',
|
|
b':path',
|
|
':path',
|
|
b':status', ':status'])
|
|
_SECURE_HEADERS = frozenset([
|
|
b'authorization', "authorization",
|
|
b'proxy-authorization', "proxy-authorization"])
|
|
_REQUEST_ONLY_HEADERS = frozenset([
|
|
b':scheme', ':scheme',
|
|
b':path', ':path',
|
|
b':authority', ':authority',
|
|
b':method',
|
|
':method'])
|
|
_RESPONSE_ONLY_HEADERS = frozenset([b':status', ":status"])
|
|
if sys.version_info[0] == 2:
|
|
_WHITESPACE = frozenset(whitespace)
|
|
else:
|
|
_WHITESPACE = frozenset(map(ord, whitespace))
|
|
|
|
def _secure_headers(headers, hdr_validation_flags):
    """
    Re-emit sensitive header fields as HPACK never-indexed fields so they
    are kept out of header compression contexts.

    Two rules are applied to each header:

    - a field whose name is in ``_SECURE_HEADERS`` ('authorization' and
      'proxy-authorization') is always made never-indexed;
    - a 'cookie' field whose value is shorter than 20 is made never-indexed.

    Every other header is passed through unchanged.

    :param headers: An iterable of header fields (name first, value second).
    :param hdr_validation_flags: Accepted for signature parity with the
        other header transforms; not used here.
    :returns: A generator over the (possibly re-wrapped) headers.
    """
    for field in headers:
        name = field[0]
        sensitive = name in _SECURE_HEADERS or (
            name in (b'cookie', 'cookie') and len(field[1]) < 20
        )
        if sensitive:
            yield NeverIndexedHeaderTuple(*field)
        else:
            yield field
|
|
|
|
|
|
def extract_method_header(headers):
    """
    Return the value of the first ``:method`` header as bytes.

    :param headers: An iterable of (name, value) pairs.
    :returns: The method value; a non-bytes value is encoded as UTF-8.
        ``None`` when no ``:method`` header is present.
    """
    for name, value in headers:
        if name not in (b':method', ':method'):
            continue
        return value if isinstance(value, bytes) else value.encode("utf-8")
|
|
|
|
|
|
def is_informational_response(headers):
    """
    Searches a header block for a :status header to confirm that a given
    collection of headers are an informational response. Assumes the header
    block is well formed: that is, that the HTTP/2 special headers are first
    in the block, and so that it can stop looking when it finds the first
    header field whose name does not begin with a colon.

    :param headers: The HTTP/2 header block, an iterable of (name, value)
        pairs.
    :returns: A boolean indicating if this is an informational response.
        ``False`` is also returned when the block is empty or contains only
        pseudo-headers other than :status.
    """
    for n, v in headers:
        # Compare against literals of the same type as the header name.
        if isinstance(n, bytes):
            sigil, status, informational_start = b':', b':status', b'1'
        else:
            sigil, status, informational_start = ":", ":status", "1"

        # A non-pseudo header means the pseudo-header section is over.
        if not n.startswith(sigil):
            return False

        # Some other pseudo-header: keep looking for :status.
        if n != status:
            continue

        # Informational responses have a status code that starts with 1.
        return v.startswith(informational_start)

    # The block ran out without a :status header; report a real boolean
    # rather than falling off the end and returning None.
    return False
|
|
|
|
|
|
def guard_increment_window(current, increment):
    """
    Add ``increment`` to a flow control window, refusing to let the result
    exceed the largest permitted window size.

    :param current: The current value of the flow control window.
    :param increment: The increment to apply to that window.
    :returns: The new value of the window.
    :raises: ``FlowControlError`` if the new value would exceed
        2147483647.
    """
    max_window = 2147483647

    updated = current + increment
    if updated <= max_window:
        return updated

    raise FlowControlError("May not increment flow control window past %d" % max_window)
|
|
|
|
|
|
def authority_from_headers(headers):
    """
    Find the ``:authority`` header in a header set and return its value.

    The search only stops when the header is found, so a set without an
    ``:authority`` header is scanned in full. Call this only for client
    request headers, where the header is expected to be present.

    :param headers: The HTTP header set, an iterable of (name, value) pairs.
    :returns: The value of the authority header, or ``None``. A non-bytes
        value is encoded as UTF-8.
    :rtype: ``bytes`` or ``None``.
    """
    for name, value in headers:
        if name not in (b':authority', ':authority'):
            continue
        return value if isinstance(value, bytes) else value.encode("utf-8")
|
|
|
|
|
|
# Flags describing the kind of header block being processed; they are read
# by the validation and normalization functions in this module.
HeaderValidationFlags = collections.namedtuple(
    "HeaderValidationFlags",
    ("is_client", "is_trailer", "is_response_header", "is_push_promise"),
)
|
|
|
|
def validate_headers(headers, hdr_validation_flags):
    """
    Run a header sequence through every validation step in this module and
    return the validated headers as a list.

    The steps are chained lazily and only run when the final list is
    built, so a ``ProtocolError`` from any step surfaces from this call.

    :param headers: The HTTP header set.
    :param hdr_validation_flags: An instance of HeaderValidationFlags.
    :returns: A list containing the validated headers.
    """
    # Order matters: each step wraps the output of the previous one.
    checks = (
        _reject_uppercase_header_fields,
        _reject_surrounding_whitespace,
        _reject_te,
        _reject_connection_header,
        _reject_pseudo_header_fields,
        _check_host_authority_header,
        _check_path_header,
    )
    for check in checks:
        headers = check(headers, hdr_validation_flags)
    return list(headers)
|
|
|
|
|
|
def _reject_uppercase_header_fields(headers, hdr_validation_flags):
    """
    Yield each header unchanged, raising a ProtocolError as soon as a
    header name matching ``UPPER_RE`` (an uppercase ASCII letter) is seen.

    :param headers: An iterable of header fields (name first).
    :param hdr_validation_flags: Unused; kept for a uniform signature.
    """
    for field in headers:
        name = field[0]
        if UPPER_RE.search(name) is not None:
            raise ProtocolError("Received uppercase header name %s." % name)
        yield field
|
|
|
|
|
|
def _reject_surrounding_whitespace(headers, hdr_validation_flags):
    """
    Raises a ProtocolError if any header name or value is surrounded by
    whitespace characters.

    A name is rejected when its first OR its last character is in
    ``_WHITESPACE``. The decompiled form only rejected names that had
    trailing whitespace and no leading whitespace, which let names with
    leading whitespace through.

    :param headers: An iterable of header fields (name first, value second).
    :param hdr_validation_flags: Unused; kept for a uniform signature.
    :raises: ``ProtocolError``
    """
    for header in headers:
        # The name is indexed unconditionally, so an empty name raises
        # IndexError here.
        name = header[0]
        if name[0] in _WHITESPACE or name[-1] in _WHITESPACE:
            raise ProtocolError("Received header name surrounded by whitespace %r" % name)

        # An empty value is allowed through, so only index it when non-empty.
        value = header[1]
        if value and (value[0] in _WHITESPACE or value[-1] in _WHITESPACE):
            raise ProtocolError("Received header value surrounded by whitespace %r" % value)

        yield header
|
|
|
|
|
|
def _reject_te(headers, hdr_validation_flags):
|
|
"""
|
|
Raises a ProtocolError if the TE header is present in a header block and
|
|
its value is anything other than "trailers".
|
|
"""
|
|
for header in headers:
|
|
if header[0] in (b'te', 'te'):
|
|
if header[1].lower() not in (b'trailers', 'trailers'):
|
|
raise ProtocolError("Invalid value for Transfer-Encoding header: %s" % header[1])
|
|
yield header
|
|
|
|
|
|
def _reject_connection_header(headers, hdr_validation_flags):
    """
    Yield each header unchanged, raising a ProtocolError if a header whose
    name is in ``CONNECTION_HEADERS`` is encountered.

    :param headers: An iterable of header fields (name first).
    :param hdr_validation_flags: Unused; kept for a uniform signature.
    """
    for field in headers:
        name = field[0]
        if name in CONNECTION_HEADERS:
            raise ProtocolError("Connection-specific header field present: %s." % name)
        yield field
|
|
|
|
|
|
def _custom_startswith(test_string, bytes_prefix, unicode_prefix):
|
|
"""
|
|
Given a string that might be a bytestring or a Unicode string,
|
|
return True if it starts with the appropriate prefix.
|
|
"""
|
|
if isinstance(test_string, bytes):
|
|
return test_string.startswith(bytes_prefix)
|
|
return test_string.startswith(unicode_prefix)
|
|
|
|
|
|
def _assert_header_in_set(string_header, bytes_header, header_set):
|
|
"""
|
|
Given a set of header names, checks whether the string or byte version of
|
|
the header name is present. Raises a Protocol error with the appropriate
|
|
error if it's missing.
|
|
"""
|
|
if not string_header in header_set:
|
|
if not bytes_header in header_set:
|
|
raise ProtocolError("Header block missing mandatory %s header" % string_header)
|
|
|
|
|
|
def _reject_pseudo_header_fields(headers, hdr_validation_flags):
    """
    Yield each header unchanged while enforcing the pseudo-header rules.

    A ProtocolError is raised if a pseudo-header (a name starting with a
    colon) is duplicated, appears after an ordinary header, or is not in
    ``_ALLOWED_PSEUDO_HEADER_FIELDS``.

    Once the input is exhausted, the set of pseudo-headers seen is checked
    against the validation flags, which may also raise a ProtocolError.

    :param headers: An iterable of header fields (name first).
    :param hdr_validation_flags: An instance of HeaderValidationFlags.
    """
    pseudo_seen = set()
    regular_seen = False

    for field in headers:
        name = field[0]

        if not _custom_startswith(name, b':', ":"):
            regular_seen = True
            yield field
            continue

        if name in pseudo_seen:
            raise ProtocolError("Received duplicate pseudo-header field %s" % name)
        pseudo_seen.add(name)

        if regular_seen:
            raise ProtocolError("Received pseudo-header field out of sequence: %s" % name)

        if name not in _ALLOWED_PSEUDO_HEADER_FIELDS:
            raise ProtocolError("Received custom pseudo-header field %s" % name)

        yield field

    _check_pseudo_header_field_acceptability(pseudo_seen, hdr_validation_flags)
|
|
|
|
|
|
def _check_pseudo_header_field_acceptability(pseudo_headers, hdr_validation_flags):
|
|
"""
|
|
Given the set of pseudo-headers present in a header block and the
|
|
validation flags, confirms that RFC 7540 allows them.
|
|
"""
|
|
if hdr_validation_flags.is_trailer:
|
|
if pseudo_headers:
|
|
raise ProtocolError("Received pseudo-header in trailer %s" % pseudo_headers)
|
|
elif hdr_validation_flags.is_response_header:
|
|
_assert_header_in_set(":status", b':status', pseudo_headers)
|
|
invalid_response_headers = pseudo_headers & _REQUEST_ONLY_HEADERS
|
|
if invalid_response_headers:
|
|
raise ProtocolError("Encountered request-only headers %s" % invalid_response_headers)
|
|
elif not hdr_validation_flags.is_response_header:
|
|
if not hdr_validation_flags.is_trailer:
|
|
_assert_header_in_set(":path", b':path', pseudo_headers)
|
|
_assert_header_in_set(":method", b':method', pseudo_headers)
|
|
_assert_header_in_set(":scheme", b':scheme', pseudo_headers)
|
|
invalid_request_headers = pseudo_headers & _RESPONSE_ONLY_HEADERS
|
|
if invalid_request_headers:
|
|
raise ProtocolError("Encountered response-only headers %s" % invalid_request_headers)
|
|
|
|
|
|
def _validate_host_authority_header(headers):
|
|
"""
|
|
Given the :authority and Host headers from a request block that isn't
|
|
a trailer, check that:
|
|
1. At least one of these headers is set.
|
|
2. If both headers are set, they match.
|
|
|
|
:param headers: The HTTP header set.
|
|
:raises: ``ProtocolError``
|
|
"""
|
|
authority_header_val = None
|
|
host_header_val = None
|
|
for header in headers:
|
|
if header[0] in (b':authority', ':authority'):
|
|
authority_header_val = header[1]
|
|
else:
|
|
if header[0] in (b'host', 'host'):
|
|
host_header_val = header[1]
|
|
yield header
|
|
|
|
authority_present = authority_header_val is not None
|
|
host_present = host_header_val is not None
|
|
if not authority_present:
|
|
if not host_present:
|
|
raise ProtocolError("Request header block does not have an :authority or Host header.")
|
|
if authority_present:
|
|
if host_present:
|
|
if authority_header_val != host_header_val:
|
|
raise ProtocolError("Request header block has mismatched :authority and Host headers: %r / %r" % (
|
|
authority_header_val, host_header_val))
|
|
|
|
|
|
def _check_host_authority_header(headers, hdr_validation_flags):
|
|
"""
|
|
Raises a ProtocolError if a header block arrives that does not contain an
|
|
:authority or a Host header, or if a header block contains both fields,
|
|
but their values do not match.
|
|
"""
|
|
skip_validation = hdr_validation_flags.is_response_header or hdr_validation_flags.is_trailer
|
|
if skip_validation:
|
|
return headers
|
|
return _validate_host_authority_header(headers)
|
|
|
|
|
|
def _check_path_header(headers, hdr_validation_flags):
|
|
"""
|
|
Raise a ProtocolError if a header block arrives or is sent that contains an
|
|
empty :path header.
|
|
"""
|
|
|
|
def inner():
|
|
for header in headers:
|
|
if header[0] in (b':path', ':path'):
|
|
if not header[1]:
|
|
raise ProtocolError("An empty :path header is forbidden")
|
|
yield header
|
|
|
|
skip_validation = hdr_validation_flags.is_response_header or hdr_validation_flags.is_trailer
|
|
if skip_validation:
|
|
return headers
|
|
return inner()
|
|
|
|
|
|
def _lowercase_header_names(headers, hdr_validation_flags):
    """
    Yield each header with its name lowercased and its value untouched.

    A ``HeaderTuple`` (or subclass) instance is rebuilt with its own class;
    any other header is emitted as a plain two-tuple.

    :param headers: An iterable of header fields (name first, value second).
    :param hdr_validation_flags: Unused; kept for a uniform signature.
    """
    for field in headers:
        if not isinstance(field, HeaderTuple):
            yield (field[0].lower(), field[1])
            continue
        rebuild = field.__class__
        yield rebuild(field[0].lower(), field[1])
|
|
|
|
|
|
def _strip_surrounding_whitespace(headers, hdr_validation_flags):
    """
    Yield each header with leading and trailing whitespace stripped from
    both its name and its value.

    A ``HeaderTuple`` (or subclass) instance is rebuilt with its own class;
    any other header is emitted as a plain two-tuple.

    :param headers: An iterable of header fields (name first, value second).
    :param hdr_validation_flags: Unused; kept for a uniform signature.
    """
    for field in headers:
        if not isinstance(field, HeaderTuple):
            yield (field[0].strip(), field[1].strip())
            continue
        rebuild = field.__class__
        yield rebuild(field[0].strip(), field[1].strip())
|
|
|
|
|
|
def _strip_connection_headers(headers, hdr_validation_flags):
    """
    Drop connection-specific headers, as per RFC7540 § 8.1.2.2.

    Headers whose name is in ``CONNECTION_HEADERS`` are skipped; every
    other header is yielded unchanged.

    :param headers: An iterable of header fields (name first).
    :param hdr_validation_flags: Unused; kept for a uniform signature.
    """
    for field in headers:
        if field[0] in CONNECTION_HEADERS:
            continue
        yield field
|
|
|
|
|
|
def _check_sent_host_authority_header(headers, hdr_validation_flags):
|
|
"""
|
|
Raises an InvalidHeaderBlockError if we try to send a header block
|
|
that does not contain an :authority or a Host header, or if
|
|
the header block contains both fields, but their values do not match.
|
|
"""
|
|
skip_validation = hdr_validation_flags.is_response_header or hdr_validation_flags.is_trailer
|
|
if skip_validation:
|
|
return headers
|
|
return _validate_host_authority_header(headers)
|
|
|
|
|
|
def normalize_outbound_headers(headers, hdr_validation_flags):
    """
    Normalize a header sequence that is about to be sent.

    In order: header names are lowercased, surrounding whitespace is
    stripped from names and values, connection-specific headers are
    dropped, and sensitive headers are marked never-indexed.

    :param headers: The HTTP header set.
    :param hdr_validation_flags: An instance of HeaderValidationFlags.
    :returns: A lazy iterable over the normalized headers.
    """
    # Order matters: each step wraps the output of the previous one.
    transforms = (
        _lowercase_header_names,
        _strip_surrounding_whitespace,
        _strip_connection_headers,
        _secure_headers,
    )
    for transform in transforms:
        headers = transform(headers, hdr_validation_flags)
    return headers
|
|
|
|
|
|
def validate_outbound_headers(headers, hdr_validation_flags):
    """
    Validate a header sequence that is about to be sent.

    In order: the TE header value, connection-specific headers, the
    pseudo-header rules, the :authority / Host rules and the :path rule
    are checked. The headers themselves are not modified.

    :param headers: The HTTP header set.
    :param hdr_validation_flags: An instance of HeaderValidationFlags.
    :returns: An iterable over the headers; validation steps that are
        generators run, and may raise ``ProtocolError``, only as it is
        consumed.
    """
    # Order matters: each step wraps the output of the previous one.
    checks = (
        _reject_te,
        _reject_connection_header,
        _reject_pseudo_header_fields,
        _check_sent_host_authority_header,
        _check_path_header,
    )
    for check in checks:
        headers = check(headers, hdr_validation_flags)
    return headers
|