1- #
1+ #
22# Copyright 2024 ABSA Group Limited
3- #
3+ #
44# Licensed under the Apache License, Version 2.0 (the "License");
55# you may not use this file except in compliance with the License.
66# You may obtain a copy of the License at
7- #
7+ #
88# http://www.apache.org/licenses/LICENSE-2.0
9- #
9+ #
1010# Unless required by applicable law or agreed to in writing, software
1111# distributed under the License is distributed on an "AS IS" BASIS,
1212# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313# See the License for the specific language governing permissions and
1414# limitations under the License.
15- #
15+ #
1616import base64
1717import json
1818import logging
2727from jsonschema import validate
2828from jsonschema .exceptions import ValidationError
2929
30- # Resolve project root (parent directory of this file's directory)
31- _PROJECT_ROOT = os .path .abspath (os .path .join (os .path .dirname (__file__ ), '..' ))
32- _CONF_DIR = os .path .join (_PROJECT_ROOT , 'conf' )
30+ from . import conf_path # new import for CONF_DIR resolution
31+ try : # fallback if relative import fails (e.g., executed as a script)
32+ from . import conf_path as _conf_mod
33+ except Exception : # pragma: no cover
34+ import conf_path as _conf_mod
35+ conf_path = _conf_mod
36+
37+ # Remove old resolution logic, use module instead
38+ _CONF_DIR = conf_path .CONF_DIR
39+ _INVALID_CONF_ENV = conf_path .INVALID_CONF_ENV
3340
3441sys .path .append (os .path .join (os .path .dirname (__file__ )))
3542
4451logger .setLevel (log_level )
4552logger .addHandler (logging .StreamHandler ())
4653logger .debug ("Initialized LOGGER" )
54+ logger .debug (f"Using CONF_DIR={ _CONF_DIR } " )
55+ if _INVALID_CONF_ENV :
56+ logger .warning (
57+ f"CONF_DIR env var set to non-existent path: { _INVALID_CONF_ENV } ; fell back to { _CONF_DIR } "
58+ )
4759
48- with open (os .path .join (_CONF_DIR , ' api.yaml' ), 'r' ) as file :
60+ with open (os .path .join (_CONF_DIR , " api.yaml" ), "r" ) as file :
4961 API = file .read ()
5062logger .debug ("Loaded API definition" )
5163
5264TOPICS = {}
53- with open (os .path .join (_CONF_DIR , ' topic_runs.json' ), 'r' ) as file :
65+ with open (os .path .join (_CONF_DIR , " topic_runs.json" ), "r" ) as file :
5466 TOPICS ["public.cps.za.runs" ] = json .load (file )
55- with open (os .path .join (_CONF_DIR , ' topic_dlchange.json' ), 'r' ) as file :
67+ with open (os .path .join (_CONF_DIR , " topic_dlchange.json" ), "r" ) as file :
5668 TOPICS ["public.cps.za.dlchange" ] = json .load (file )
57- with open (os .path .join (_CONF_DIR , ' topic_test.json' ), 'r' ) as file :
69+ with open (os .path .join (_CONF_DIR , " topic_test.json" ), "r" ) as file :
5870 TOPICS ["public.cps.za.test" ] = json .load (file )
5971logger .debug ("Loaded TOPICS" )
6072
61- with open (os .path .join (_CONF_DIR , ' config.json' ), 'r' ) as file :
73+ with open (os .path .join (_CONF_DIR , " config.json" ), "r" ) as file :
6274 CONFIG = json .load (file )
6375logger .debug ("Loaded main CONFIG" )
6476
65- aws_s3 = boto3 .Session ().resource ('s3' , verify = False )
77+ aws_s3 = boto3 .Session ().resource ("s3" , verify = False )
6678logger .debug ("Initialized AWS S3 Client" )
6779
6880if CONFIG ["access_config" ].startswith ("s3://" ):
69- name_parts = CONFIG ["access_config" ].split ('/' )
81+ name_parts = CONFIG ["access_config" ].split ("/" )
7082 bucket_name = name_parts [2 ]
7183 bucket_object = "/" .join (name_parts [3 :])
72- ACCESS = json .loads (aws_s3 .Bucket (bucket_name ).Object (bucket_object ).get ()["Body" ].read ().decode ("utf-8" ))
84+ ACCESS = json .loads (
85+ aws_s3 .Bucket (bucket_name )
86+ .Object (bucket_object )
87+ .get ()["Body" ]
88+ .read ()
89+ .decode ("utf-8" )
90+ )
7391else :
7492 with open (CONFIG ["access_config" ], "r" ) as file :
7593 ACCESS = json .load (file )
7694logger .debug ("Loaded ACCESS definitions" )
7795
7896TOKEN_PROVIDER_URL = CONFIG ["token_provider_url" ]
79- token_public_key_encoded = requests .get (CONFIG ["token_public_key_url" ], verify = False ).json ()["key" ]
80- TOKEN_PUBLIC_KEY = serialization .load_der_public_key (base64 .b64decode (token_public_key_encoded ))
97+ token_public_key_encoded = requests .get (
98+ CONFIG ["token_public_key_url" ], verify = False
99+ ).json ()["key" ]
100+ TOKEN_PUBLIC_KEY = serialization .load_der_public_key (
101+ base64 .b64decode (token_public_key_encoded )
102+ )
81103logger .debug ("Loaded TOKEN_PUBLIC_KEY" )
82104
83105writer_eventbridge .init (logger , CONFIG )
84106writer_kafka .init (logger , CONFIG )
85107writer_postgres .init (logger )
86108
109+
87110def _error_response (status , err_type , message ):
88111 return {
89112 "statusCode" : status ,
90113 "headers" : {"Content-Type" : "application/json" },
91- "body" : json .dumps ({
92- "success" : False ,
93- "statusCode" : status ,
94- "errors" : [{"type" : err_type , "message" : message }]
95- })
114+ "body" : json .dumps (
115+ {
116+ "success" : False ,
117+ "statusCode" : status ,
118+ "errors" : [{"type" : err_type , "message" : message }],
119+ }
120+ ),
96121 }
97122
123+
98124def get_api ():
99- return {
100- "statusCode" : 200 ,
101- "body" : API
102- }
125+ return {"statusCode" : 200 , "body" : API }
126+
103127
104128def get_token ():
105129 logger .debug ("Handling GET Token" )
106- return {
107- "statusCode" : 303 ,
108- "headers" : {"Location" : TOKEN_PROVIDER_URL }
109- }
110-
130+ return {"statusCode" : 303 , "headers" : {"Location" : TOKEN_PROVIDER_URL }}
131+
132+
111133def get_topics ():
112134 logger .debug ("Handling GET Topics" )
113135 return {
114136 "statusCode" : 200 ,
115137 "headers" : {"Content-Type" : "application/json" },
116- "body" : json .dumps ([topicName for topicName in TOPICS ])
138+ "body" : json .dumps ([topicName for topicName in TOPICS ]),
117139 }
118-
140+
141+
119142def get_topic_schema (topicName ):
120143 logger .debug (f"Handling GET TopicSchema({ topicName } )" )
121144 if topicName not in TOPICS :
122145 return _error_response (404 , "topic" , f"Topic '{ topicName } ' not found" )
123-
146+
124147 return {
125148 "statusCode" : 200 ,
126149 "headers" : {"Content-Type" : "application/json" },
127- "body" : json .dumps (TOPICS [topicName ])
150+ "body" : json .dumps (TOPICS [topicName ]),
128151 }
129152
153+
130154def post_topic_message (topicName , topicMessage , tokenEncoded ):
131155 logger .debug (f"Handling POST { topicName } " )
132156 try :
133157 token = jwt .decode (tokenEncoded , TOKEN_PUBLIC_KEY , algorithms = ["RS256" ])
134158 except Exception :
135- return _error_response (401 , "auth" , "Invalid or missing token" )
159+ return _error_response (401 , "auth" , "Invalid or missing token" )
136160
137161 if topicName not in TOPICS :
138162 return _error_response (404 , "topic" , f"Topic '{ topicName } ' not found" )
@@ -144,8 +168,8 @@ def post_topic_message(topicName, topicMessage, tokenEncoded):
144168 try :
145169 validate (instance = topicMessage , schema = TOPICS [topicName ])
146170 except ValidationError as e :
147- return _error_response (400 , "validation" , e .message )
148-
171+ return _error_response (400 , "validation" , e .message )
172+
149173 # Run all writers independently (avoid short-circuit so failures in one don't skip others)
150174 kafka_ok , kafka_err = writer_kafka .write (topicName , topicMessage )
151175 eventbridge_ok , eventbridge_err = writer_eventbridge .write (topicName , topicMessage )
@@ -163,31 +187,28 @@ def post_topic_message(topicName, topicMessage, tokenEncoded):
163187 return {
164188 "statusCode" : 500 ,
165189 "headers" : {"Content-Type" : "application/json" },
166- "body" : json .dumps ({
167- "success" : False ,
168- "statusCode" : 500 ,
169- "errors" : errors
170- })
190+ "body" : json .dumps ({"success" : False , "statusCode" : 500 , "errors" : errors }),
171191 }
172192
173193 return {
174194 "statusCode" : 202 ,
175195 "headers" : {"Content-Type" : "application/json" },
176- "body" : json .dumps ({
177- "success" : True ,
178- "statusCode" : 202
179- })
196+ "body" : json .dumps ({"success" : True , "statusCode" : 202 }),
180197 }
181198
199+
182200def extract_token (eventHeaders ):
183201 # Initial implementation used bearer header directly
184202 if "bearer" in eventHeaders :
185203 return eventHeaders ["bearer" ]
186-
187- if "Authorization" in eventHeaders and eventHeaders ["Authorization" ].startswith ("Bearer " ):
188- return eventHeaders ["Authorization" ][len ("Bearer " ):]
189-
190- return "" # Will result in 401
204+
205+ if "Authorization" in eventHeaders and eventHeaders ["Authorization" ].startswith (
206+ "Bearer "
207+ ):
208+ return eventHeaders ["Authorization" ][len ("Bearer " ) :]
209+
210+ return "" # Will result in 401
211+
191212
192213def lambda_handler (event , context ):
193214 try :
@@ -201,7 +222,11 @@ def lambda_handler(event, context):
201222 if event ["httpMethod" ] == "GET" :
202223 return get_topic_schema (event ["pathParameters" ]["topic_name" ].lower ())
203224 if event ["httpMethod" ] == "POST" :
204- return post_topic_message (event ["pathParameters" ]["topic_name" ].lower (), json .loads (event ["body" ]), extract_token (event ["headers" ]))
225+ return post_topic_message (
226+ event ["pathParameters" ]["topic_name" ].lower (),
227+ json .loads (event ["body" ]),
228+ extract_token (event ["headers" ]),
229+ )
205230 if event ["resource" ].lower () == "/terminate" :
206231 sys .exit ("TERMINATING" )
207232 return _error_response (404 , "route" , "Resource not found" )
0 commit comments