Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @ABMC831 @Zejnilovic @oto-macenauer-absa @petr-pokorny-absa @tmikula-dev
* @oto-macenauer-absa @petr-pokorny-absa @tmikula-dev @lsulak
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pylint==3.3.8
black==25.1.0
mypy==1.17.1
mypy-extensions==1.1.0
urllib3==2.5.0
urllib3==2.6.0
cryptography==45.0.7
jsonschema==4.25.1
PyJWT==2.10.1
Expand Down
168 changes: 35 additions & 133 deletions src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,37 @@
from typing import Any, Dict

import boto3
import jwt
from botocore.exceptions import BotoCoreError, NoCredentialsError
from jsonschema import validate
from jsonschema.exceptions import ValidationError

from src.handlers.handler_token import HandlerToken
from src.handlers.handler_topic import HandlerTopic
from src.utils.constants import SSL_CA_BUNDLE_KEY
from src.utils.utils import build_error_response
from src.writers import writer_eventbridge, writer_kafka, writer_postgres
from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV

# Internal aliases used by rest of module
_CONF_DIR = CONF_DIR
_INVALID_CONF_ENV = INVALID_CONF_ENV


# Initialize logger
logger = logging.getLogger(__name__)
log_level = os.environ.get("LOG_LEVEL", "INFO")
logger.setLevel(log_level)
if not logger.handlers:
logger.addHandler(logging.StreamHandler())
logger.debug("Initialized LOGGER")
logger.debug("Using CONF_DIR=%s", _CONF_DIR)
if _INVALID_CONF_ENV:
logger.warning("CONF_DIR env var set to non-existent path: %s; fell back to %s", _INVALID_CONF_ENV, _CONF_DIR)
logger.debug("Initialized logger with level %s", log_level)

# Load main configuration
logger.debug("Using CONF_DIR=%s", CONF_DIR)
if INVALID_CONF_ENV:
logger.warning("CONF_DIR env var set to non-existent path: %s; fell back to %s", INVALID_CONF_ENV, CONF_DIR)
with open(os.path.join(CONF_DIR, "config.json"), "r", encoding="utf-8") as file:
config = json.load(file)
logger.debug("Loaded main configuration")

with open(os.path.join(_CONF_DIR, "api.yaml"), "r", encoding="utf-8") as file:
# Load API definition
with open(os.path.join(CONF_DIR, "api.yaml"), "r", encoding="utf-8") as file:
API = file.read()
logger.debug("Loaded API definition")

TOPICS: Dict[str, Dict[str, Any]] = {}
with open(os.path.join(_CONF_DIR, "topic_runs.json"), "r", encoding="utf-8") as file:
TOPICS["public.cps.za.runs"] = json.load(file)
with open(os.path.join(_CONF_DIR, "topic_dlchange.json"), "r", encoding="utf-8") as file:
TOPICS["public.cps.za.dlchange"] = json.load(file)
with open(os.path.join(_CONF_DIR, "topic_test.json"), "r", encoding="utf-8") as file:
TOPICS["public.cps.za.test"] = json.load(file)
logger.debug("Loaded TOPICS")

with open(os.path.join(_CONF_DIR, "config.json"), "r", encoding="utf-8") as file:
config = json.load(file)
logger.debug("Loaded main CONFIG")

# Initialize S3 client with SSL verification
try:
ssl_verify = config.get(SSL_CA_BUNDLE_KEY, True)
Expand All @@ -73,6 +62,8 @@
logger.exception("Failed to initialize AWS S3 client")
raise RuntimeError("AWS S3 client initialization failed") from exc

# Load access configuration
ACCESS: Dict[str, list[str]] = {}
if config["access_config"].startswith("s3://"):
name_parts = config["access_config"].split("/")
BUCKET_NAME = name_parts[2]
Expand All @@ -81,7 +72,7 @@
else:
with open(config["access_config"], "r", encoding="utf-8") as file:
ACCESS = json.load(file)
logger.debug("Loaded ACCESS definitions")
logger.debug("Loaded access configuration")

# Initialize token handler and load token public keys
handler_token = HandlerToken(config).load_public_keys()
Expand All @@ -91,114 +82,25 @@
writer_kafka.init(logger, config)
writer_postgres.init(logger)


def _error_response(status: int, err_type: str, message: str) -> Dict[str, Any]:
"""Build a standardized JSON error response body.

Args:
status: HTTP status code.
err_type: A short error classifier (e.g. 'auth', 'validation').
message: Human readable error description.
Returns:
A dictionary compatible with API Gateway Lambda Proxy integration.
"""
return {
"statusCode": status,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(
{
"success": False,
"statusCode": status,
"errors": [{"type": err_type, "message": message}],
}
),
}
# Initialize topic handler and load topic schemas
handler_topic = HandlerTopic(CONF_DIR, ACCESS, handler_token).load_topic_schemas()


