Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/event_gate_lambda.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@
from src.handlers.handler_health import HandlerHealth
from src.utils.constants import SSL_CA_BUNDLE_KEY
from src.utils.utils import build_error_response
from src.writers import writer_eventbridge, writer_kafka, writer_postgres
from src.writers.writer_eventbridge import WriterEventBridge
from src.writers.writer_kafka import WriterKafka
from src.writers.writer_postgres import WriterPostgres
from src.utils.conf_path import CONF_DIR, INVALID_CONF_ENV


Expand Down Expand Up @@ -79,15 +81,17 @@
handler_token = HandlerToken(config).load_public_keys()

# Initialize EventGate writers
writer_eventbridge.init(logger, config)
writer_kafka.init(logger, config)
writer_postgres.init(logger)
writers = {
"kafka": WriterKafka(config),
"eventbridge": WriterEventBridge(config),
"postgres": WriterPostgres(config),
}
Comment on lines +84 to +88
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "writer_postgres.py" -o -name "event_gate_lambda.py" | head -20

Repository: AbsaOSS/EventGate

Length of output: 120


🏁 Script executed:

# Check WriterPostgres implementation
if [ -f "src/writer_postgres.py" ]; then
  echo "=== writer_postgres.py ===" 
  wc -l src/writer_postgres.py
  head -70 src/writer_postgres.py
elif [ -f "src/writers/writer_postgres.py" ]; then
  echo "=== src/writers/writer_postgres.py ===" 
  wc -l src/writers/writer_postgres.py
  head -70 src/writers/writer_postgres.py
fi

Repository: AbsaOSS/EventGate

Length of output: 2382


🏁 Script executed:

# Check event_gate_lambda.py
if [ -f "src/event_gate_lambda.py" ]; then
  echo "=== event_gate_lambda.py ===" 
  sed -n '80,95p' src/event_gate_lambda.py
fi

Repository: AbsaOSS/EventGate

Length of output: 593


Verify constructor behavior aligns with issue #94 requirements.

The writers registry structure is well-designed, but WriterPostgres.__init__ (lines 50-61 in src/writers/writer_postgres.py) violates issue #94's requirement to "avoid any initialization logic in constructors that can throw exceptions."

When POSTGRES_SECRET_NAME and POSTGRES_SECRET_REGION environment variables are set, the constructor immediately calls aws_secrets.get_secret_value(), which can raise BotoCoreError, ClientError, or KeyError. Since this instantiation occurs during Lambda handler initialization, it will cause the entire handler to fail if Secrets Manager is unavailable, resulting in cold-start failures.

Defer secrets loading to lazy initialization in write() or check_health() to ensure the handler initializes successfully regardless of Secrets Manager availability, fully complying with the stated design goal.

🤖 Prompt for AI Agents
In `@src/event_gate_lambda.py` around lines 84 - 88, WriterPostgres.__init__
performs eager secret fetching via aws_secrets.get_secret_value when
POSTGRES_SECRET_NAME/POSTGRES_SECRET_REGION are set, which can raise exceptions
during Lambda cold-start; refactor WriterPostgres so its constructor only stores
config and env values, and move any call to aws_secrets.get_secret_value (and
parsing of returned secret) into a lazy initializer invoked from
WriterPostgres.write() and WriterPostgres.check_health(), ensuring those methods
guard against repeated initialization and surface errors there instead of in
__init__.


# Initialize topic handler and load topic schemas
handler_topic = HandlerTopic(CONF_DIR, ACCESS, handler_token).load_topic_schemas()
handler_topic = HandlerTopic(CONF_DIR, ACCESS, handler_token, writers).load_topic_schemas()

# Initialize health handler
handler_health = HandlerHealth()
handler_health = HandlerHealth(writers)


def get_api() -> Dict[str, Any]:
Expand Down
31 changes: 7 additions & 24 deletions src/handlers/handler_health.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from datetime import datetime, timezone
from typing import Dict, Any

from src.writers import writer_eventbridge, writer_kafka, writer_postgres
from src.writers.writer import Writer

logger = logging.getLogger(__name__)
log_level = os.environ.get("LOG_LEVEL", "INFO")
Expand All @@ -35,8 +35,9 @@ class HandlerHealth:
HandlerHealth manages service health checks and dependency status monitoring.
"""

def __init__(self):
def __init__(self, writers: Dict[str, Writer]):
self.start_time: datetime = datetime.now(timezone.utc)
self.writers = writers

def get_health(self) -> Dict[str, Any]:
"""
Expand All @@ -51,28 +52,10 @@ def get_health(self) -> Dict[str, Any]:

