171 lines
7.1 KiB
Python
171 lines
7.1 KiB
Python
# uncompyle6 version 3.9.2
|
|
# Python bytecode version base 3.7.0 (3394)
|
|
# Decompiled from: Python 3.8.19 (default, Mar 20 2024, 15:27:52)
|
|
# [Clang 14.0.6 ]
|
|
# Embedded file name: /var/user/app/device_supervisorbak/device_supervisor/lib/jwt/api_jwt.py
|
|
# Compiled at: 2024-04-18 03:12:55
|
|
# Size of source mod 2**32: 8127 bytes
|
|
import json, warnings
|
|
from calendar import timegm
|
|
from datetime import datetime, timedelta
|
|
try:
|
|
from typing import Callable, Dict, List, Optional, Union
|
|
except ImportError:
|
|
pass
|
|
|
|
from .api_jws import PyJWS
|
|
from .algorithms import Algorithm, get_default_algorithms
|
|
from .compat import Iterable, Mapping, string_types
|
|
from .exceptions import DecodeError, ExpiredSignatureError, ImmatureSignatureError, InvalidAudienceError, InvalidIssuedAtError, InvalidIssuerError, MissingRequiredClaimError
|
|
from .utils import merge_dict
|
|
|
|
class PyJWT(PyJWS):
    """JWT layer on top of :class:`PyJWS`.

    Serializes a claims mapping to JSON before delegating to the parent
    ``encode``, parses the JSON payload returned by the parent ``decode``,
    and validates the ``exp``, ``nbf``, ``iat``, ``iss`` and ``aud`` claims.
    """

    header_type = "JWT"

    @staticmethod
    def _get_default_options():
        """Return a new dict holding the default verification options.

        Every ``verify_*`` switch defaults to ``True`` and every
        ``require_*`` switch defaults to ``False``.
        """
        return {
            "verify_signature": True,
            "verify_exp": True,
            "verify_nbf": True,
            "verify_iat": True,
            "verify_aud": True,
            "verify_iss": True,
            "require_exp": False,
            "require_iat": False,
            "require_nbf": False,
        }

    def encode(self, payload, key, algorithm="HS256", headers=None, json_encoder=None):
        """Serialize ``payload`` to compact JSON and pass it to ``PyJWS.encode``.

        Args:
            payload: Mapping of claims. ``datetime`` values stored under
                ``exp``, ``iat`` or ``nbf`` are converted to integer
                timestamps with ``calendar.timegm`` on their UTC time tuple.
            key: Forwarded unchanged to the parent ``encode``.
            algorithm: Forwarded unchanged to the parent ``encode``.
            headers: Forwarded unchanged to the parent ``encode``.
            json_encoder: Used as ``cls`` for ``json.dumps`` and also
                forwarded to the parent ``encode``.

        Returns:
            Whatever the parent ``encode`` returns (presumably the encoded
            token -- confirm against ``PyJWS.encode``).

        Raises:
            TypeError: If ``payload`` is not a mapping.
        """
        if not isinstance(payload, Mapping):
            raise TypeError(
                "Expecting a mapping object, as JWT only supports "
                "JSON objects as payloads."
            )

        # Work on a shallow copy so the datetime -> timestamp conversion does
        # not rewrite the caller's mapping (and does not fail on read-only
        # mappings).
        claims = dict(payload)
        for time_claim in ("exp", "iat", "nbf"):
            value = claims.get(time_claim)
            if isinstance(value, datetime):
                claims[time_claim] = timegm(value.utctimetuple())

        json_payload = json.dumps(
            claims,
            separators=(",", ":"),
            cls=json_encoder,
        ).encode("utf-8")

        return super(PyJWT, self).encode(
            json_payload, key, algorithm, headers, json_encoder
        )

    def decode(self, jwt, key="", verify=True, algorithms=None, options=None, **kwargs):
        """Decode ``jwt``, parse its JSON payload and validate its claims.

        Args:
            jwt: The token, forwarded to the parent ``decode``.
            key: Forwarded to the parent ``decode``.
            verify: When true, claims are validated after decoding. It is
                also used as the default for ``options["verify_signature"]``.
            algorithms: Forwarded to the parent ``decode``. A
                ``DeprecationWarning`` is issued when ``verify`` is true and
                this is empty or ``None``.
            options: Optional dict of option overrides. NOTE: when given,
                ``verify_signature`` is written into this dict via
                ``setdefault``, so the caller's dict may gain that key.
            **kwargs: Forwarded both to the parent ``decode`` and to
                ``_validate_claims`` (``audience``, ``issuer``, ``leeway``,
                ``verify_expiration``).

        Returns:
            The decoded payload (a mapping parsed from JSON).

        Raises:
            DecodeError: If the payload is not valid JSON or is not a JSON
                object. Claim validation may raise further errors; see
                ``_validate_claims``.
        """
        if verify and not algorithms:
            warnings.warn(
                "It is strongly recommended that you pass in a value for the "
                '"algorithms" argument when calling decode(). This argument '
                "will be mandatory in a future version.",
                DeprecationWarning,
            )

        # The result is not used here; the call is kept so that whatever
        # PyJWS._load raises for a bad token surfaces before any other work
        # (presumably malformed-token errors -- confirm against PyJWS._load).
        self._load(jwt)

        if options is None:
            options = {"verify_signature": verify}
        else:
            options.setdefault("verify_signature", verify)

        decoded = super(PyJWT, self).decode(
            jwt, key=key, algorithms=algorithms, options=options, **kwargs
        )

        try:
            payload = json.loads(decoded.decode("utf-8"))
        except ValueError as e:
            raise DecodeError("Invalid payload string: %s" % e)

        if not isinstance(payload, Mapping):
            raise DecodeError("Invalid payload string: must be a json object")

        if verify:
            # self.options is not defined in this class; presumably it is set
            # by PyJWS from _get_default_options() -- confirm.
            merged_options = merge_dict(self.options, options)
            self._validate_claims(payload, merged_options, **kwargs)

        return payload

    def _validate_claims(self, payload, options, audience=None, issuer=None,
                         leeway=0, **kwargs):
        """Run every enabled claim check against ``payload``.

        Args:
            payload: Decoded claims mapping.
            options: Dict of ``verify_*`` / ``require_*`` switches. It is
                modified in place when ``verify_expiration`` is passed.
            audience: Expected audience: a string, an iterable of strings,
                or ``None``.
            issuer: Expected issuer, or ``None`` to skip the issuer match.
            leeway: Clock-skew allowance in seconds, or a ``timedelta``.
            **kwargs: Only the deprecated ``verify_expiration`` is read.

        Raises:
            TypeError: If ``audience`` is not a string, iterable or ``None``.
        """
        if "verify_expiration" in kwargs:
            options["verify_exp"] = kwargs.get("verify_expiration", True)
            warnings.warn(
                "The verify_expiration parameter is deprecated. "
                "Please use verify_exp in options instead.",
                DeprecationWarning,
            )

        # The following checks are independent of the deprecated keyword
        # above and of each other.
        if isinstance(leeway, timedelta):
            leeway = leeway.total_seconds()

        if not isinstance(audience, (string_types, type(None), Iterable)):
            raise TypeError("audience must be a string, iterable, or None")

        self._validate_required_claims(payload, options)

        now = timegm(datetime.utcnow().utctimetuple())

        if "iat" in payload and options.get("verify_iat"):
            self._validate_iat(payload, now, leeway)

        if "nbf" in payload and options.get("verify_nbf"):
            self._validate_nbf(payload, now, leeway)

        if "exp" in payload and options.get("verify_exp"):
            self._validate_exp(payload, now, leeway)

        if options.get("verify_iss"):
            self._validate_iss(payload, issuer)

        if options.get("verify_aud"):
            self._validate_aud(payload, audience)

    def _validate_required_claims(self, payload, options):
        """Raise ``MissingRequiredClaimError`` for a required but absent claim.

        Each of ``require_exp``, ``require_iat`` and ``require_nbf`` is
        checked independently, in that order; a claim whose value is ``None``
        counts as absent.
        """
        for claim in ("exp", "iat", "nbf"):
            if options.get("require_" + claim) and payload.get(claim) is None:
                raise MissingRequiredClaimError(claim)

    def _validate_iat(self, payload, now, leeway):
        """Check that ``iat`` can be converted to an integer.

        ``now`` and ``leeway`` are accepted for signature parity with the
        other time-claim validators but are not used.

        Raises:
            InvalidIssuedAtError: If ``iat`` cannot be converted with ``int``.
        """
        try:
            int(payload["iat"])
        except (TypeError, ValueError, OverflowError):
            # TypeError covers null/list/object values, OverflowError covers
            # an infinite float; both come from token content, so they are
            # reported as an invalid claim rather than leaking out raw.
            raise InvalidIssuedAtError("Issued At claim (iat) must be an integer.")

    def _validate_nbf(self, payload, now, leeway):
        """Check that the token's ``nbf`` time is not in the future.

        Raises:
            DecodeError: If ``nbf`` cannot be converted with ``int``.
            ImmatureSignatureError: If ``nbf`` is later than ``now + leeway``.
        """
        try:
            nbf = int(payload["nbf"])
        except (TypeError, ValueError, OverflowError):
            # See _validate_iat for why TypeError/OverflowError are included.
            raise DecodeError("Not Before claim (nbf) must be an integer.")

        if nbf > now + leeway:
            raise ImmatureSignatureError("The token is not yet valid (nbf)")

    def _validate_exp(self, payload, now, leeway):
        """Check that the token's ``exp`` time has not passed.

        Raises:
            DecodeError: If ``exp`` cannot be converted with ``int``.
            ExpiredSignatureError: If ``exp`` is earlier than ``now - leeway``.
        """
        try:
            exp = int(payload["exp"])
        except (TypeError, ValueError, OverflowError):
            # See _validate_iat for why TypeError/OverflowError are included.
            raise DecodeError("Expiration Time claim (exp) must be an integer.")

        if exp < now - leeway:
            raise ExpiredSignatureError("Signature has expired")

    def _validate_aud(self, payload, audience):
        """Check the ``aud`` claim against the expected ``audience``.

        Passes when neither side specifies an audience, or when at least one
        expected audience appears among the token's audience claims.

        Raises:
            MissingRequiredClaimError: An audience is expected but the token
                has no ``aud`` claim.
            InvalidAudienceError: The token has an ``aud`` claim but no
                audience is expected, the claim is not a string or a list of
                strings, or no expected audience matches.
        """
        if "aud" not in payload:
            if audience is None:
                return
            raise MissingRequiredClaimError("aud")

        if audience is None:
            raise InvalidAudienceError("Invalid audience")

        audience_claims = payload["aud"]
        if isinstance(audience_claims, string_types):
            audience_claims = [audience_claims]
        if not isinstance(audience_claims, list):
            raise InvalidAudienceError("Invalid claim format in token")
        if any(not isinstance(c, string_types) for c in audience_claims):
            raise InvalidAudienceError("Invalid claim format in token")

        if isinstance(audience, string_types):
            audience = [audience]

        # Must be an explicit raise: an `assert` here would be stripped under
        # `python -O`, silently accepting tokens meant for another audience.
        if not any(aud in audience_claims for aud in audience):
            raise InvalidAudienceError("Invalid audience")

    def _validate_iss(self, payload, issuer):
        """Check the ``iss`` claim against the expected ``issuer``.

        Does nothing when ``issuer`` is ``None``.

        Raises:
            MissingRequiredClaimError: The token has no ``iss`` claim.
            InvalidIssuerError: The ``iss`` claim differs from ``issuer``.
        """
        if issuer is None:
            return

        if "iss" not in payload:
            raise MissingRequiredClaimError("iss")

        if payload["iss"] != issuer:
            raise InvalidIssuerError("Invalid issuer")
|
|
|
|
|
|
# Module-level convenience API: a single shared PyJWT instance whose bound
# methods are re-exported as plain module functions.
_jwt_global_obj = PyJWT()

# Defined on PyJWT itself.
encode = _jwt_global_obj.encode
decode = _jwt_global_obj.decode

# Not defined on PyJWT in this file, so these resolve through its base class
# (presumably PyJWS -- confirm against api_jws).
register_algorithm = _jwt_global_obj.register_algorithm
unregister_algorithm = _jwt_global_obj.unregister_algorithm
get_unverified_header = _jwt_global_obj.get_unverified_header
|