diff --git a/python/svix/webhooks.py b/python/svix/webhooks.py index 62feb3597..910718ba2 100644 --- a/python/svix/webhooks.py +++ b/python/svix/webhooks.py @@ -18,8 +18,9 @@ class WebhookVerificationError(Exception): class Webhook: _SECRET_PREFIX: str = "whsec_" _whsecret: bytes + _whtolerance: timedelta - def __init__(self, whsecret: t.Union[str, bytes]): + def __init__(self, whsecret: t.Union[str, bytes], whtolerance: timedelta = timedelta(minutes=5)): if not whsecret: raise RuntimeError("Secret can't be empty.") @@ -30,6 +31,8 @@ def __init__(self, whsecret: t.Union[str, bytes]): if isinstance(whsecret, bytes): self._whsecret = whsecret + + self._whtolerance = whtolerance def verify(self, data: t.Union[bytes, str], headers: t.Dict[str, str]) -> t.Any: data = data if isinstance(data, str) else data.decode() @@ -67,15 +70,14 @@ def sign(self, msg_id: str, timestamp: datetime, data: str) -> str: return f"v1,{base64.b64encode(signature).decode('utf-8')}" def __verify_timestamp(self, timestamp_header: str) -> datetime: - webhook_tolerance = timedelta(minutes=5) now = datetime.now(tz=timezone.utc) try: timestamp = datetime.fromtimestamp(float(timestamp_header), tz=timezone.utc) except Exception: raise WebhookVerificationError("Invalid Signature Headers") - if timestamp < (now - webhook_tolerance): + if timestamp < (now - self._whtolerance): raise WebhookVerificationError("Message timestamp too old") - if timestamp > (now + webhook_tolerance): + if timestamp > (now + self._whtolerance): raise WebhookVerificationError("Message timestamp too new") return timestamp diff --git a/python/tests/test_webhooks.py b/python/tests/test_webhooks.py index 73cef9e93..8ad481116 100644 --- a/python/tests/test_webhooks.py +++ b/python/tests/test_webhooks.py @@ -177,3 +177,18 @@ def test_sign_function(): wh = Webhook(key) signature = wh.sign(msg_id=msg_id, timestamp=timestamp, data=payload) assert signature == expected + + +def test_default_tolerance_is_five_minutes(): + testPayload = PayloadForTesting() + wh = Webhook(testPayload.secret) + + assert wh._whtolerance == timedelta(minutes=5) + + +def test_custom_tolerance_is_set(): + testPayload = PayloadForTesting() + custom_tolerance = timedelta(minutes=10) + wh = Webhook(testPayload.secret, custom_tolerance) + + assert wh._whtolerance == custom_tolerance