failures: Dict[str, str] = {}

# Check Kafka writer
if writer_kafka.STATE.get("producer") is None:
failures["kafka"] = "producer not initialized"

# Check EventBridge writer
eventbus_arn = writer_eventbridge.STATE.get("event_bus_arn")
eventbridge_client = writer_eventbridge.STATE.get("client")
if eventbus_arn:
if eventbridge_client is None:
failures["eventbridge"] = "client not initialized"

# Check PostgreSQL writer
postgres_config = writer_postgres.POSTGRES
if postgres_config.get("database"):
if not postgres_config.get("host"):
failures["postgres"] = "host not configured"
elif not postgres_config.get("user"):
failures["postgres"] = "user not configured"
elif not postgres_config.get("password"):
failures["postgres"] = "password not configured"
elif not postgres_config.get("port"):
failures["postgres"] = "port not configured"
for name, writer in self.writers.items():
healthy, msg = writer.check_health()
if not healthy:
failures[name] = msg

uptime_seconds = int((datetime.now(timezone.utc) - self.start_time).total_seconds())

Expand Down
25 changes: 13 additions & 12 deletions src/handlers/handler_topic.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

from src.handlers.handler_token import HandlerToken
from src.utils.utils import build_error_response
from src.writers import writer_eventbridge, writer_kafka, writer_postgres
from src.writers.writer import Writer

logger = logging.getLogger(__name__)
log_level = os.environ.get("LOG_LEVEL", "INFO")
Expand All @@ -40,10 +40,17 @@ class HandlerTopic:
HandlerTopic manages topic schemas, access control, and message posting.
"""

def __init__(self, conf_dir: str, access_config: Dict[str, list[str]], handler_token: HandlerToken):
def __init__(
self,
conf_dir: str,
access_config: Dict[str, list[str]],
handler_token: HandlerToken,
writers: Dict[str, Writer],
):
self.conf_dir = conf_dir
self.access_config = access_config
self.handler_token = handler_token
self.writers = writers
self.topics: Dict[str, Dict[str, Any]] = {}

def load_topic_schemas(self) -> "HandlerTopic":
Expand Down Expand Up @@ -128,17 +135,11 @@ def post_topic_message(self, topic_name: str, topic_message: Dict[str, Any], tok
except ValidationError as exc:
return build_error_response(400, "validation", exc.message)

kafka_ok, kafka_err = writer_kafka.write(topic_name, topic_message)
eventbridge_ok, eventbridge_err = writer_eventbridge.write(topic_name, topic_message)
postgres_ok, postgres_err = writer_postgres.write(topic_name, topic_message)

errors = []
if not kafka_ok:
errors.append({"type": "kafka", "message": kafka_err})
if not eventbridge_ok:
errors.append({"type": "eventbridge", "message": eventbridge_err})
if not postgres_ok:
errors.append({"type": "postgres", "message": postgres_err})
for writer_name, writer in self.writers.items():
ok, err = writer.write(topic_name, topic_message)
if not ok:
errors.append({"type": writer_name, "message": err})

if errors:
return {
Expand Down
59 changes: 59 additions & 0 deletions src/writers/writer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
#
# Copyright 2026 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
This module provides abstract base class for all EventGate writers.
"""

from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple


class Writer(ABC):
"""
Abstract base class for EventGate writers.
All writers inherit from this class and implement the write() method. Writers use lazy initialization.
"""

def __init__(self, config: Dict[str, Any]) -> None:
self.config = config

@abstractmethod
def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Publish a message to the target system.
Args:
topic_name: Target writer topic (destination) name.
message: JSON-serializable payload to publish.
Returns:
Tuple of (success: bool, error_message: Optional[str]).
- (True, None) on success or when writer is disabled/skipped.
- (False, "error description") on failure.
"""

@abstractmethod
def check_health(self) -> Tuple[bool, str]:
"""
Check writer health and connectivity.
Returns:
Tuple of (is_healthy: bool, message: str).
- (True, "ok") - configured and working.
- (True, "not configured") - not configured, skipped.
- (False, "error message") - configured but failing.
"""
151 changes: 87 additions & 64 deletions src/writers/writer_eventbridge.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,86 +14,109 @@
# limitations under the License.
#

"""EventBridge writer module.

"""
EventBridge writer module.
Provides initialization and write functionality for publishing events to AWS EventBridge.
"""

import json
import logging
import os
from typing import Any, Dict, Optional, Tuple, List

import boto3
from botocore.exceptions import BotoCoreError, ClientError

from src.utils.trace_logging import log_payload_at_trace
from src.writers.writer import Writer