def get_api() -> Dict[str, Any]:
"""Return the OpenAPI specification text."""
return {"statusCode": 200, "body": API}


def get_topics() -> Dict[str, Any]:
"""Return list of available topic names."""
logger.debug("Handling GET Topics")
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(list(TOPICS)),
}


def get_topic_schema(topic_name: str) -> Dict[str, Any]:
"""Return the JSON schema for a specific topic.

Args:
topic_name: The topic whose schema is requested.
def lambda_handler(event: Dict[str, Any], context: Any = None) -> Dict[str, Any]:
"""
logger.debug("Handling GET TopicSchema(%s)", topic_name)
if topic_name not in TOPICS:
return _error_response(404, "topic", f"Topic '{topic_name}' not found")

return {"statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps(TOPICS[topic_name])}


def post_topic_message(topic_name: str, topic_message: Dict[str, Any], token_encoded: str) -> Dict[str, Any]:
"""Validate auth and schema; dispatch message to all writers.

AWS Lambda entry point. Dispatches based on API Gateway proxy 'resource' and 'httpMethod'.
Args:
topic_name: Target topic name.
topic_message: JSON message payload.
token_encoded: Encoded bearer JWT token string.
"""
logger.debug("Handling POST %s", topic_name)
try:
token: Dict[str, Any] = handler_token.decode_jwt(token_encoded)
except jwt.PyJWTError: # type: ignore[attr-defined]
return _error_response(401, "auth", "Invalid or missing token")

if topic_name not in TOPICS:
return _error_response(404, "topic", f"Topic '{topic_name}' not found")

user = token.get("sub")
if topic_name not in ACCESS or user not in ACCESS[topic_name]: # type: ignore[index]
return _error_response(403, "auth", "User not authorized for topic")

try:
validate(instance=topic_message, schema=TOPICS[topic_name])
except ValidationError as exc:
return _error_response(400, "validation", exc.message)

kafka_ok, kafka_err = writer_kafka.write(topic_name, topic_message)
eventbridge_ok, eventbridge_err = writer_eventbridge.write(topic_name, topic_message)
postgres_ok, postgres_err = writer_postgres.write(topic_name, topic_message)

errors = []
if not kafka_ok:
errors.append({"type": "kafka", "message": kafka_err})
if not eventbridge_ok:
errors.append({"type": "eventbridge", "message": eventbridge_err})
if not postgres_ok:
errors.append({"type": "postgres", "message": postgres_err})

if errors:
return {
"statusCode": 500,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"success": False, "statusCode": 500, "errors": errors}),
}

return {
"statusCode": 202,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"success": True, "statusCode": 202}),
}


def lambda_handler(event: Dict[str, Any], context: Any): # pylint: disable=unused-argument,too-many-return-statements
"""AWS Lambda entry point.

Dispatches based on API Gateway proxy 'resource' and 'httpMethod'.
event: The event data from API Gateway.
context: The mandatory context argument for AWS Lambda invocation.
Returns:
A dictionary compatible with API Gateway Lambda Proxy integration.
Raises:
Request exception: For unexpected errors.
"""
try:
resource = event.get("resource", "").lower()
Expand All @@ -207,20 +109,20 @@ def lambda_handler(event: Dict[str, Any], context: Any): # pylint: disable=unus
if resource == "/token":
return handler_token.get_token_provider_info()
if resource == "/topics":
return get_topics()
return handler_topic.get_topics_list()
if resource == "/topics/{topic_name}":
method = event.get("httpMethod")
if method == "GET":
return get_topic_schema(event["pathParameters"]["topic_name"].lower())
return handler_topic.get_topic_schema(event["pathParameters"]["topic_name"].lower())
if method == "POST":
return post_topic_message(
return handler_topic.post_topic_message(
event["pathParameters"]["topic_name"].lower(),
json.loads(event["body"]),
handler_token.extract_token(event.get("headers", {})),
)
if resource == "/terminate":
sys.exit("TERMINATING") # pragma: no cover - deliberate termination path
return _error_response(404, "route", "Resource not found")
except Exception as exc: # pylint: disable=broad-exception-caught
logger.error("Unexpected exception: %s", exc)
return _error_response(500, "internal", "Unexpected server error")
sys.exit("TERMINATING")
return build_error_response(404, "route", "Resource not found")
except (KeyError, json.JSONDecodeError, ValueError, AttributeError, TypeError, RuntimeError) as request_exc:
logger.error("Request processing error: %s", request_exc)
return build_error_response(500, "internal", "Unexpected server error")
Loading
Loading