1- #
1+ #
22# Copyright 2024 ABSA Group Limited
3- #
3+ #
44# Licensed under the Apache License, Version 2.0 (the "License");
55# you may not use this file except in compliance with the License.
66# You may obtain a copy of the License at
7- #
7+ #
88# http://www.apache.org/licenses/LICENSE-2.0
9- #
9+ #
1010# Unless required by applicable law or agreed to in writing, software
1111# distributed under the License is distributed on an "AS IS" BASIS,
1212# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313# See the License for the specific language governing permissions and
1414# limitations under the License.
15- #
15+ #
1616import base64
1717import json
1818import logging
2727from jsonschema import validate
2828from jsonschema .exceptions import ValidationError
2929
30- # Resolve project root (parent directory of this file's directory)
31- _PROJECT_ROOT = os .path .abspath (os .path .join (os .path .dirname (__file__ ), '..' ))
32- _CONF_DIR = os .path .join (_PROJECT_ROOT , 'conf' )
30+ try :
31+ from .conf_path import CONF_DIR , INVALID_CONF_ENV
32+ except ImportError : # fallback when executed outside package context
33+ from conf_path import CONF_DIR , INVALID_CONF_ENV
34+
35+ # Use imported symbols for internal variables
36+ _CONF_DIR = CONF_DIR
37+ _INVALID_CONF_ENV = INVALID_CONF_ENV
3338
3439sys .path .append (os .path .join (os .path .dirname (__file__ )))
3540
4449logger .setLevel (log_level )
4550logger .addHandler (logging .StreamHandler ())
4651logger .debug ("Initialized LOGGER" )
52+ logger .debug (f"Using CONF_DIR={ _CONF_DIR } " )
53+ if _INVALID_CONF_ENV :
54+ logger .warning (
55+ f"CONF_DIR env var set to non-existent path: { _INVALID_CONF_ENV } ; fell back to { _CONF_DIR } "
56+ )
4757
48- with open (os .path .join (_CONF_DIR , ' api.yaml' ), 'r' ) as file :
58+ with open (os .path .join (_CONF_DIR , " api.yaml" ), "r" ) as file :
4959 API = file .read ()
5060logger .debug ("Loaded API definition" )
5161
5262TOPICS = {}
53- with open (os .path .join (_CONF_DIR , ' topic_runs.json' ), 'r' ) as file :
63+ with open (os .path .join (_CONF_DIR , " topic_runs.json" ), "r" ) as file :
5464 TOPICS ["public.cps.za.runs" ] = json .load (file )
55- with open (os .path .join (_CONF_DIR , ' topic_dlchange.json' ), 'r' ) as file :
65+ with open (os .path .join (_CONF_DIR , " topic_dlchange.json" ), "r" ) as file :
5666 TOPICS ["public.cps.za.dlchange" ] = json .load (file )
57- with open (os .path .join (_CONF_DIR , ' topic_test.json' ), 'r' ) as file :
67+ with open (os .path .join (_CONF_DIR , " topic_test.json" ), "r" ) as file :
5868 TOPICS ["public.cps.za.test" ] = json .load (file )
5969logger .debug ("Loaded TOPICS" )
6070
61- with open (os .path .join (_CONF_DIR , ' config.json' ), 'r' ) as file :
71+ with open (os .path .join (_CONF_DIR , " config.json" ), "r" ) as file :
6272 CONFIG = json .load (file )
6373logger .debug ("Loaded main CONFIG" )
6474
65- aws_s3 = boto3 .Session ().resource ('s3' , verify = False )
75+ aws_s3 = boto3 .Session ().resource ("s3" , verify = False )
6676logger .debug ("Initialized AWS S3 Client" )
6777
6878if CONFIG ["access_config" ].startswith ("s3://" ):
69- name_parts = CONFIG ["access_config" ].split ('/' )
79+ name_parts = CONFIG ["access_config" ].split ("/" )
7080 bucket_name = name_parts [2 ]
7181 bucket_object = "/" .join (name_parts [3 :])
72- ACCESS = json .loads (aws_s3 .Bucket (bucket_name ).Object (bucket_object ).get ()["Body" ].read ().decode ("utf-8" ))
82+ ACCESS = json .loads (
83+ aws_s3 .Bucket (bucket_name )
84+ .Object (bucket_object )
85+ .get ()["Body" ]
86+ .read ()
87+ .decode ("utf-8" )
88+ )
7389else :
7490 with open (CONFIG ["access_config" ], "r" ) as file :
7591 ACCESS = json .load (file )
7692logger .debug ("Loaded ACCESS definitions" )
7793
7894TOKEN_PROVIDER_URL = CONFIG ["token_provider_url" ]
79- token_public_key_encoded = requests .get (CONFIG ["token_public_key_url" ], verify = False ).json ()["key" ]
80- TOKEN_PUBLIC_KEY = serialization .load_der_public_key (base64 .b64decode (token_public_key_encoded ))
95+ token_public_key_encoded = requests .get (
96+ CONFIG ["token_public_key_url" ], verify = False
97+ ).json ()["key" ]
98+ TOKEN_PUBLIC_KEY = serialization .load_der_public_key (
99+ base64 .b64decode (token_public_key_encoded )
100+ )
81101logger .debug ("Loaded TOKEN_PUBLIC_KEY" )
82102
83103writer_eventbridge .init (logger , CONFIG )
84104writer_kafka .init (logger , CONFIG )
85105writer_postgres .init (logger )
86106
107+
87108def _error_response (status , err_type , message ):
88109 return {
89110 "statusCode" : status ,
90111 "headers" : {"Content-Type" : "application/json" },
91- "body" : json .dumps ({
92- "success" : False ,
93- "statusCode" : status ,
94- "errors" : [{"type" : err_type , "message" : message }]
95- })
112+ "body" : json .dumps (
113+ {
114+ "success" : False ,
115+ "statusCode" : status ,
116+ "errors" : [{"type" : err_type , "message" : message }],
117+ }
118+ ),
96119 }
97120
121+
98122def get_api ():
99- return {
100- "statusCode" : 200 ,
101- "body" : API
102- }
123+ return {"statusCode" : 200 , "body" : API }
124+
103125
104126def get_token ():
105127 logger .debug ("Handling GET Token" )
106- return {
107- "statusCode" : 303 ,
108- "headers" : {"Location" : TOKEN_PROVIDER_URL }
109- }
110-
128+ return {"statusCode" : 303 , "headers" : {"Location" : TOKEN_PROVIDER_URL }}
129+
130+
111131def get_topics ():
112132 logger .debug ("Handling GET Topics" )
113133 return {
114134 "statusCode" : 200 ,
115135 "headers" : {"Content-Type" : "application/json" },
116- "body" : json .dumps ([topicName for topicName in TOPICS ])
136+ "body" : json .dumps ([topicName for topicName in TOPICS ]),
117137 }
118-
138+
139+
119140def get_topic_schema (topicName ):
120141 logger .debug (f"Handling GET TopicSchema({ topicName } )" )
121142 if topicName not in TOPICS :
122143 return _error_response (404 , "topic" , f"Topic '{ topicName } ' not found" )
123-
144+
124145 return {
125146 "statusCode" : 200 ,
126147 "headers" : {"Content-Type" : "application/json" },
127- "body" : json .dumps (TOPICS [topicName ])
148+ "body" : json .dumps (TOPICS [topicName ]),
128149 }
129150
151+
130152def post_topic_message (topicName , topicMessage , tokenEncoded ):
131153 logger .debug (f"Handling POST { topicName } " )
132154 try :
133155 token = jwt .decode (tokenEncoded , TOKEN_PUBLIC_KEY , algorithms = ["RS256" ])
134156 except Exception :
135- return _error_response (401 , "auth" , "Invalid or missing token" )
157+ return _error_response (401 , "auth" , "Invalid or missing token" )
136158
137159 if topicName not in TOPICS :
138160 return _error_response (404 , "topic" , f"Topic '{ topicName } ' not found" )
@@ -144,8 +166,8 @@ def post_topic_message(topicName, topicMessage, tokenEncoded):
144166 try :
145167 validate (instance = topicMessage , schema = TOPICS [topicName ])
146168 except ValidationError as e :
147- return _error_response (400 , "validation" , e .message )
148-
169+ return _error_response (400 , "validation" , e .message )
170+
149171 # Run all writers independently (avoid short-circuit so failures in one don't skip others)
150172 kafka_ok , kafka_err = writer_kafka .write (topicName , topicMessage )
151173 eventbridge_ok , eventbridge_err = writer_eventbridge .write (topicName , topicMessage )
@@ -163,31 +185,28 @@ def post_topic_message(topicName, topicMessage, tokenEncoded):
163185 return {
164186 "statusCode" : 500 ,
165187 "headers" : {"Content-Type" : "application/json" },
166- "body" : json .dumps ({
167- "success" : False ,
168- "statusCode" : 500 ,
169- "errors" : errors
170- })
188+ "body" : json .dumps ({"success" : False , "statusCode" : 500 , "errors" : errors }),
171189 }
172190
173191 return {
174192 "statusCode" : 202 ,
175193 "headers" : {"Content-Type" : "application/json" },
176- "body" : json .dumps ({
177- "success" : True ,
178- "statusCode" : 202
179- })
194+ "body" : json .dumps ({"success" : True , "statusCode" : 202 }),
180195 }
181196
197+
182198def extract_token (eventHeaders ):
183199 # Initial implementation used bearer header directly
184200 if "bearer" in eventHeaders :
185201 return eventHeaders ["bearer" ]
186-
187- if "Authorization" in eventHeaders and eventHeaders ["Authorization" ].startswith ("Bearer " ):
188- return eventHeaders ["Authorization" ][len ("Bearer " ):]
189-
190- return "" # Will result in 401
202+
203+ if "Authorization" in eventHeaders and eventHeaders ["Authorization" ].startswith (
204+ "Bearer "
205+ ):
206+ return eventHeaders ["Authorization" ][len ("Bearer " ) :]
207+
208+ return "" # Will result in 401
209+
191210
192211def lambda_handler (event , context ):
193212 try :
@@ -201,7 +220,11 @@ def lambda_handler(event, context):
201220 if event ["httpMethod" ] == "GET" :
202221 return get_topic_schema (event ["pathParameters" ]["topic_name" ].lower ())
203222 if event ["httpMethod" ] == "POST" :
204- return post_topic_message (event ["pathParameters" ]["topic_name" ].lower (), json .loads (event ["body" ]), extract_token (event ["headers" ]))
223+ return post_topic_message (
224+ event ["pathParameters" ]["topic_name" ].lower (),
225+ json .loads (event ["body" ]),
226+ extract_token (event ["headers" ]),
227+ )
205228 if event ["resource" ].lower () == "/terminate" :
206229 sys .exit ("TERMINATING" )
207230 return _error_response (404 , "route" , "Resource not found" )
0 commit comments