11import hmac
2- import base64
32import hashlib
4- from http import HTTPStatus
5- from datetime import datetime
3+ from datetime import datetime , timedelta
64
75from django .http import HttpRequest
8- from ninja .errors import HttpError
6+ from ninja .errors import AuthenticationError
97from ninja .security import APIKeyHeader
108from ninja .openapi .schema import OpenAPISchema
119
@@ -18,23 +16,24 @@ def __init__(self, logger: Logger) -> None:
1816 self .logger = logger
1917 super ().__init__ ()
2018
21- def authenticate (self , request : HttpRequest , key : str ) -> str | None :
22- api_key = request .headers .get ("X-API-GATEWAY-KEY" )
23- api_timestamp = request .headers .get ("X-API-GATEWAY-TIMESTAMP" )
24- api_signature = request .headers .get ("X-API-GATEWAY-SIGNATURE" )
25- user_id = request .headers .get ("X-USER-ID" )
26- user_email = request .headers .get ("X-USER-EMAIL" )
19+ def authenticate (self , request : HttpRequest , key : str | None ) -> str | None :
2720
28- if not all ([api_key , api_timestamp , api_signature , user_id , user_email ]):
29- message = "Missing required headers!"
21+ try :
22+ api_key = request .headers ["X-API-GATEWAY-KEY" ]
23+ api_timestamp = request .headers ["X-API-GATEWAY-TIMESTAMP" ]
24+ api_signature = request .headers ["X-API-GATEWAY-SIGNATURE" ]
25+ user_id = request .headers ["X-USER-ID" ]
26+ user_email = request .headers ["X-USER-EMAIL" ]
27+ except KeyError as e :
28+ message = f"Missing required header: { e } "
3029 self .logger .error (
3130 {
3231 "activity_type" : "Authenticate User" ,
3332 "message" : message ,
3433 "metadata" : {"headers" : request .headers },
3534 }
3635 )
37- raise HttpError ( HTTPStatus . UNAUTHORIZED , message )
36+ raise AuthenticationError ( message = message )
3837
3938 valid_api_key = api_gateway ["key" ]
4039 if api_key != valid_api_key :
@@ -46,9 +45,9 @@ def authenticate(self, request: HttpRequest, key: str) -> str | None:
4645 "metadata" : {"headers" : request .headers },
4746 }
4847 )
49- raise HttpError ( HTTPStatus . UNAUTHORIZED , message )
48+ raise AuthenticationError ( message = message )
5049
51- self ._verify_signature (valid_api_key , valid_api_key , api_timestamp )
50+ self ._verify_signature (valid_api_key , api_signature , api_timestamp )
5251
5352 self .logger .debug (
5453 {
@@ -80,13 +79,12 @@ def _verify_signature(
8079 "metadata" : {"signature" : signature },
8180 }
8281 )
83- raise HttpError ( HTTPStatus . UNAUTHORIZED , message )
82+ raise AuthenticationError ( message = message )
8483
85- timestamp = int (timestamp )
86- current_time = datetime .now ().timestamp () * 1000
87- lifespan = api_gateway ["expires_at" ] * 60 * 60 * 1000
88-
89- if abs (current_time - timestamp ) > lifespan :
84+ initial_time = datetime .fromtimestamp (int (timestamp ) / 1000 )
85+ valid_window = initial_time + timedelta (minutes = api_gateway ["expires_at" ])
86+ if valid_window < datetime .now ():
87+
9088 message = "Signature expired!"
9189 self .logger .error (
9290 {
@@ -95,15 +93,15 @@ def _verify_signature(
9593 "metadata" : {"timestamp" : timestamp },
9694 }
9795 )
98- raise HttpError ( HTTPStatus . UNAUTHORIZED , message )
96+ raise AuthenticationError ( message = message )
9997
10098 return True
10199
102100 def generate_signature (self , api_key : str , timestamp : str ) -> str :
103101 signature = hmac .new (
104102 key = api_key .encode (), msg = timestamp .encode (), digestmod = hashlib .sha256
105- ).digest ()
106- return base64 . b64encode ( signature ). decode ( "utf-8" )
103+ ).hexdigest ()
104+ return signature
107105
108106
109107def get_authentication () -> GateWayAuth :
0 commit comments