2222from typing import Any , Dict
2323
2424import boto3
25- import jwt
2625from botocore .exceptions import BotoCoreError , NoCredentialsError
27- from jsonschema import validate
28- from jsonschema .exceptions import ValidationError
2926
3027from src .handlers .handler_token import HandlerToken
28+ from src .handlers .handler_topic import HandlerTopic
3129from src .utils .constants import SSL_CA_BUNDLE_KEY
30+ from src .utils .utils import build_error_response
3231from src .writers import writer_eventbridge , writer_kafka , writer_postgres
3332from src .utils .conf_path import CONF_DIR , INVALID_CONF_ENV
3433
35- # Internal aliases used by rest of module
36- _CONF_DIR = CONF_DIR
37- _INVALID_CONF_ENV = INVALID_CONF_ENV
38-
3934
35+ # Initialize logger
4036logger = logging .getLogger (__name__ )
4137log_level = os .environ .get ("LOG_LEVEL" , "INFO" )
4238logger .setLevel (log_level )
4339if not logger .handlers :
4440 logger .addHandler (logging .StreamHandler ())
45- logger .debug ("Initialized LOGGER" )
46- logger .debug ("Using CONF_DIR=%s" , _CONF_DIR )
47- if _INVALID_CONF_ENV :
48- logger .warning ("CONF_DIR env var set to non-existent path: %s; fell back to %s" , _INVALID_CONF_ENV , _CONF_DIR )
41+ logger .debug ("Initialized logger with level %s" , log_level )
42+
43+ # Load main configuration
44+ logger .debug ("Using CONF_DIR=%s" , CONF_DIR )
45+ if INVALID_CONF_ENV :
46+ logger .warning ("CONF_DIR env var set to non-existent path: %s; fell back to %s" , INVALID_CONF_ENV , CONF_DIR )
47+ with open (os .path .join (CONF_DIR , "config.json" ), "r" , encoding = "utf-8" ) as file :
48+ config = json .load (file )
49+ logger .debug ("Loaded main configuration" )
4950
50- with open (os .path .join (_CONF_DIR , "api.yaml" ), "r" , encoding = "utf-8" ) as file :
51+ # Load API definition
52+ with open (os .path .join (CONF_DIR , "api.yaml" ), "r" , encoding = "utf-8" ) as file :
5153 API = file .read ()
5254logger .debug ("Loaded API definition" )
5355
54- TOPICS : Dict [str , Dict [str , Any ]] = {}
55- with open (os .path .join (_CONF_DIR , "topic_runs.json" ), "r" , encoding = "utf-8" ) as file :
56- TOPICS ["public.cps.za.runs" ] = json .load (file )
57- with open (os .path .join (_CONF_DIR , "topic_dlchange.json" ), "r" , encoding = "utf-8" ) as file :
58- TOPICS ["public.cps.za.dlchange" ] = json .load (file )
59- with open (os .path .join (_CONF_DIR , "topic_test.json" ), "r" , encoding = "utf-8" ) as file :
60- TOPICS ["public.cps.za.test" ] = json .load (file )
61- logger .debug ("Loaded TOPICS" )
62-
63- with open (os .path .join (_CONF_DIR , "config.json" ), "r" , encoding = "utf-8" ) as file :
64- config = json .load (file )
65- logger .debug ("Loaded main CONFIG" )
66-
6756# Initialize S3 client with SSL verification
6857try :
6958 ssl_verify = config .get (SSL_CA_BUNDLE_KEY , True )
7362 logger .exception ("Failed to initialize AWS S3 client" )
7463 raise RuntimeError ("AWS S3 client initialization failed" ) from exc
7564
65+ # Load access configuration
66+ ACCESS : Dict [str , list [str ]] = {}
7667if config ["access_config" ].startswith ("s3://" ):
7768 name_parts = config ["access_config" ].split ("/" )
7869 BUCKET_NAME = name_parts [2 ]
8172else :
8273 with open (config ["access_config" ], "r" , encoding = "utf-8" ) as file :
8374 ACCESS = json .load (file )
84- logger .debug ("Loaded ACCESS definitions " )
75+ logger .debug ("Loaded access configuration " )
8576
8677# Initialize token handler and load token public keys
8778handler_token = HandlerToken (config ).load_public_keys ()
9182writer_kafka .init (logger , config )
9283writer_postgres .init (logger )
9384
94-
95- def _error_response (status : int , err_type : str , message : str ) -> Dict [str , Any ]:
96- """Build a standardized JSON error response body.
97-
98- Args:
99- status: HTTP status code.
100- err_type: A short error classifier (e.g. 'auth', 'validation').
101- message: Human readable error description.
102- Returns:
103- A dictionary compatible with API Gateway Lambda Proxy integration.
104- """
105- return {
106- "statusCode" : status ,
107- "headers" : {"Content-Type" : "application/json" },
108- "body" : json .dumps (
109- {
110- "success" : False ,
111- "statusCode" : status ,
112- "errors" : [{"type" : err_type , "message" : message }],
113- }
114- ),
115- }
85+ # Initialize topic handler and load topic schemas
86+ handler_topic = HandlerTopic (CONF_DIR , ACCESS , handler_token ).load_topic_schemas ()
11687
11788
11889def get_api () -> Dict [str , Any ]:
11990 """Return the OpenAPI specification text."""
12091 return {"statusCode" : 200 , "body" : API }
12192
12293
123- def get_topics () -> Dict [str , Any ]:
124- """Return list of available topic names."""
125- logger .debug ("Handling GET Topics" )
126- return {
127- "statusCode" : 200 ,
128- "headers" : {"Content-Type" : "application/json" },
129- "body" : json .dumps (list (TOPICS )),
130- }
131-
132-
133- def get_topic_schema (topic_name : str ) -> Dict [str , Any ]:
134- """Return the JSON schema for a specific topic.
135-
136- Args:
137- topic_name: The topic whose schema is requested.
94+ def lambda_handler (event : Dict [str , Any ], _context : Any = None ) -> Dict [str , Any ]:
13895 """
139- logger .debug ("Handling GET TopicSchema(%s)" , topic_name )
140- if topic_name not in TOPICS :
141- return _error_response (404 , "topic" , f"Topic '{ topic_name } ' not found" )
142-
143- return {"statusCode" : 200 , "headers" : {"Content-Type" : "application/json" }, "body" : json .dumps (TOPICS [topic_name ])}
144-
145-
146- def post_topic_message (topic_name : str , topic_message : Dict [str , Any ], token_encoded : str ) -> Dict [str , Any ]:
147- """Validate auth and schema; dispatch message to all writers.
148-
96+ AWS Lambda entry point. Dispatches based on API Gateway proxy 'resource' and 'httpMethod'.
14997 Args:
150- topic_name: Target topic name.
151- topic_message: JSON message payload.
152- token_encoded: Encoded bearer JWT token string.
153- """
154- logger .debug ("Handling POST %s" , topic_name )
155- try :
156- token : Dict [str , Any ] = handler_token .decode_jwt (token_encoded )
157- except jwt .PyJWTError : # type: ignore[attr-defined]
158- return _error_response (401 , "auth" , "Invalid or missing token" )
159-
160- if topic_name not in TOPICS :
161- return _error_response (404 , "topic" , f"Topic '{ topic_name } ' not found" )
162-
163- user = token .get ("sub" )
164- if topic_name not in ACCESS or user not in ACCESS [topic_name ]: # type: ignore[index]
165- return _error_response (403 , "auth" , "User not authorized for topic" )
166-
167- try :
168- validate (instance = topic_message , schema = TOPICS [topic_name ])
169- except ValidationError as exc :
170- return _error_response (400 , "validation" , exc .message )
171-
172- kafka_ok , kafka_err = writer_kafka .write (topic_name , topic_message )
173- eventbridge_ok , eventbridge_err = writer_eventbridge .write (topic_name , topic_message )
174- postgres_ok , postgres_err = writer_postgres .write (topic_name , topic_message )
175-
176- errors = []
177- if not kafka_ok :
178- errors .append ({"type" : "kafka" , "message" : kafka_err })
179- if not eventbridge_ok :
180- errors .append ({"type" : "eventbridge" , "message" : eventbridge_err })
181- if not postgres_ok :
182- errors .append ({"type" : "postgres" , "message" : postgres_err })
183-
184- if errors :
185- return {
186- "statusCode" : 500 ,
187- "headers" : {"Content-Type" : "application/json" },
188- "body" : json .dumps ({"success" : False , "statusCode" : 500 , "errors" : errors }),
189- }
190-
191- return {
192- "statusCode" : 202 ,
193- "headers" : {"Content-Type" : "application/json" },
194- "body" : json .dumps ({"success" : True , "statusCode" : 202 }),
195- }
196-
197-
198- def lambda_handler (event : Dict [str , Any ], context : Any ): # pylint: disable=unused-argument,too-many-return-statements
199- """AWS Lambda entry point.
200-
201- Dispatches based on API Gateway proxy 'resource' and 'httpMethod'.
98+ event: The event data from API Gateway.
99+ _context: The mandatory context argument for AWS Lambda invocation (unused).
100+ Returns:
101+ A dictionary compatible with API Gateway Lambda Proxy integration.
102+ Raises:
103+ Request exception: For unexpected errors.
202104 """
203105 try :
204106 resource = event .get ("resource" , "" ).lower ()
@@ -207,20 +109,20 @@ def lambda_handler(event: Dict[str, Any], context: Any): # pylint: disable=unus
207109 if resource == "/token" :
208110 return handler_token .get_token_provider_info ()
209111 if resource == "/topics" :
210- return get_topics ()
112+ return handler_topic . get_topics_list ()
211113 if resource == "/topics/{topic_name}" :
212114 method = event .get ("httpMethod" )
213115 if method == "GET" :
214- return get_topic_schema (event ["pathParameters" ]["topic_name" ].lower ())
116+ return handler_topic . get_topic_schema (event ["pathParameters" ]["topic_name" ].lower ())
215117 if method == "POST" :
216- return post_topic_message (
118+ return handler_topic . post_topic_message (
217119 event ["pathParameters" ]["topic_name" ].lower (),
218120 json .loads (event ["body" ]),
219121 handler_token .extract_token (event .get ("headers" , {})),
220122 )
221123 if resource == "/terminate" :
222- sys .exit ("TERMINATING" ) # pragma: no cover - deliberate termination path
223- return _error_response (404 , "route" , "Resource not found" )
224- except Exception as exc : # pylint: disable=broad-exception-caught
225- logger .error ( "Unexpected exception : %s" , exc )
226- return _error_response (500 , "internal" , "Unexpected server error" )
124+ sys .exit ("TERMINATING" )
125+ return build_error_response (404 , "route" , "Resource not found" )
126+ except ( KeyError , json . JSONDecodeError , ValueError , AttributeError , TypeError , RuntimeError ) as request_exc :
127+ logger .exception ( "Request processing error : %s" , request_exc )
128+ return build_error_response (500 , "internal" , "Unexpected server error" )
0 commit comments