Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -1 +1 @@
* @ABMC831 @Zejnilovic @oto-macenauer-absa @petr-pokorny-absa @tmikula-dev
* @oto-macenauer-absa @petr-pokorny-absa @tmikula-dev @lsulak
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ pylint==3.3.8
black==25.1.0
mypy==1.17.1
mypy-extensions==1.1.0
urllib3==2.5.0
urllib3==2.6.0
cryptography==45.0.7
jsonschema==4.25.1
PyJWT==2.10.1
Expand Down
168 changes: 35 additions & 133 deletions src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,48 +22,37 @@
from typing import Any, Dict

import boto3
import jwt
from botocore.exceptions import BotoCoreError, NoCredentialsError
from jsonschema import validate
from jsonschema.exceptions import ValidationError

from src.handlers.handler_token import HandlerToken
from src.handlers.handler_topic import HandlerTopic
from src.utils.constants import SSL_CA_BUNDLE_KEY
from src.utils.utils import build_error_response
from src.writers import writer_eventbridge, writer_kafka, writer_postgres
from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV

# Internal aliases used by rest of module
_CONF_DIR = CONF_DIR
_INVALID_CONF_ENV = INVALID_CONF_ENV


# Initialize logger
logger = logging.getLogger(__name__)
log_level = os.environ.get("LOG_LEVEL", "INFO")
logger.setLevel(log_level)
if not logger.handlers:
logger.addHandler(logging.StreamHandler())
logger.debug("Initialized LOGGER")
logger.debug("Using CONF_DIR=%s", _CONF_DIR)
if _INVALID_CONF_ENV:
logger.warning("CONF_DIR env var set to non-existent path: %s; fell back to %s", _INVALID_CONF_ENV, _CONF_DIR)
logger.debug("Initialized logger with level %s", log_level)

# Load main configuration
logger.debug("Using CONF_DIR=%s", CONF_DIR)
if INVALID_CONF_ENV:
logger.warning("CONF_DIR env var set to non-existent path: %s; fell back to %s", INVALID_CONF_ENV, CONF_DIR)
with open(os.path.join(CONF_DIR, "config.json"), "r", encoding="utf-8") as file:
config = json.load(file)
logger.debug("Loaded main configuration")

with open(os.path.join(_CONF_DIR, "api.yaml"), "r", encoding="utf-8") as file:
# Load API definition
with open(os.path.join(CONF_DIR, "api.yaml"), "r", encoding="utf-8") as file:
API = file.read()
logger.debug("Loaded API definition")

TOPICS: Dict[str, Dict[str, Any]] = {}
with open(os.path.join(_CONF_DIR, "topic_runs.json"), "r", encoding="utf-8") as file:
TOPICS["public.cps.za.runs"] = json.load(file)
with open(os.path.join(_CONF_DIR, "topic_dlchange.json"), "r", encoding="utf-8") as file:
TOPICS["public.cps.za.dlchange"] = json.load(file)
with open(os.path.join(_CONF_DIR, "topic_test.json"), "r", encoding="utf-8") as file:
TOPICS["public.cps.za.test"] = json.load(file)
logger.debug("Loaded TOPICS")

with open(os.path.join(_CONF_DIR, "config.json"), "r", encoding="utf-8") as file:
config = json.load(file)
logger.debug("Loaded main CONFIG")

# Initialize S3 client with SSL verification
try:
ssl_verify = config.get(SSL_CA_BUNDLE_KEY, True)
Expand All @@ -73,6 +62,8 @@
logger.exception("Failed to initialize AWS S3 client")
raise RuntimeError("AWS S3 client initialization failed") from exc

# Load access configuration
ACCESS: Dict[str, list[str]] = {}
if config["access_config"].startswith("s3://"):
name_parts = config["access_config"].split("/")
BUCKET_NAME = name_parts[2]
Expand All @@ -81,7 +72,7 @@
else:
with open(config["access_config"], "r", encoding="utf-8") as file:
ACCESS = json.load(file)
logger.debug("Loaded ACCESS definitions")
logger.debug("Loaded access configuration")

# Initialize token handler and load token public keys
handler_token = HandlerToken(config).load_public_keys()
Expand All @@ -91,114 +82,25 @@
writer_kafka.init(logger, config)
writer_postgres.init(logger)


def _error_response(status: int, err_type: str, message: str) -> Dict[str, Any]:
"""Build a standardized JSON error response body.

Args:
status: HTTP status code.
err_type: A short error classifier (e.g. 'auth', 'validation').
message: Human readable error description.
Returns:
A dictionary compatible with API Gateway Lambda Proxy integration.
"""
return {
"statusCode": status,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(
{
"success": False,
"statusCode": status,
"errors": [{"type": err_type, "message": message}],
}
),
}
# Initialize topic handler and load topic schemas
handler_topic = HandlerTopic(CONF_DIR, ACCESS, handler_token).load_topic_schemas()


