-
Notifications
You must be signed in to change notification settings - Fork 387
Expand file tree
/
Copy pathsession_token.py
More file actions
87 lines (59 loc) · 2.55 KB
/
session_token.py
File metadata and controls
87 lines (59 loc) · 2.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import jwt
import re
import six
import sys
from shopify.utils import shop_url
if sys.version_info[0] < 3: # Backwards compatibility for python < v3.0.0
from urlparse import urljoin
else:
from urllib.parse import urljoin
# Only signing algorithm accepted when decoding a session token.
ALGORITHM = "HS256"

# Scheme prefix expected at the start of the Authorization header value.
PREFIX = "Bearer "

# Claims that must be present in a regular (non-extension) session token.
REQUIRED_FIELDS = [
    "iss",
    "dest",
    "sub",
    "jti",
    "sid",
]

# Claims that must be present in a token coming from a UI extension.
EXTENSION_REQUIRED_FIELDS = [
    "aud",
    "dest",
    "jti",
    "exp",
    "nbf",
    "iat",
]

# Clock-skew tolerance (seconds) passed to the JWT decoder as ``leeway``.
LEEWAY_SECONDS = 10
class SessionTokenError(Exception):
    """Base error for session-token handling in this module.

    Raised directly when the JWT library rejects a token; the more specific
    errors defined in this module derive from it.
    """
class InvalidIssuerError(SessionTokenError):
    """Raised when the token's ``iss`` claim does not sanitize to a shop domain."""
class MismatchedHostsError(SessionTokenError):
    """Raised when the roots of the ``iss`` and ``dest`` claims differ."""
class TokenAuthenticationError(SessionTokenError):
    """Raised when the authorization header does not carry a Bearer token."""
def decode_from_header(authorization_header, api_key, secret, is_extension=False):
    """Decode and validate the session token carried in an authorization header.

    Args:
        authorization_header: Header value expected to look like ``Bearer <jwt>``.
        api_key: Value the token's audience is checked against.
        secret: Key used to verify the token signature.
        is_extension: When true, the extension set of required claims is used
            and issuer validation is skipped.

    Returns:
        The decoded token payload.

    Raises:
        TokenAuthenticationError: The header has no Bearer prefix.
        SessionTokenError: The JWT library rejected the token.
        InvalidIssuerError: The issuer is not a valid shop domain.
        MismatchedHostsError: The issuer and destination roots differ.
    """
    token = _extract_session_token(authorization_header)
    payload = _decode_session_token(token, api_key, secret, is_extension)

    # skip validation for tokens coming from ui-extensions
    if not is_extension:
        _validate_issuer(payload)

    return payload
def _extract_session_token(authorization_header):
    """Return the token that follows the ``Bearer `` prefix of the header value.

    Args:
        authorization_header: Raw authorization header value; may be ``None``
            or empty when the request carried no such header.

    Returns:
        The header value with the leading prefix removed.

    Raises:
        TokenAuthenticationError: The header is missing/empty or does not
            start with the Bearer prefix.
    """
    # A missing header (None) must surface as an authentication failure rather
    # than an AttributeError from calling ``startswith`` on None.
    if not authorization_header or not authorization_header.startswith(PREFIX):
        raise TokenAuthenticationError("The HTTP_AUTHORIZATION_HEADER provided does not contain a Bearer token")
    return authorization_header[len(PREFIX) :]
def _decode_session_token(session_token, api_key, secret, is_extension):
    """Verify and decode a session token with the JWT library.

    Args:
        session_token: Encoded JWT string.
        api_key: Expected audience of the token.
        secret: Key used to verify the signature.
        is_extension: Selects the extension set of required claims when true.

    Returns:
        The decoded payload returned by ``jwt.decode``.

    Raises:
        SessionTokenError: Wraps any ``PyJWTError`` raised while decoding,
            keeping the original exception as its cause.
    """
    if is_extension:
        mandatory_claims = EXTENSION_REQUIRED_FIELDS
    else:
        mandatory_claims = REQUIRED_FIELDS

    try:
        payload = jwt.decode(
            session_token,
            secret,
            audience=api_key,
            algorithms=[ALGORITHM],
            # AppBridge frequently sends a future `nbf`, which triggers
            # `ImmatureSignatureError`; tolerate a few seconds of clock skew.
            leeway=LEEWAY_SECONDS,
            options={"require": mandatory_claims},
        )
    except jwt.exceptions.PyJWTError as error:
        six.raise_from(SessionTokenError(str(error)), error)
    else:
        return payload
def _validate_issuer(decoded_payload):
    """Run every issuer check against the decoded payload.

    The hostname check runs first, then the issuer/destination comparison;
    the first failing check raises and stops the sequence.
    """
    checks = (_validate_issuer_hostname, _validate_issuer_and_dest_match)
    for check in checks:
        check(decoded_payload)
def _validate_issuer_hostname(decoded_payload):
    """Ensure the root of the ``iss`` claim sanitizes to a shop domain.

    Raises:
        InvalidIssuerError: ``shop_url.sanitize_shop_domain`` returned a
            falsy value for the issuer root.
    """
    root = urljoin(decoded_payload["iss"], "/")
    if shop_url.sanitize_shop_domain(root):
        return
    raise InvalidIssuerError("Invalid issuer")
def _validate_issuer_and_dest_match(decoded_payload):
issuer_root = urljoin(decoded_payload["iss"], "/")
dest_root = urljoin(decoded_payload["dest"], "/")
if issuer_root != dest_root:
raise MismatchedHostsError("The issuer and destination do not match")