Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions conf/api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,43 @@ paths:
'303':
description: Redirect to actual address of Login service which performs auth up to its capabilities

/health:
get:
summary: Service health check
description: Service health and dependency status check
responses:
'200':
description: Service is healthy
content:
application/json:
schema:
type: object
properties:
status:
type: string
example: ok
uptime_seconds:
type: integer
example: 12345
'503':
description: Service is degraded
content:
application/json:
schema:
type: object
properties:
status:
type: string
example: degraded
failures:
type: object
additionalProperties:
type: string
example:
eventbridge: client not initialized
kafka: producer not initialized
postgres: host not configured

/topics:
get:
summary: Get a list of topics
Expand Down
6 changes: 6 additions & 0 deletions src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

from src.handlers.handler_token import HandlerToken
from src.handlers.handler_topic import HandlerTopic
from src.handlers.handler_health import HandlerHealth
from src.utils.constants import SSL_CA_BUNDLE_KEY
from src.utils.utils import build_error_response
from src.writers import writer_eventbridge, writer_kafka, writer_postgres
Expand Down Expand Up @@ -85,6 +86,9 @@
# Initialize topic handler and load topic schemas
handler_topic = HandlerTopic(CONF_DIR, ACCESS, handler_token).load_topic_schemas()

# Initialize health handler; HandlerHealth records its creation time, which
# get_health() later uses as the baseline for the reported uptime_seconds
handler_health = HandlerHealth()


