diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 939832a..9401ec5 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -1 +1 @@ -* @ABMC831 @Zejnilovic @oto-macenauer-absa @petr-pokorny-absa @tmikula-dev +* @oto-macenauer-absa @petr-pokorny-absa @tmikula-dev @lsulak diff --git a/requirements.txt b/requirements.txt index 01e824e..6ef737a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,7 +5,7 @@ pylint==3.3.8 black==25.1.0 mypy==1.17.1 mypy-extensions==1.1.0 -urllib3==2.5.0 +urllib3==2.6.0 cryptography==45.0.7 jsonschema==4.25.1 PyJWT==2.10.1 diff --git a/src/event_gate_lambda.py b/src/event_gate_lambda.py index a18d0d9..a0fcefa 100644 --- a/src/event_gate_lambda.py +++ b/src/event_gate_lambda.py @@ -22,48 +22,37 @@ from typing import Any, Dict import boto3 -import jwt from botocore.exceptions import BotoCoreError, NoCredentialsError -from jsonschema import validate -from jsonschema.exceptions import ValidationError from src.handlers.handler_token import HandlerToken +from src.handlers.handler_topic import HandlerTopic from src.utils.constants import SSL_CA_BUNDLE_KEY +from src.utils.utils import build_error_response from src.writers import writer_eventbridge, writer_kafka, writer_postgres from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV -# Internal aliases used by rest of module -_CONF_DIR = CONF_DIR -_INVALID_CONF_ENV = INVALID_CONF_ENV - +# Initialize logger logger = logging.getLogger(__name__) log_level = os.environ.get("LOG_LEVEL", "INFO") logger.setLevel(log_level) if not logger.handlers: logger.addHandler(logging.StreamHandler()) -logger.debug("Initialized LOGGER") -logger.debug("Using CONF_DIR=%s", _CONF_DIR) -if _INVALID_CONF_ENV: - logger.warning("CONF_DIR env var set to non-existent path: %s; fell back to %s", _INVALID_CONF_ENV, _CONF_DIR) +logger.debug("Initialized logger with level %s", log_level) + +# Load main configuration +logger.debug("Using CONF_DIR=%s", CONF_DIR) +if INVALID_CONF_ENV: + logger.warning("CONF_DIR env var set to non-existent path: %s; fell back to %s", INVALID_CONF_ENV, CONF_DIR) +with open(os.path.join(CONF_DIR, "config.json"), "r", encoding="utf-8") as file: + config = json.load(file) +logger.debug("Loaded main configuration") -with open(os.path.join(_CONF_DIR, "api.yaml"), "r", encoding="utf-8") as file: +# Load API definition +with open(os.path.join(CONF_DIR, "api.yaml"), "r", encoding="utf-8") as file: API = file.read() logger.debug("Loaded API definition") -TOPICS: Dict[str, Dict[str, Any]] = {} -with open(os.path.join(_CONF_DIR, "topic_runs.json"), "r", encoding="utf-8") as file: - TOPICS["public.cps.za.runs"] = json.load(file) -with open(os.path.join(_CONF_DIR, "topic_dlchange.json"), "r", encoding="utf-8") as file: - TOPICS["public.cps.za.dlchange"] = json.load(file) -with open(os.path.join(_CONF_DIR, "topic_test.json"), "r", encoding="utf-8") as file: - TOPICS["public.cps.za.test"] = json.load(file) -logger.debug("Loaded TOPICS") - -with open(os.path.join(_CONF_DIR, "config.json"), "r", encoding="utf-8") as file: - config = json.load(file) -logger.debug("Loaded main CONFIG") - # Initialize S3 client with SSL verification try: ssl_verify = config.get(SSL_CA_BUNDLE_KEY, True) @@ -73,6 +62,8 @@ logger.exception("Failed to initialize AWS S3 client") raise RuntimeError("AWS S3 client initialization failed") from exc +# Load access configuration +ACCESS: Dict[str, list[str]] = {} if config["access_config"].startswith("s3://"): name_parts = config["access_config"].split("/") BUCKET_NAME = name_parts[2] @@ -81,7 +72,7 @@ else: with open(config["access_config"], "r", encoding="utf-8") as file: ACCESS = json.load(file) -logger.debug("Loaded ACCESS definitions") +logger.debug("Loaded access configuration") # Initialize token handler and load token public keys handler_token = HandlerToken(config).load_public_keys() @@ -91,28 +82,8 @@ writer_kafka.init(logger, config) writer_postgres.init(logger) - -def _error_response(status: int, err_type: str, message: str) -> Dict[str, Any]: - """Build a standardized JSON error response body. - - Args: - status: HTTP status code. - err_type: A short error classifier (e.g. 'auth', 'validation'). - message: Human readable error description. - Returns: - A dictionary compatible with API Gateway Lambda Proxy integration. - """ - return { - "statusCode": status, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps( - { - "success": False, - "statusCode": status, - "errors": [{"type": err_type, "message": message}], - } - ), - } +# Initialize topic handler and load topic schemas +handler_topic = HandlerTopic(CONF_DIR, ACCESS, handler_token).load_topic_schemas() def get_api() -> Dict[str, Any]: @@ -120,85 +91,16 @@ def get_api() -> Dict[str, Any]: return {"statusCode": 200, "body": API} -def get_topics() -> Dict[str, Any]: - """Return list of available topic names.""" - logger.debug("Handling GET Topics") - return { - "statusCode": 200, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps(list(TOPICS)), - } - - -def get_topic_schema(topic_name: str) -> Dict[str, Any]: - """Return the JSON schema for a specific topic. - - Args: - topic_name: The topic whose schema is requested. +def lambda_handler(event: Dict[str, Any], _context: Any = None) -> Dict[str, Any]: """ - logger.debug("Handling GET TopicSchema(%s)", topic_name) - if topic_name not in TOPICS: - return _error_response(404, "topic", f"Topic '{topic_name}' not found") - - return {"statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps(TOPICS[topic_name])} - - -def post_topic_message(topic_name: str, topic_message: Dict[str, Any], token_encoded: str) -> Dict[str, Any]: - """Validate auth and schema; dispatch message to all writers. - + AWS Lambda entry point. Dispatches based on API Gateway proxy 'resource' and 'httpMethod'. Args: - topic_name: Target topic name. - topic_message: JSON message payload. - token_encoded: Encoded bearer JWT token string. - """ - logger.debug("Handling POST %s", topic_name) - try: - token: Dict[str, Any] = handler_token.decode_jwt(token_encoded) - except jwt.PyJWTError: # type: ignore[attr-defined] - return _error_response(401, "auth", "Invalid or missing token") - - if topic_name not in TOPICS: - return _error_response(404, "topic", f"Topic '{topic_name}' not found") - - user = token.get("sub") - if topic_name not in ACCESS or user not in ACCESS[topic_name]: # type: ignore[index] - return _error_response(403, "auth", "User not authorized for topic") - - try: - validate(instance=topic_message, schema=TOPICS[topic_name]) - except ValidationError as exc: - return _error_response(400, "validation", exc.message) - - kafka_ok, kafka_err = writer_kafka.write(topic_name, topic_message) - eventbridge_ok, eventbridge_err = writer_eventbridge.write(topic_name, topic_message) - postgres_ok, postgres_err = writer_postgres.write(topic_name, topic_message) - - errors = [] - if not kafka_ok: - errors.append({"type": "kafka", "message": kafka_err}) - if not eventbridge_ok: - errors.append({"type": "eventbridge", "message": eventbridge_err}) - if not postgres_ok: - errors.append({"type": "postgres", "message": postgres_err}) - - if errors: - return { - "statusCode": 500, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"success": False, "statusCode": 500, "errors": errors}), - } - - return { - "statusCode": 202, - "headers": {"Content-Type": "application/json"}, - "body": json.dumps({"success": True, "statusCode": 202}), - } - - -def lambda_handler(event: Dict[str, Any], context: Any): # pylint: disable=unused-argument,too-many-return-statements - """AWS Lambda entry point. - - Dispatches based on API Gateway proxy 'resource' and 'httpMethod'. + event: The event data from API Gateway. + _context: The mandatory context argument for AWS Lambda invocation (unused). + Returns: + A dictionary compatible with API Gateway Lambda Proxy integration. + Raises: + Request exception: For unexpected errors. """ try: resource = event.get("resource", "").lower() @@ -207,20 +109,20 @@ def lambda_handler(event: Dict[str, Any], context: Any): # pylint: disable=unus if resource == "/token": return handler_token.get_token_provider_info() if resource == "/topics": - return get_topics() + return handler_topic.get_topics_list() if resource == "/topics/{topic_name}": method = event.get("httpMethod") if method == "GET": - return get_topic_schema(event["pathParameters"]["topic_name"].lower()) + return handler_topic.get_topic_schema(event["pathParameters"]["topic_name"].lower()) if method == "POST": - return post_topic_message( + return handler_topic.post_topic_message( event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), handler_token.extract_token(event.get("headers", {})), ) if resource == "/terminate": - sys.exit("TERMINATING") # pragma: no cover - deliberate termination path - return _error_response(404, "route", "Resource not found") - except Exception as exc: # pylint: disable=broad-exception-caught - logger.error("Unexpected exception: %s", exc) - return _error_response(500, "internal", "Unexpected server error") + sys.exit("TERMINATING") + return build_error_response(404, "route", "Resource not found") + except (KeyError, json.JSONDecodeError, ValueError, AttributeError, TypeError, RuntimeError) as request_exc: + logger.exception("Request processing error: %s", request_exc) + return build_error_response(500, "internal", "Unexpected server error") diff --git a/src/handlers/handler_topic.py b/src/handlers/handler_topic.py new file mode 100644 index 0000000..1434fa1 --- /dev/null +++ b/src/handlers/handler_topic.py @@ -0,0 +1,154 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +This module provides the HandlerTopic class for managing topic-related operations. +""" +import json +import logging +import os +from typing import Dict, Any + +import jwt +from jsonschema import validate +from jsonschema.exceptions import ValidationError + +from src.handlers.handler_token import HandlerToken +from src.utils.utils import build_error_response +from src.writers import writer_eventbridge, writer_kafka, writer_postgres + +logger = logging.getLogger(__name__) +log_level = os.environ.get("LOG_LEVEL", "INFO") +logger.setLevel(log_level) + + +class HandlerTopic: + """ + HandlerTopic manages topic schemas, access control, and message posting. + """ + + def __init__(self, conf_dir: str, access_config: Dict[str, list[str]], handler_token: HandlerToken): + self.conf_dir = conf_dir + self.access_config = access_config + self.handler_token = handler_token + self.topics: Dict[str, Dict[str, Any]] = {} + + def load_topic_schemas(self) -> "HandlerTopic": + """ + Load topic schemas from configuration files. + Returns: + HandlerTopic: The current instance with loaded topic schemas. + """ + logger.debug("Loading topic schemas from %s", self.conf_dir) + + with open(os.path.join(self.conf_dir, "topic_runs.json"), "r", encoding="utf-8") as file: + self.topics["public.cps.za.runs"] = json.load(file) + with open(os.path.join(self.conf_dir, "topic_dlchange.json"), "r", encoding="utf-8") as file: + self.topics["public.cps.za.dlchange"] = json.load(file) + with open(os.path.join(self.conf_dir, "topic_test.json"), "r", encoding="utf-8") as file: + self.topics["public.cps.za.test"] = json.load(file) + + logger.debug("Loaded topic schemas successfully") + return self + + def get_topics_list(self) -> Dict[str, Any]: + """ + Return the list of available topics. + Returns: + Dict[str, Any]: API Gateway response with topic list. + """ + logger.debug("Handling GET Topics") + return { + "statusCode": 200, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps(list(self.topics)), + } + + def get_topic_schema(self, topic_name: str) -> Dict[str, Any]: + """ + Return the JSON schema for a specific topic. + Args: + topic_name: The topic whose schema is requested. + Returns: + Dict[str, Any]: API Gateway response with topic schema or error. + """ + logger.debug("Handling GET TopicSchema(%s)", topic_name) + + if topic_name not in self.topics: + return build_error_response(404, "topic", f"Topic '{topic_name}' not found") + + return { + "statusCode": 200, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps(self.topics[topic_name]), + } + + def post_topic_message(self, topic_name: str, topic_message: Dict[str, Any], token_encoded: str) -> Dict[str, Any]: + """ + Validate auth and schema; dispatch message to all writers. + Args: + topic_name: Target topic name. + topic_message: JSON message payload. + token_encoded: Encoded bearer JWT token string. + Returns: + Dict[str, Any]: API Gateway response indicating success or failure. + Raises: + jwt.PyJWTError: If token decoding fails. + ValidationError: If message validation fails. + """ + logger.debug("Handling POST TopicMessage(%s)", topic_name) + + try: + token: Dict[str, Any] = self.handler_token.decode_jwt(token_encoded) + except jwt.PyJWTError: # type: ignore[attr-defined] + return build_error_response(401, "auth", "Invalid or missing token") + + if topic_name not in self.topics: + return build_error_response(404, "topic", f"Topic '{topic_name}' not found") + + user = token.get("sub") + if topic_name not in self.access_config or user not in self.access_config[topic_name]: + return build_error_response(403, "auth", "User not authorized for topic") + + try: + validate(instance=topic_message, schema=self.topics[topic_name]) + except ValidationError as exc: + return build_error_response(400, "validation", exc.message) + + kafka_ok, kafka_err = writer_kafka.write(topic_name, topic_message) + eventbridge_ok, eventbridge_err = writer_eventbridge.write(topic_name, topic_message) + postgres_ok, postgres_err = writer_postgres.write(topic_name, topic_message) + + errors = [] + if not kafka_ok: + errors.append({"type": "kafka", "message": kafka_err}) + if not eventbridge_ok: + errors.append({"type": "eventbridge", "message": eventbridge_err}) + if not postgres_ok: + errors.append({"type": "postgres", "message": postgres_err}) + + if errors: + return { + "statusCode": 500, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps({"success": False, "statusCode": 500, "errors": errors}), + } + + return { + "statusCode": 202, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps({"success": True, "statusCode": 202}), + } diff --git a/src/utils/utils.py b/src/utils/utils.py new file mode 100644 index 0000000..fd04ee0 --- /dev/null +++ b/src/utils/utils.py @@ -0,0 +1,42 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utility functions for the EventGate project.""" +import json +from typing import Dict, Any + + +def build_error_response(status: int, err_type: str, message: str) -> Dict[str, Any]: + """ + Build a standardized JSON error response body. + Args: + status (int): HTTP status code. + err_type (str): A short error classifier (e.g. 'auth', 'validation'). + message (str): Human readable error description. + Returns: + A dictionary compatible with API Gateway Lambda Proxy integration. + """ + return { + "statusCode": status, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps( + { + "success": False, + "statusCode": status, + "errors": [{"type": err_type, "message": message}], + } + ), + } diff --git a/tests/handlers/test_handler_token.py b/tests/handlers/test_handler_token.py index 2761f95..487860d 100644 --- a/tests/handlers/test_handler_token.py +++ b/tests/handlers/test_handler_token.py @@ -18,6 +18,7 @@ from datetime import datetime, timedelta, timezone from unittest.mock import patch, Mock +import jwt import pytest from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey @@ -33,7 +34,7 @@ def token_handler(): def test_get_token_endpoint(event_gate_module, make_event): event = make_event("/token") - resp = event_gate_module.lambda_handler(event, None) + resp = event_gate_module.lambda_handler(event) assert resp["statusCode"] == 303 assert "Location" in resp["headers"] @@ -42,10 +43,9 @@ def test_post_expired_token(event_gate_module, make_event, valid_payload): """Expired JWT should yield 401 auth error.""" with patch.object( - event_gate_module.jwt, - "decode", - side_effect=event_gate_module.jwt.ExpiredSignatureError("expired"), - create=True, + event_gate_module.handler_token, + "decode_jwt", + side_effect=jwt.ExpiredSignatureError("expired"), ): event = make_event( "/topics/{topic_name}", @@ -54,7 +54,7 @@ def test_post_expired_token(event_gate_module, make_event, valid_payload): body=valid_payload, headers={"Authorization": "Bearer expiredtoken"}, ) - resp = event_gate_module.lambda_handler(event, None) + resp = event_gate_module.lambda_handler(event) assert resp["statusCode"] == 401 body = json.loads(resp["body"]) assert any(e["type"] == "auth" for e in body["errors"]) @@ -67,12 +67,12 @@ def test_decode_jwt_all_second_key_succeeds(event_gate_module): second_key = object() event_gate_module.handler_token.public_keys = [first_key, second_key] - def decode_side_effect(token, key, algorithms): + def decode_side_effect(_token, key, **_kwargs): if key is first_key: - raise event_gate_module.jwt.PyJWTError("signature mismatch") + raise jwt.PyJWTError("signature mismatch") return {"sub": "TestUser"} - with patch.object(event_gate_module.jwt, "decode", side_effect=decode_side_effect, create=True): + with patch("jwt.decode", side_effect=decode_side_effect): claims = event_gate_module.handler_token.decode_jwt("dummy-token") assert claims["sub"] == "TestUser" @@ -82,11 +82,11 @@ def test_decode_jwt_all_all_keys_fail(event_gate_module): bad_keys = [object(), object()] event_gate_module.handler_token.public_keys = bad_keys - def always_fail(token, key, algorithms): - raise event_gate_module.jwt.PyJWTError("bad signature") + def always_fail(_token, _key, **_kwargs): + raise jwt.PyJWTError("bad signature") - with patch.object(event_gate_module.jwt, "decode", side_effect=always_fail, create=True): - with pytest.raises(event_gate_module.jwt.PyJWTError) as exc: + with patch("jwt.decode", side_effect=always_fail): + with pytest.raises(jwt.PyJWTError) as exc: event_gate_module.handler_token.decode_jwt("dummy-token") assert "Verification failed for all public keys" in str(exc.value) diff --git a/tests/handlers/test_handler_topic.py b/tests/handlers/test_handler_topic.py new file mode 100644 index 0000000..3916dd6 --- /dev/null +++ b/tests/handlers/test_handler_topic.py @@ -0,0 +1,215 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +from unittest.mock import patch, mock_open, MagicMock + +import jwt + +from src.handlers.handler_topic import HandlerTopic + + +## load_topic_schemas() +def test_load_topic_schemas_success(): + mock_handler_token = MagicMock() + access_config = {"public.cps.za.test": ["TestUser"]} + handler = HandlerTopic("conf", access_config, mock_handler_token) + + mock_schemas = { + "topic_runs.json": {"type": "object", "properties": {"run_id": {"type": "string"}}}, + "topic_dlchange.json": {"type": "object", "properties": {"change_id": {"type": "string"}}}, + "topic_test.json": {"type": "object", "properties": {"event_id": {"type": "string"}}}, + } + + def mock_open_side_effect(file_path, *_args, **_kwargs): + for filename, schema in mock_schemas.items(): + if filename in file_path: + return mock_open(read_data=json.dumps(schema)).return_value + raise FileNotFoundError(file_path) + + with patch("builtins.open", side_effect=mock_open_side_effect): + result = handler.load_topic_schemas() + + assert result is handler + assert len(handler.topics) == 3 + assert "public.cps.za.runs" in handler.topics + assert "public.cps.za.dlchange" in handler.topics + assert "public.cps.za.test" in handler.topics + + +## get_topics_list() +def test_get_topics(event_gate_module, make_event): + event = make_event("/topics") + resp = event_gate_module.lambda_handler(event) + assert resp["statusCode"] == 200 + body = json.loads(resp["body"]) + assert "public.cps.za.test" in body + + +## get_topic_schema() +def test_get_topic_schema_found(event_gate_module, make_event): + event = make_event("/topics/{topic_name}", method="GET", topic="public.cps.za.test") + resp = event_gate_module.lambda_handler(event) + assert resp["statusCode"] == 200 + schema = json.loads(resp["body"]) + assert schema["type"] == "object" + + +def test_get_topic_schema_not_found(event_gate_module, make_event): + event = make_event("/topics/{topic_name}", method="GET", topic="no.such.topic") + resp = event_gate_module.lambda_handler(event) + assert resp["statusCode"] == 404 + + +## post_topic_message() +# --- POST auth / validation failures --- +def test_post_missing_token(event_gate_module, make_event, valid_payload): + event = make_event( + "/topics/{topic_name}", method="POST", topic="public.cps.za.test", body=valid_payload, headers={} + ) + resp = event_gate_module.lambda_handler(event) + assert resp["statusCode"] == 401 + body = json.loads(resp["body"]) + assert not body["success"] + assert body["errors"][0]["type"] == "auth" + + +def test_post_unauthorized_user(event_gate_module, make_event, valid_payload): + with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "NotAllowed"}): + event = make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=valid_payload, + headers={"Authorization": "Bearer token"}, + ) + resp = event_gate_module.lambda_handler(event) + assert resp["statusCode"] == 403 + body = json.loads(resp["body"]) + assert body["errors"][0]["type"] == "auth" + + +def test_post_schema_validation_error(event_gate_module, make_event): + payload = {"event_id": "e1", "tenant_id": "t1", "source_app": "app", "environment": "dev"} # missing timestamp + with patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}): + event = make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=payload, + headers={"Authorization": "Bearer token"}, + ) + resp = event_gate_module.lambda_handler(event) + assert resp["statusCode"] == 400 + body = json.loads(resp["body"]) + assert body["errors"][0]["type"] == "validation" + + +def test_post_invalid_token_decode(event_gate_module, make_event, valid_payload): + with patch.object(event_gate_module.handler_token, "decode_jwt", side_effect=jwt.PyJWTError("bad")): + event = make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=valid_payload, + headers={"Authorization": "Bearer abc"}, + ) + resp = event_gate_module.lambda_handler(event) + assert resp["statusCode"] == 401 + body = json.loads(resp["body"]) + assert body["errors"][0]["type"] == "auth" + + +# --- POST success & failure aggregation --- +def test_post_success_all_writers(event_gate_module, make_event, valid_payload): + with ( + patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}), + patch("src.handlers.handler_topic.writer_kafka.write", return_value=(True, None)), + patch("src.handlers.handler_topic.writer_eventbridge.write", return_value=(True, None)), + patch("src.handlers.handler_topic.writer_postgres.write", return_value=(True, None)), + ): + event = make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=valid_payload, + headers={"Authorization": "Bearer token"}, + ) + resp = event_gate_module.lambda_handler(event) + assert resp["statusCode"] == 202 + body = json.loads(resp["body"]) + assert body["success"] + assert body["statusCode"] == 202 + + +def test_post_single_writer_failure(event_gate_module, make_event, valid_payload): + with ( + patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}), + patch("src.handlers.handler_topic.writer_kafka.write", return_value=(False, "Kafka boom")), + patch("src.handlers.handler_topic.writer_eventbridge.write", return_value=(True, None)), + patch("src.handlers.handler_topic.writer_postgres.write", return_value=(True, None)), + ): + event = make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=valid_payload, + headers={"Authorization": "Bearer token"}, + ) + resp = event_gate_module.lambda_handler(event) + assert resp["statusCode"] == 500 + body = json.loads(resp["body"]) + assert not body["success"] + assert len(body["errors"]) == 1 + assert body["errors"][0]["type"] == "kafka" + + +def test_post_multiple_writer_failures(event_gate_module, make_event, valid_payload): + with ( + patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}), + patch("src.handlers.handler_topic.writer_kafka.write", return_value=(False, "Kafka A")), + patch("src.handlers.handler_topic.writer_eventbridge.write", return_value=(False, "EB B")), + patch("src.handlers.handler_topic.writer_postgres.write", return_value=(True, None)), + ): + event = make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=valid_payload, + headers={"Authorization": "Bearer token"}, + ) + resp = event_gate_module.lambda_handler(event) + assert resp["statusCode"] == 500 + body = json.loads(resp["body"]) + assert sorted(e["type"] for e in body["errors"]) == ["eventbridge", "kafka"] + + +def test_token_extraction_lowercase_bearer_header(event_gate_module, make_event, valid_payload): + with ( + patch.object(event_gate_module.handler_token, "decode_jwt", return_value={"sub": "TestUser"}), + patch("src.handlers.handler_topic.writer_kafka.write", return_value=(True, None)), + patch("src.handlers.handler_topic.writer_eventbridge.write", return_value=(True, None)), + patch("src.handlers.handler_topic.writer_postgres.write", return_value=(True, None)), + ): + event = make_event( + "/topics/{topic_name}", + method="POST", + topic="public.cps.za.test", + body=valid_payload, + headers={"authorization": "bearer token"}, + ) + resp = event_gate_module.lambda_handler(event) + assert resp["statusCode"] == 202 diff --git a/tests/test_event_gate_lambda.py b/tests/test_event_gate_lambda.py index 6f87a2a..cdb9851 100644 --- a/tests/test_event_gate_lambda.py +++ b/tests/test_event_gate_lambda.py @@ -18,184 +18,9 @@ from unittest.mock import patch, MagicMock -# --- GET flows --- - - -def test_get_topics(event_gate_module, make_event): - event = make_event("/topics") - resp = event_gate_module.lambda_handler(event, None) - assert resp["statusCode"] == 200 - body = json.loads(resp["body"]) - assert "public.cps.za.test" in body - - -def test_get_topic_schema_found(event_gate_module, make_event): - event = make_event("/topics/{topic_name}", method="GET", topic="public.cps.za.test") - resp = event_gate_module.lambda_handler(event, None) - assert resp["statusCode"] == 200 - schema = json.loads(resp["body"]) - assert schema["type"] == "object" - - -def test_get_topic_schema_not_found(event_gate_module, make_event): - event = make_event("/topics/{topic_name}", method="GET", topic="no.such.topic") - resp = event_gate_module.lambda_handler(event, None) - assert resp["statusCode"] == 404 - - -# --- POST auth / validation failures --- - - -def test_post_missing_token(event_gate_module, make_event, valid_payload): - event = make_event( - "/topics/{topic_name}", method="POST", topic="public.cps.za.test", body=valid_payload, headers={} - ) - resp = event_gate_module.lambda_handler(event, None) - assert resp["statusCode"] == 401 - body = json.loads(resp["body"]) - assert not body["success"] - assert body["errors"][0]["type"] == "auth" - - -def test_post_unauthorized_user(event_gate_module, make_event, valid_payload): - with patch.object(event_gate_module.jwt, "decode", return_value={"sub": "NotAllowed"}, create=True): - event = make_event( - "/topics/{topic_name}", - method="POST", - topic="public.cps.za.test", - body=valid_payload, - headers={"Authorization": "Bearer token"}, - ) - resp = event_gate_module.lambda_handler(event, None) - assert resp["statusCode"] == 403 - body = json.loads(resp["body"]) - assert body["errors"][0]["type"] == "auth" - - -def test_post_schema_validation_error(event_gate_module, make_event): - payload = {"event_id": "e1", "tenant_id": "t1", "source_app": "app", "environment": "dev"} # missing timestamp - with patch.object(event_gate_module.jwt, "decode", return_value={"sub": "TestUser"}, create=True): - event = make_event( - "/topics/{topic_name}", - method="POST", - topic="public.cps.za.test", - body=payload, - headers={"Authorization": "Bearer token"}, - ) - resp = event_gate_module.lambda_handler(event, None) - assert resp["statusCode"] == 400 - body = json.loads(resp["body"]) - assert body["errors"][0]["type"] == "validation" - - -def test_post_invalid_token_decode(event_gate_module, make_event, valid_payload): - class DummyJwtError(Exception): - pass - - # Patch jwt.decode to raise PyJWTError-like exception; use existing attribute if present - with patch.object( - event_gate_module.jwt, "decode", side_effect=event_gate_module.jwt.PyJWTError("bad"), create=True - ): - event = make_event( - "/topics/{topic_name}", - method="POST", - topic="public.cps.za.test", - body=valid_payload, - headers={"Authorization": "Bearer abc"}, - ) - resp = event_gate_module.lambda_handler(event, None) - assert resp["statusCode"] == 401 - body = json.loads(resp["body"]) - assert body["errors"][0]["type"] == "auth" - - -# --- POST success & failure aggregation --- - - -def test_post_success_all_writers(event_gate_module, make_event, valid_payload): - with ( - patch.object(event_gate_module.jwt, "decode", return_value={"sub": "TestUser"}, create=True), - patch("src.event_gate_lambda.writer_kafka.write", return_value=(True, None)), - patch("src.event_gate_lambda.writer_eventbridge.write", return_value=(True, None)), - patch("src.event_gate_lambda.writer_postgres.write", return_value=(True, None)), - ): - event = make_event( - "/topics/{topic_name}", - method="POST", - topic="public.cps.za.test", - body=valid_payload, - headers={"Authorization": "Bearer token"}, - ) - resp = event_gate_module.lambda_handler(event, None) - assert resp["statusCode"] == 202 - body = json.loads(resp["body"]) - assert body["success"] - assert body["statusCode"] == 202 - - -def test_post_single_writer_failure(event_gate_module, make_event, valid_payload): - with ( - patch.object(event_gate_module.jwt, "decode", return_value={"sub": "TestUser"}, create=True), - patch("src.event_gate_lambda.writer_kafka.write", return_value=(False, "Kafka boom")), - patch("src.event_gate_lambda.writer_eventbridge.write", return_value=(True, None)), - patch("src.event_gate_lambda.writer_postgres.write", return_value=(True, None)), - ): - event = make_event( - "/topics/{topic_name}", - method="POST", - topic="public.cps.za.test", - body=valid_payload, - headers={"Authorization": "Bearer token"}, - ) - resp = event_gate_module.lambda_handler(event, None) - assert resp["statusCode"] == 500 - body = json.loads(resp["body"]) - assert not body["success"] - assert len(body["errors"]) == 1 - assert body["errors"][0]["type"] == "kafka" - - -def test_post_multiple_writer_failures(event_gate_module, make_event, valid_payload): - with ( - patch.object(event_gate_module.jwt, "decode", return_value={"sub": "TestUser"}, create=True), - patch("src.event_gate_lambda.writer_kafka.write", return_value=(False, "Kafka A")), - patch("src.event_gate_lambda.writer_eventbridge.write", return_value=(False, "EB B")), - patch("src.event_gate_lambda.writer_postgres.write", return_value=(True, None)), - ): - event = make_event( - "/topics/{topic_name}", - method="POST", - topic="public.cps.za.test", - body=valid_payload, - headers={"Authorization": "Bearer token"}, - ) - resp = event_gate_module.lambda_handler(event, None) - assert resp["statusCode"] == 500 - body = json.loads(resp["body"]) - assert sorted(e["type"] for e in body["errors"]) == ["eventbridge", "kafka"] - - -def test_token_extraction_lowercase_bearer_header(event_gate_module, make_event, valid_payload): - with ( - patch.object(event_gate_module.jwt, "decode", return_value={"sub": "TestUser"}, create=True), - patch("src.event_gate_lambda.writer_kafka.write", return_value=(True, None)), - patch("src.event_gate_lambda.writer_eventbridge.write", return_value=(True, None)), - patch("src.event_gate_lambda.writer_postgres.write", return_value=(True, None)), - ): - event = make_event( - "/topics/{topic_name}", - method="POST", - topic="public.cps.za.test", - body=valid_payload, - headers={"bearer": "token"}, - ) - resp = event_gate_module.lambda_handler(event, None) - assert resp["statusCode"] == 202 - - def test_unknown_resource(event_gate_module, make_event): event = make_event("/unknown") - resp = event_gate_module.lambda_handler(event, None) + resp = event_gate_module.lambda_handler(event) assert resp["statusCode"] == 404 body = json.loads(resp["body"]) assert body["errors"][0]["type"] == "route" @@ -203,15 +28,15 @@ def test_unknown_resource(event_gate_module, make_event): def test_get_api_endpoint(event_gate_module, make_event): event = make_event("/api") - resp = event_gate_module.lambda_handler(event, None) + resp = event_gate_module.lambda_handler(event) assert resp["statusCode"] == 200 assert "openapi" in resp["body"].lower() def test_internal_error_path(event_gate_module, make_event): - with patch("src.event_gate_lambda.get_topics", side_effect=RuntimeError("boom")): + with patch.object(event_gate_module.handler_topic, "get_topics_list", side_effect=RuntimeError("boom")): event = make_event("/topics") - resp = event_gate_module.lambda_handler(event, None) + resp = event_gate_module.lambda_handler(event) assert resp["statusCode"] == 500 body = json.loads(resp["body"]) assert body["errors"][0]["type"] == "internal" @@ -226,7 +51,7 @@ def test_post_invalid_json_body(event_gate_module, make_event): body="{invalid json", headers={"Authorization": "Bearer token"}, ) - resp = event_gate_module.lambda_handler(event, None) + resp = event_gate_module.lambda_handler(event) assert resp["statusCode"] == 500 body = json.loads(resp["body"]) assert any(e["type"] == "internal" for e in body["errors"]) # internal error path diff --git a/tests/utils/test_utils.py b/tests/utils/test_utils.py new file mode 100644 index 0000000..b73e54c --- /dev/null +++ b/tests/utils/test_utils.py @@ -0,0 +1,35 @@ +# +# Copyright 2025 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json + +from src.utils.utils import build_error_response + + +## build_error_response() +def test_build_error_response_structure(): + """Test that build_error_response returns correct response structure.""" + resp = build_error_response(404, "topic", "Topic not found") + + assert resp["statusCode"] == 404 + assert resp["headers"]["Content-Type"] == "application/json" + + body = json.loads(resp["body"]) + assert body["success"] is False + assert body["statusCode"] == 404 + assert len(body["errors"]) == 1 + assert body["errors"][0]["type"] == "topic" + assert body["errors"][0]["message"] == "Topic not found"