STATE: Dict[str, Any] = {"logger": logging.getLogger(__name__), "event_bus_arn": "", "client": None}

logger = logging.getLogger(__name__)
log_level = os.environ.get("LOG_LEVEL", "INFO")
logger.setLevel(log_level)

def init(logger: logging.Logger, config: Dict[str, Any]) -> None:
"""Initialize the EventBridge writer.

Args:
logger: Shared application logger.
config: Configuration dictionary (expects optional 'event_bus_arn').
class WriterEventBridge(Writer):
"""
STATE["logger"] = logger
STATE["client"] = boto3.client("events")
STATE["event_bus_arn"] = config.get("event_bus_arn", "")
STATE["logger"].debug("Initialized EVENTBRIDGE writer")


def _format_failed_entries(entries: List[Dict[str, Any]]) -> str:
failed = [e for e in entries if "ErrorCode" in e or "ErrorMessage" in e]
# Keep message concise but informative
return json.dumps(failed) if failed else "[]"


def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""Publish a message to EventBridge.

Args:
topic_name: Source topic name used as event Source.
message: JSON-serializable payload.
Returns:
Tuple of success flag and optional error message.
EventBridge writer for publishing events to AWS EventBridge.
The boto3 EventBridge client is created on the first write() call.
"""
logger = STATE["logger"]
event_bus_arn = STATE["event_bus_arn"]
client = STATE["client"]

if not event_bus_arn:
logger.debug("No EventBus Arn - skipping")
return True, None
if client is None: # defensive
logger.debug("EventBridge client not initialized - skipping")
def __init__(self, config: Dict[str, Any]) -> None:
super().__init__(config)
self._client: Optional["boto3.client"] = None
self._entries: List[Dict[str, Any]] = []
self.event_bus_arn: str = config.get("event_bus_arn", "")
logger.debug("Initialized EventBridge writer")

def _format_failed_entries(self) -> str:
"""
Format failed EventBridge entries for error message.

Returns:
JSON string of failed entries.
"""
failed = [e for e in self._entries if "ErrorCode" in e or "ErrorMessage" in e]
return json.dumps(failed) if failed else "[]"

def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Publish a message to EventBridge.

Args:
topic_name: Target EventBridge writer topic (destination) name.
message: JSON-serializable payload to publish.
Returns:
Tuple of (success: bool, error_message: Optional[str]).
"""
if not self.event_bus_arn:
logger.debug("No EventBus Arn - skipping EventBridge writer")
return True, None

if self._client is None:
self._client = boto3.client("events")
logger.debug("EventBridge client initialized")

log_payload_at_trace(logger, "EventBridge", topic_name, message)

try:
logger.debug("Sending to EventBridge %s", topic_name)
response = self._client.put_events(
Entries=[
{
"Source": topic_name,
"DetailType": "JSON",
"Detail": json.dumps(message),
"EventBusName": self.event_bus_arn,
}
]
)
failed_count = response.get("FailedEntryCount", 0)
if failed_count > 0:
self._entries = response.get("Entries", [])
failed_repr = self._format_failed_entries()
msg = f"{failed_count} EventBridge entries failed: {failed_repr}"
logger.error(msg)
return False, msg
except (BotoCoreError, ClientError) as err: # explicit AWS client-related errors
logger.exception("EventBridge put_events call failed")
return False, str(err)

return True, None

log_payload_at_trace(logger, "EventBridge", topic_name, message)

try:
logger.debug("Sending to eventBridge %s", topic_name)
response = client.put_events(
Entries=[
{
"Source": topic_name,
"DetailType": "JSON",
"Detail": json.dumps(message),
"EventBusName": event_bus_arn,
}
]
)
failed_count = response.get("FailedEntryCount", 0)
if failed_count > 0:
entries = response.get("Entries", [])
failed_repr = _format_failed_entries(entries)
msg = f"{failed_count} EventBridge entries failed: {failed_repr}"
logger.error(msg)
return False, msg
except (BotoCoreError, ClientError) as err: # explicit AWS client-related errors
logger.exception("EventBridge put_events call failed")
return False, str(err)

# Let any unexpected exception propagate for upstream handler (avoids broad except BLE001 / TRY400)
return True, None
def check_health(self) -> Tuple[bool, str]:
"""
Check EventBridge writer health.

Returns:
Tuple of (is_healthy: bool, message: str).
"""
if not self.event_bus_arn:
return True, "not configured"

try:
if self._client is None:
self._client = boto3.client("events")
logger.debug("EventBridge client initialized during health check")
return True, "ok"
except (BotoCoreError, ClientError) as err:
return False, str(err)
Loading