def get_api() -> Dict[str, Any]:
"""Return the OpenAPI specification text."""
Expand All @@ -108,6 +112,8 @@ def lambda_handler(event: Dict[str, Any], _context: Any = None) -> Dict[str, Any
return get_api()
if resource == "/token":
return handler_token.get_token_provider_info()
if resource == "/health":
return handler_health.get_health()
if resource == "/topics":
return handler_topic.get_topics_list()
if resource == "/topics/{topic_name}":
Expand Down
92 changes: 92 additions & 0 deletions src/handlers/handler_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#
# Copyright 2025 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This module provides the HandlerHealth class for service health monitoring.
"""
import json
import logging
import os
from datetime import datetime, timezone
from typing import Dict, Any

from src.writers import writer_eventbridge, writer_kafka, writer_postgres

logger = logging.getLogger(__name__)
log_level = os.environ.get("LOG_LEVEL", "INFO")
logger.setLevel(log_level)


class HandlerHealth:
    """
    Reports service health derived from the state of the event writers.

    The creation time of the instance is remembered so that a healthy
    response can include how long the handler has been alive.
    """

    def __init__(self):
        # Timezone-aware (UTC) timestamp; get_health() measures uptime from it.
        self.start_time: datetime = datetime.now(timezone.utc)

    def get_health(self) -> Dict[str, Any]:
        """
        Inspect writer state and build the health response.

        Kafka is always checked. EventBridge is checked only when an event bus
        ARN is set, and PostgreSQL only when a database name is set; for
        PostgreSQL only the first missing setting is reported.

        Returns:
            Dict[str, Any]: API Gateway response with "statusCode", "headers" and a JSON "body".
                - 200: body carries "status": "ok" and "uptime_seconds"
                - 503: body carries "status": "degraded" and a "failures" mapping
        """
        logger.debug("Handling GET Health")

        problems: Dict[str, str] = {}

        # Kafka: a producer object has to be present.
        kafka_producer = writer_kafka.STATE.get("producer")
        if kafka_producer is None:
            problems["kafka"] = "producer not initialized"

        # EventBridge: only relevant once an event bus ARN is set.
        eventbridge_state = writer_eventbridge.STATE
        bus_arn = eventbridge_state.get("event_bus_arn")
        client = eventbridge_state.get("client")
        if bus_arn and client is None:
            problems["eventbridge"] = "client not initialized"

        # PostgreSQL: only relevant once a database name is set; stop at the
        # first setting that is missing or empty.
        pg_settings = writer_postgres.POSTGRES
        if pg_settings.get("database"):
            for setting in ("host", "user", "password", "port"):
                if not pg_settings.get(setting):
                    problems["postgres"] = f"{setting} not configured"
                    break

        elapsed = datetime.now(timezone.utc) - self.start_time
        uptime_seconds = int(elapsed.total_seconds())

        if problems:
            logger.debug("Health check degraded: %s", problems)
            status_code = 503
            payload: Dict[str, Any] = {"status": "degraded", "failures": problems}
        else:
            logger.debug("Health check passed")
            status_code = 200
            payload = {"status": "ok", "uptime_seconds": uptime_seconds}

        return {
            "statusCode": status_code,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(payload),
        }
164 changes: 164 additions & 0 deletions tests/handlers/test_handler_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
#
# Copyright 2025 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import json
from unittest.mock import MagicMock, patch

from src.handlers.handler_health import HandlerHealth

### get_health()


## Minimal healthy state (just kafka)
def test_get_health_minimal_kafka_healthy():
    """A Kafka producer alone is enough for 200 "ok" when EventBridge and PostgreSQL are switched off."""
    kafka_patch = patch("src.handlers.handler_health.writer_kafka.STATE", {"producer": MagicMock()})
    eventbridge_patch = patch(
        "src.handlers.handler_health.writer_eventbridge.STATE",
        {"client": None, "event_bus_arn": ""},
    )
    postgres_patch = patch("src.handlers.handler_health.writer_postgres.POSTGRES", {"database": ""})
    health = HandlerHealth()

    with kafka_patch, eventbridge_patch, postgres_patch:
        result = health.get_health()

    assert result["statusCode"] == 200
    payload = json.loads(result["body"])
    assert payload["status"] == "ok"
    assert "uptime_seconds" in payload


## Healthy state with all writers enabled
def test_get_health_all_writers_enabled_and_healthy():
    """With every writer enabled and fully set up, the check answers 200 "ok" including uptime."""
    kafka_state = {"producer": MagicMock()}
    eventbridge_state = {"client": MagicMock(), "event_bus_arn": "arn"}
    postgres_settings = {
        "database": "db",
        "host": "localhost",
        "user": "user",
        "password": "pass",
        "port": "5432",
    }
    health = HandlerHealth()

    with patch("src.handlers.handler_health.writer_kafka.STATE", kafka_state):
        with patch("src.handlers.handler_health.writer_eventbridge.STATE", eventbridge_state):
            with patch("src.handlers.handler_health.writer_postgres.POSTGRES", postgres_settings):
                result = health.get_health()

    assert result["statusCode"] == 200
    payload = json.loads(result["body"])
    assert payload["status"] == "ok"
    assert "uptime_seconds" in payload


## Degraded state with all writers enabled
def test_get_health_kafka_not_initialized():
    """Health check returns 503 and lists kafka, eventbridge and postgres when none of them is ready."""
    kafka_state = {"producer": None}
    eventbridge_state = {"client": None, "event_bus_arn": "arn:aws:events:us-east-1:123:event-bus/bus"}
    postgres_settings = {"database": "db", "host": "", "user": "", "password": "", "port": ""}
    health = HandlerHealth()

    kafka_patch = patch("src.handlers.handler_health.writer_kafka.STATE", kafka_state)
    eventbridge_patch = patch("src.handlers.handler_health.writer_eventbridge.STATE", eventbridge_state)
    postgres_patch = patch("src.handlers.handler_health.writer_postgres.POSTGRES", postgres_settings)
    with kafka_patch, eventbridge_patch, postgres_patch:
        result = health.get_health()

    assert result["statusCode"] == 503
    payload = json.loads(result["body"])
    assert payload["status"] == "degraded"
    reported = payload["failures"]
    assert "kafka" in reported
    assert "eventbridge" in reported
    assert "postgres" in reported


## Healthy when eventbridge is disabled
def test_get_health_eventbridge_disabled():
    """An empty event_bus_arn disables the EventBridge check, so a missing client still yields 200."""
    postgres_settings = {
        "database": "db",
        "host": "localhost",
        "user": "user",
        "password": "pass",
        "port": "5432",
    }
    kafka_patch = patch("src.handlers.handler_health.writer_kafka.STATE", {"producer": MagicMock()})
    eventbridge_patch = patch(
        "src.handlers.handler_health.writer_eventbridge.STATE",
        {"client": None, "event_bus_arn": ""},
    )
    postgres_patch = patch("src.handlers.handler_health.writer_postgres.POSTGRES", postgres_settings)
    health = HandlerHealth()

    with kafka_patch, eventbridge_patch, postgres_patch:
        result = health.get_health()

    assert result["statusCode"] == 200


## Healthy when postgres is disabled
def test_get_health_postgres_disabled():
    """An empty database name disables the PostgreSQL check, so the response is 200."""
    kafka_state = {"producer": MagicMock()}
    eventbridge_state = {"client": MagicMock(), "event_bus_arn": "arn"}
    postgres_settings = {"database": ""}
    health = HandlerHealth()

    with patch("src.handlers.handler_health.writer_kafka.STATE", kafka_state):
        with patch("src.handlers.handler_health.writer_eventbridge.STATE", eventbridge_state):
            with patch("src.handlers.handler_health.writer_postgres.POSTGRES", postgres_settings):
                result = health.get_health()

    assert result["statusCode"] == 200


## Degraded state - postgres host not configured
def test_get_health_postgres_host_not_configured():
    """An enabled PostgreSQL writer with an empty host produces 503 and the "host not configured" failure."""
    postgres_settings = {
        "database": "db",
        "host": "",
        "user": "user",
        "password": "pass",
        "port": "5432",
    }
    kafka_patch = patch("src.handlers.handler_health.writer_kafka.STATE", {"producer": MagicMock()})
    eventbridge_patch = patch(
        "src.handlers.handler_health.writer_eventbridge.STATE",
        {"client": MagicMock(), "event_bus_arn": "arn"},
    )
    postgres_patch = patch("src.handlers.handler_health.writer_postgres.POSTGRES", postgres_settings)
    health = HandlerHealth()

    with kafka_patch, eventbridge_patch, postgres_patch:
        result = health.get_health()

    assert result["statusCode"] == 503
    payload = json.loads(result["body"])
    assert payload["failures"]["postgres"] == "host not configured"


## Uptime calculation
def test_get_health_uptime_is_positive():
    """A healthy response carries uptime_seconds as a non-negative integer."""
    kafka_state = {"producer": MagicMock()}
    eventbridge_state = {"client": MagicMock(), "event_bus_arn": "arn"}
    postgres_settings = {
        "database": "db",
        "host": "localhost",
        "user": "user",
        "password": "pass",
        "port": "5432",
    }
    health = HandlerHealth()

    kafka_patch = patch("src.handlers.handler_health.writer_kafka.STATE", kafka_state)
    eventbridge_patch = patch("src.handlers.handler_health.writer_eventbridge.STATE", eventbridge_state)
    postgres_patch = patch("src.handlers.handler_health.writer_postgres.POSTGRES", postgres_settings)
    with kafka_patch, eventbridge_patch, postgres_patch:
        result = health.get_health()

    payload = json.loads(result["body"])
    assert "uptime_seconds" in payload
    uptime = payload["uptime_seconds"]
    assert isinstance(uptime, int)
    assert uptime >= 0


## Integration test with event_gate_module
def test_health_endpoint_integration(event_gate_module, make_event):
    """Send a /health event through lambda_handler and expect the healthy response."""
    result = event_gate_module.lambda_handler(make_event("/health"))

    # Expected to be 200 because conftest mocks the writers as initialized
    assert result["statusCode"] == 200
    payload = json.loads(result["body"])
    assert payload["status"] == "ok"
    assert "uptime_seconds" in payload
Loading