def get_api() -> Dict[str, Any]:
"""Return the OpenAPI specification text."""
return {"statusCode": 200, "body": API}


def get_topics() -> Dict[str, Any]:
"""Return list of available topic names."""
logger.debug("Handling GET Topics")
return {
"statusCode": 200,
"headers": {"Content-Type": "application/json"},
"body": json.dumps(list(TOPICS)),
}


def get_topic_schema(topic_name: str) -> Dict[str, Any]:
"""Return the JSON schema for a specific topic.

Args:
topic_name: The topic whose schema is requested.
def lambda_handler(event: Dict[str, Any], _context: Any = None) -> Dict[str, Any]:
"""
logger.debug("Handling GET TopicSchema(%s)", topic_name)
if topic_name not in TOPICS:
return _error_response(404, "topic", f"Topic '{topic_name}' not found")

return {"statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps(TOPICS[topic_name])}


def post_topic_message(topic_name: str, topic_message: Dict[str, Any], token_encoded: str) -> Dict[str, Any]:
"""Validate auth and schema; dispatch message to all writers.

AWS Lambda entry point. Dispatches based on API Gateway proxy 'resource' and 'httpMethod'.
Args:
topic_name: Target topic name.
topic_message: JSON message payload.
token_encoded: Encoded bearer JWT token string.
"""
logger.debug("Handling POST %s", topic_name)
try:
token: Dict[str, Any] = handler_token.decode_jwt(token_encoded)
except jwt.PyJWTError: # type: ignore[attr-defined]
return _error_response(401, "auth", "Invalid or missing token")

if topic_name not in TOPICS:
return _error_response(404, "topic", f"Topic '{topic_name}' not found")

user = token.get("sub")
if topic_name not in ACCESS or user not in ACCESS[topic_name]: # type: ignore[index]
return _error_response(403, "auth", "User not authorized for topic")

try:
validate(instance=topic_message, schema=TOPICS[topic_name])
except ValidationError as exc:
return _error_response(400, "validation", exc.message)

kafka_ok, kafka_err = writer_kafka.write(topic_name, topic_message)
eventbridge_ok, eventbridge_err = writer_eventbridge.write(topic_name, topic_message)
postgres_ok, postgres_err = writer_postgres.write(topic_name, topic_message)

errors = []
if not kafka_ok:
errors.append({"type": "kafka", "message": kafka_err})
if not eventbridge_ok:
errors.append({"type": "eventbridge", "message": eventbridge_err})
if not postgres_ok:
errors.append({"type": "postgres", "message": postgres_err})

if errors:
return {
"statusCode": 500,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"success": False, "statusCode": 500, "errors": errors}),
}

return {
"statusCode": 202,
"headers": {"Content-Type": "application/json"},
"body": json.dumps({"success": True, "statusCode": 202}),
}


def lambda_handler(event: Dict[str, Any], context: Any): # pylint: disable=unused-argument,too-many-return-statements
"""AWS Lambda entry point.

Dispatches based on API Gateway proxy 'resource' and 'httpMethod'.
event: The event data from API Gateway.
_context: The mandatory context argument for AWS Lambda invocation (unused).
Returns:
A dictionary compatible with API Gateway Lambda Proxy integration.
Raises:
Request exception: For unexpected errors.
"""
try:
resource = event.get("resource", "").lower()
Expand All @@ -207,20 +109,20 @@ def lambda_handler(event: Dict[str, Any], context: Any): # pylint: disable=unus
if resource == "/token":
return handler_token.get_token_provider_info()
if resource == "/topics":
return get_topics()
return handler_topic.get_topics_list()
if resource == "/topics/{topic_name}":
method = event.get("httpMethod")
if method == "GET":
return get_topic_schema(event["pathParameters"]["topic_name"].lower())
return handler_topic.get_topic_schema(event["pathParameters"]["topic_name"].lower())
if method == "POST":
return post_topic_message(
return handler_topic.post_topic_message(
event["pathParameters"]["topic_name"].lower(),
json.loads(event["body"]),
handler_token.extract_token(event.get("headers", {})),
)
if resource == "/terminate":
sys.exit("TERMINATING") # pragma: no cover - deliberate termination path
return _error_response(404, "route", "Resource not found")
except Exception as exc: # pylint: disable=broad-exception-caught
logger.error("Unexpected exception: %s", exc)
return _error_response(500, "internal", "Unexpected server error")
sys.exit("TERMINATING")
return build_error_response(404, "route", "Resource not found")
except (KeyError, json.JSONDecodeError, ValueError, AttributeError, TypeError, RuntimeError) as request_exc:
logger.exception("Request processing error: %s", request_exc)
return build_error_response(500, "internal", "Unexpected server error")
Loading