Skip to content

Conversation

@tmikula-dev
Copy link
Collaborator

@tmikula-dev tmikula-dev commented Jan 14, 2026

Overview

This pull request refactors the EventGate writer architecture to use an abstract base class and individual writer classes for Kafka, EventBridge, and PostgreSQL. It replaces global state and initialization functions with lazy-initialized, dependency-injected writer objects, improving modularity, testability, and health check logic. This PR should be purely refactoring, no new enhancement should be implemented.

Release Notes

  • OOP writer classes with abstract base Writer class

Related

Closes #94

Summary by CodeRabbit

Release Notes

  • Refactor
    • Improved internal architecture of writer components (Kafka, EventBridge, PostgreSQL) to enhance code maintainability and system reliability. All existing functionality remains unchanged.

✏️ Tip: You can customize this high-level summary in your review settings.

@tmikula-dev tmikula-dev self-assigned this Jan 14, 2026
@tmikula-dev tmikula-dev added the refactoring Improving code quality, paying off tech debt, aligning APIs label Jan 14, 2026
@coderabbitai
Copy link

coderabbitai bot commented Jan 14, 2026

Walkthrough

This refactoring converts writer modules from procedural code with module-level state to a class-based OOP architecture. An abstract Writer base class provides a consistent interface (write, check_health). Handlers now receive a writers registry via dependency injection instead of importing and initializing writers directly.

Changes

Cohort / File(s) Summary
Abstract Writer Base Class
src/writers/writer.py
New abstract base class defining Writer interface with __init__(config), write(topic_name, message), and check_health() methods; enforces consistent contract across all writer implementations.
Writer Class Implementations
src/writers/writer_kafka.py, src/writers/writer_eventbridge.py, src/writers/writer_postgres.py
Converted from procedural modules with global STATE to class-based writers inheriting from Writer; introduced lazy initialization via constructors, instance-level state management (_producer, _client, _db_config), and per-instance write/check_health implementations; replaced module-level init functions with parameterized constructors.
Handler Dependency Injection
src/handlers/handler_topic.py, src/handlers/handler_health.py
Updated constructors to accept writers: Dict[str, Writer] registry parameter; replaced individual writer imports and module-level calls with loops over writer instances; unified error aggregation across writers.
Entry Point Writer Registry
src/event_gate_lambda.py
Instantiates writer classes with configuration, builds writers registry dictionary, and injects it into HandlerTopic and HandlerHealth constructors.
Handler Tests
tests/handlers/test_handler_health.py, tests/handlers/test_handler_topic.py
Refactored to instantiate handlers with mock writers dictionary; replaced module-state patching with per-instance writer mock setup; tests now exercise injected writer behavior.
Writer Tests
tests/writers/test_writer_eventbridge.py, tests/writers/test_writer_kafka.py, tests/writers/test_writer_postgres.py
Refactored to directly instantiate writer classes and inject state via constructor/attributes; added comprehensive health-check test scenarios (configured, not-configured, error states); dropped global STATE manipulation.
Trace Logging Tests
tests/utils/test_trace_logging.py
Updated to instantiate writer objects directly and configure via instance attributes; replaced module-level state aliases with explicit writer instances; aligned with new class-based writer approach.

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly related PRs

Suggested labels

enhancement

Suggested reviewers

  • petr-pokorny-absa
  • oto-macenauer-absa
  • ABMC831

Poem

🐰 Classes rise where functions dwelled,
Writers now in registry held,
No more state in module's fold—
Each instance, independent and bold!
OOP patterns shine so bright,
Refactoring done just right! ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 29.89% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'OOP for project writers' is concise and directly describes the main refactoring work—transitioning the writer modules to object-oriented design with classes.
Linked Issues check ✅ Passed The PR fulfills all coding requirements from issue #94: implements Python classes for writers, creates a parent Writer class with write and check_health methods, avoids exception-throwing logic in constructors, and simplifies HandlerTopic write logic via dependency injection.
Out of Scope Changes check ✅ Passed All changes are within scope—the refactoring replaces module-level state with class-based writers, updates handlers to use dependency injection, and adjusts tests to match the new architecture. No new features or unrelated changes are introduced.
Description check ✅ Passed The PR description includes all required sections (Overview, Release Notes, Related) and clearly explains the refactoring scope, objectives, and linked issue.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Comment on lines +89 to +138
operation,
"location",
"format",
format_options,
additional_info
)
VALUES
(
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s
)""",
)
VALUES
(
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s
)""",
(
message["event_id"],
job.get("country", ""),
job["catalog_id"],
job["status"],
job["timestamp_start"],
job["timestamp_end"],
job.get("message"),
(json.dumps(job.get("additional_info")) if "additional_info" in job else None),
message["tenant_id"],
message["source_app"],
message["source_app_version"],
message["environment"],
message["timestamp_event"],
message.get("country", ""),
message["catalog_id"],
message["operation"],
message.get("location"),
message["format"],
(json.dumps(message.get("format_options")) if "format_options" in message else None),
(json.dumps(message.get("additional_info")) if "additional_info" in message else None),
),
)


def postgres_test_write(cursor, table: str, message: Dict[str, Any]) -> None:
"""Insert a test topic row.
Args:
cursor: Database cursor.
table: Target table name.
message: Event payload.
"""
logger.debug("Sending to Postgres - %s", table)
cursor.execute(
f"""
INSERT INTO {table}
(
event_id,
tenant_id,
source_app,
environment,
timestamp_event,
additional_info
def _postgres_run_write(self, cursor: Any, table_runs: str, table_jobs: str, message: Dict[str, Any]) -> None:
"""
Insert a run event row plus related job rows.
Args:
cursor: Database cursor.
table_runs: Runs table name.
table_jobs: Jobs table name.
message: Event payload (includes jobs array).
"""
logger.debug("Sending to Postgres - %s and %s", table_runs, table_jobs)

Check failure

Code scanning / AquaSec

sqlalchemy safe query execution High

Merging user inputs directly into SQL queries without precautions can lead to SQL Injection vulnerabilities. It is highly recomended to use prepared statements to ensure data is handled securely, with SQLAlchemy's TextualSQL being an ideal tool for this. It supports named parameters for safer query construction. For more complex queries, SQLAlchemy's SQL Expression Language or Schema Definition Language are recommended. Opting for SQLAlchemy ORM is often the safest and most efficient route for database operations.
Comment on lines +151 to +185
)
VALUES
(
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s
)""",
(
message["event_id"],
message["job_ref"],
message["tenant_id"],
message["source_app"],
message["source_app_version"],
message["environment"],
message["timestamp_start"],
message["timestamp_end"],
),
)
VALUES
(
%s,
%s,
%s,
%s,
%s,
%s
)""",
(
message["event_id"],
message["tenant_id"],
message["source_app"],
message["environment"],
message["timestamp"],
(json.dumps(message.get("additional_info")) if "additional_info" in message else None),
),
)


def write(topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""Dispatch insertion for a topic into the correct Postgres table(s).
Skips if Postgres not configured or psycopg2 unavailable. Returns success flag and optional error.
Args:
topic_name: Incoming topic identifier.
message: Event payload.
"""
try:
if not POSTGRES.get("database"):
logger.debug("No Postgres - skipping")
return True, None
if psycopg2 is None: # type: ignore
logger.debug("psycopg2 not available - skipping actual Postgres write")
return True, None

log_payload_at_trace(logger, "Postgres", topic_name, message)

with psycopg2.connect( # type: ignore[attr-defined]
database=POSTGRES["database"],
host=POSTGRES["host"],
user=POSTGRES["user"],
password=POSTGRES["password"],
port=POSTGRES["port"],
) as connection: # type: ignore[call-arg]
with connection.cursor() as cursor: # type: ignore
if topic_name == "public.cps.za.dlchange":
postgres_edla_write(cursor, "public_cps_za_dlchange", message)
elif topic_name == "public.cps.za.runs":
postgres_run_write(cursor, "public_cps_za_runs", "public_cps_za_runs_jobs", message)
elif topic_name == "public.cps.za.test":
postgres_test_write(cursor, "public_cps_za_test", message)
else:
msg = f"unknown topic for postgres {topic_name}"
logger.error(msg)
return False, msg

connection.commit() # type: ignore
except (RuntimeError, PsycopgError) as e: # narrowed exception set
err_msg = f"The Postgres writer with failed unknown error: {str(e)}"
logger.exception(err_msg)
return False, err_msg
for job in message["jobs"]:
cursor.execute(
f"""
INSERT INTO {table_jobs}
(
event_id,
country,
catalog_id,
status,
timestamp_start,
timestamp_end,

Check failure

Code scanning / AquaSec

sqlalchemy safe query execution High

Merging user inputs directly into SQL queries without precautions can lead to SQL Injection vulnerabilities. It is highly recomended to use prepared statements to ensure data is handled securely, with SQLAlchemy's TextualSQL being an ideal tool for this. It supports named parameters for safer query construction. For more complex queries, SQLAlchemy's SQL Expression Language or Schema Definition Language are recommended. Opting for SQLAlchemy ORM is often the safest and most efficient route for database operations.
Comment on lines +188 to +222
)
VALUES
(
%s,
%s,
%s,
%s,
%s,
%s,
%s,
%s
)""",
(
message["event_id"],
job.get("country", ""),
job["catalog_id"],
job["status"],
job["timestamp_start"],
job["timestamp_end"],
job.get("message"),
(json.dumps(job.get("additional_info")) if "additional_info" in job else None),
),
)

def _postgres_test_write(self, cursor: Any, table: str, message: Dict[str, Any]) -> None:
"""
Insert a test topic row.
Args:
cursor: Database cursor.
table: Target table name.
message: Event payload.
"""
logger.debug("Sending to Postgres - %s", table)
cursor.execute(

Check failure

Code scanning / AquaSec

sqlalchemy safe query execution High

Merging user inputs directly into SQL queries without precautions can lead to SQL Injection vulnerabilities. It is highly recomended to use prepared statements to ensure data is handled securely, with SQLAlchemy's TextualSQL being an ideal tool for this. It supports named parameters for safer query construction. For more complex queries, SQLAlchemy's SQL Expression Language or Schema Definition Language are recommended. Opting for SQLAlchemy ORM is often the safest and most efficient route for database operations.
Comment on lines +234 to +262
(
%s,
%s,
%s,
%s,
%s,
%s
)""",
(
message["event_id"],
message["tenant_id"],
message["source_app"],
message["environment"],
message["timestamp"],
(json.dumps(message.get("additional_info")) if "additional_info" in message else None),
),
)

return True, None
def write(self, topic_name: str, message: Dict[str, Any]) -> Tuple[bool, Optional[str]]:
"""
Dispatch insertion for a topic into the correct Postgres table(s).
Args:
topic_name: Incoming topic identifier.
message: JSON-serializable payload.
Returns:
Tuple of (success: bool, error_message: Optional[str]).
"""
try:

Check failure

Code scanning / AquaSec

sqlalchemy safe query execution High

Merging user inputs directly into SQL queries without precautions can lead to SQL Injection vulnerabilities. It is highly recomended to use prepared statements to ensure data is handled securely, with SQLAlchemy's TextualSQL being an ideal tool for this. It supports named parameters for safer query construction. For more complex queries, SQLAlchemy's SQL Expression Language or Schema Definition Language are recommended. Opting for SQLAlchemy ORM is often the safest and most efficient route for database operations.
@github-actions
Copy link

AquaSec has completed a full security repository scan ✅ You can find the analysis results for this PR branch on this overview.
Below is the summary of the findings:

AQUASEC CRITICAL HIGH MEDIUM LOW TOTAL
sast 0 4 2 0 6
vulnerabilities 0 0 0 0 0
iacMisconfigurations 1 2 2 10 15
secrets 0 0 0 0 0
pipelineMisconfigurations 0 0 0 16 16
license 0 0 0 0 0
➡️ Total 1 6 4 26 37

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@src/event_gate_lambda.py`:
- Around line 84-88: WriterPostgres.__init__ performs eager secret fetching via
aws_secrets.get_secret_value when POSTGRES_SECRET_NAME/POSTGRES_SECRET_REGION
are set, which can raise exceptions during Lambda cold-start; refactor
WriterPostgres so its constructor only stores config and env values, and move
any call to aws_secrets.get_secret_value (and parsing of returned secret) into a
lazy initializer invoked from WriterPostgres.write() and
WriterPostgres.check_health(), ensuring those methods guard against repeated
initialization and surface errors there instead of in __init__.

In `@src/writers/writer_postgres.py`:
- Around line 292-295: The error message inside the except block catching
(RuntimeError, PsycopgError) as e is grammatically incorrect; update the err_msg
construction so it reads "The Postgres writer failed with unknown error" (keep
including the exception string via str(e)), then call logger.exception with that
corrected err_msg and return the same False, err_msg tuple; locate the except
block that defines err_msg and uses logger.exception to make this change.
- Around line 53-64: The constructor (__init__) is performing AWS Secrets
Manager calls (boto3.Session().client and get_secret_value) which may raise
exceptions; change it to only read and store the environment variables (e.g.,
secret_name, secret_region) and initialize _db_config to a safe default (e.g.,
{"database": ""}) without making network calls. Add a lazy-initialization step
in write() (or a helper like _load_db_config_if_needed / _ensure_db_config)
that, on first write, uses boto3 to fetch and json.loads the secret into
self._db_config, catching and surfacing BotoCoreError/ClientError appropriately
(and caching the loaded config so subsequent writes don't re-fetch). Ensure
unique symbols referenced: __init__, _db_config, write (or
_load_db_config_if_needed).

In `@src/writers/writer.py`:
- Around line 1-16: The file header in src/writers/writer.py uses "Copyright
2026" which is inconsistent with other files using "Copyright 2025"; update the
copyright year in the header comment from 2026 to 2025 to match the repository
convention (no code changes required).
🧹 Nitpick comments (6)
tests/writers/test_writer_eventbridge.py (1)

57-68: Remove redundant import inside test function.

BotoCoreError is already imported at line 18. The local import at line 58 is unnecessary.

Suggested fix
 def test_write_client_error():
-    from botocore.exceptions import BotoCoreError
-
     class DummyError(BotoCoreError):
         pass
tests/handlers/test_handler_health.py (1)

69-86: Test name doesn't match test behavior.

The function test_get_health_kafka_not_initialized actually tests when all writers are failing, not just Kafka. Consider renaming for clarity.

Suggested rename
-def test_get_health_kafka_not_initialized():
-    """Health check returns 503 when Kafka writer is not initialized."""
+def test_get_health_all_writers_failing():
+    """Health check returns 503 when all writers report failures."""
src/writers/writer_kafka.py (2)

53-82: Minor: Simplify redundant condition.

Line 60 has a redundant check - "kafka_bootstrap_server" not in self.config and not self.config.get("kafka_bootstrap_server") overlap. The .get() check alone is sufficient since it handles both missing key and falsy value.

♻️ Suggested simplification
-        if "kafka_bootstrap_server" not in self.config or not self.config.get("kafka_bootstrap_server"):
+        if not self.config.get("kafka_bootstrap_server"):
             return None

101-167: LGTM - well-structured write logic with retry handling.

The write() method correctly implements:

  • Lazy producer initialization
  • Graceful skip when not configured
  • Error aggregation from produce callback and flush exceptions
  • Retry loop with backoff for transient flush failures
  • Appropriate logging levels (warning vs error vs exception)

Regarding the static analysis hint on Line 132 (msg unused in lambda): this is intentional since the Kafka callback signature requires both err and msg parameters. Consider using _msg to signal intent:

♻️ Optional: Use underscore prefix to indicate unused parameter
-                callback=lambda err, msg: (errors.append(str(err)) if err is not None else None),
+                callback=lambda err, _msg: (errors.append(str(err)) if err is not None else None),
src/writers/writer_eventbridge.py (1)

44-49: Constructor correctly avoids throwing operations - aligns with PR objectives.

The constructor only stores configuration and initializes internal state. The boto3 client creation is deferred to the first write() call (lazy initialization), which adheres to the requirement from Issue #94 to "avoid any initialization logic in constructors that can throw exceptions."

Minor type hint issue: Optional["boto3.client"] is technically incorrect since boto3.client is a factory function, not a type.

💡 Optional: Fix the type hint
-        self._client: Optional["boto3.client"] = None
+        self._client: Optional[Any] = None  # boto3 EventBridge client

Or for stricter typing, use Optional[botocore.client.BaseClient] with an appropriate import.

src/writers/writer_postgres.py (1)

78-110: SQL injection risk is mitigated but the pattern is fragile.

The static analysis correctly flags the f-string table name interpolation. While the current implementation is safe because table names are hardcoded internally (lines 280-285), this pattern is risky for future maintenance:

  1. Future developers may not realize the security implications
  2. The pattern could be copied elsewhere with user-controlled input

The data values are correctly parameterized with %s placeholders, which is good.

💡 Consider using psycopg2's identifier quoting
from psycopg2 import sql

# Instead of:
cursor.execute(f"INSERT INTO {table} ...")

# Use:
cursor.execute(
    sql.SQL("INSERT INTO {} ...").format(sql.Identifier(table)),
    (...)
)

Alternatively, define table names as module-level constants with explicit validation.

Also applies to: 140-162, 177-199, 223-241

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between efc9000 and 4afdeb9.

📒 Files selected for processing (13)
  • src/event_gate_lambda.py
  • src/handlers/handler_health.py
  • src/handlers/handler_topic.py
  • src/writers/writer.py
  • src/writers/writer_eventbridge.py
  • src/writers/writer_kafka.py
  • src/writers/writer_postgres.py
  • tests/handlers/test_handler_health.py
  • tests/handlers/test_handler_topic.py
  • tests/utils/test_trace_logging.py
  • tests/writers/test_writer_eventbridge.py
  • tests/writers/test_writer_kafka.py
  • tests/writers/test_writer_postgres.py
🧰 Additional context used
🧬 Code graph analysis (11)
tests/utils/test_trace_logging.py (3)
src/writers/writer_eventbridge.py (2)
  • WriterEventBridge (38-122)
  • write (61-104)
src/writers/writer_kafka.py (2)
  • write (101-167)
  • WriterKafka (42-187)
src/writers/writer_postgres.py (2)
  • write (252-297)
  • WriterPostgres (47-318)
src/writers/writer_eventbridge.py (2)
src/utils/trace_logging.py (1)
  • log_payload_at_trace (29-48)
src/writers/writer.py (3)
  • Writer (25-59)
  • write (35-47)
  • check_health (50-59)
tests/writers/test_writer_eventbridge.py (1)
src/writers/writer_eventbridge.py (3)
  • WriterEventBridge (38-122)
  • write (61-104)
  • check_health (106-122)
src/writers/writer.py (3)
src/writers/writer_kafka.py (2)
  • write (101-167)
  • check_health (169-187)
src/writers/writer_eventbridge.py (2)
  • write (61-104)
  • check_health (106-122)
src/writers/writer_postgres.py (2)
  • write (252-297)
  • check_health (299-318)
src/handlers/handler_health.py (4)
src/writers/writer.py (2)
  • Writer (25-59)
  • check_health (50-59)
src/writers/writer_kafka.py (1)
  • check_health (169-187)
src/writers/writer_eventbridge.py (1)
  • check_health (106-122)
src/writers/writer_postgres.py (1)
  • check_health (299-318)
tests/writers/test_writer_postgres.py (1)
src/writers/writer_postgres.py (6)
  • WriterPostgres (47-318)
  • _postgres_edla_write (67-126)
  • _postgres_run_write (128-210)
  • _postgres_test_write (212-250)
  • write (252-297)
  • check_health (299-318)
src/writers/writer_kafka.py (3)
src/utils/trace_logging.py (1)
  • log_payload_at_trace (29-48)
src/writers/writer.py (3)
  • Writer (25-59)
  • write (35-47)
  • check_health (50-59)
tests/writers/test_writer_kafka.py (7)
  • flush (33-34)
  • flush (49-56)
  • flush (65-67)
  • flush (76-78)
  • produce (28-31)
  • produce (38-40)
  • produce (110-111)
src/event_gate_lambda.py (5)
src/writers/writer_eventbridge.py (1)
  • WriterEventBridge (38-122)
src/writers/writer_kafka.py (1)
  • WriterKafka (42-187)
src/writers/writer_postgres.py (1)
  • WriterPostgres (47-318)
src/handlers/handler_topic.py (2)
  • HandlerTopic (38-155)
  • load_topic_schemas (56-72)
src/handlers/handler_health.py (1)
  • HandlerHealth (33-75)
src/writers/writer_postgres.py (2)
src/utils/trace_logging.py (1)
  • log_payload_at_trace (29-48)
src/writers/writer.py (3)
  • Writer (25-59)
  • write (35-47)
  • check_health (50-59)
tests/writers/test_writer_kafka.py (1)
src/writers/writer_kafka.py (3)
  • WriterKafka (42-187)
  • write (101-167)
  • check_health (169-187)
tests/handlers/test_handler_health.py (1)
src/handlers/handler_health.py (2)
  • HandlerHealth (33-75)
  • get_health (42-75)
🪛 GitHub Check: AquaSec
src/writers/writer_postgres.py

[failure] 89-138: sqlalchemy safe query execution
Merging user inputs directly into SQL queries without precautions can lead to SQL Injection vulnerabilities. It is highly recomended to use prepared statements to ensure data is handled securely, with SQLAlchemy's TextualSQL being an ideal tool for this. It supports named parameters for safer query construction. For more complex queries, SQLAlchemy's SQL Expression Language or Schema Definition Language are recommended. Opting for SQLAlchemy ORM is often the safest and most efficient route for database operations.


[failure] 151-185: sqlalchemy safe query execution
Merging user inputs directly into SQL queries without precautions can lead to SQL Injection vulnerabilities. It is highly recomended to use prepared statements to ensure data is handled securely, with SQLAlchemy's TextualSQL being an ideal tool for this. It supports named parameters for safer query construction. For more complex queries, SQLAlchemy's SQL Expression Language or Schema Definition Language are recommended. Opting for SQLAlchemy ORM is often the safest and most efficient route for database operations.


[failure] 188-222: sqlalchemy safe query execution
Merging user inputs directly into SQL queries without precautions can lead to SQL Injection vulnerabilities. It is highly recomended to use prepared statements to ensure data is handled securely, with SQLAlchemy's TextualSQL being an ideal tool for this. It supports named parameters for safer query construction. For more complex queries, SQLAlchemy's SQL Expression Language or Schema Definition Language are recommended. Opting for SQLAlchemy ORM is often the safest and most efficient route for database operations.


[failure] 234-262: sqlalchemy safe query execution
Merging user inputs directly into SQL queries without precautions can lead to SQL Injection vulnerabilities. It is highly recomended to use prepared statements to ensure data is handled securely, with SQLAlchemy's TextualSQL being an ideal tool for this. It supports named parameters for safer query construction. For more complex queries, SQLAlchemy's SQL Expression Language or Schema Definition Language are recommended. Opting for SQLAlchemy ORM is often the safest and most efficient route for database operations.

🪛 Ruff (0.14.11)
tests/utils/test_trace_logging.py

48-48: Unused method argument: timeout

(ARG002)

src/writers/writer_eventbridge.py

120-120: Consider moving this statement to an else block

(TRY300)

tests/writers/test_writer_postgres.py

209-209: Unused function argument: reset_env

(ARG001)


216-216: Unused function argument: reset_env

(ARG001)


224-224: Unused function argument: reset_env

(ARG001)


233-233: Unused function argument: reset_env

(ARG001)


243-243: Unused function argument: reset_env

(ARG001)


245-245: Unused method argument: kwargs

(ARG002)


255-255: Unused function argument: reset_env

(ARG001)


270-270: Unused function argument: reset_env

(ARG001)


290-290: Unused function argument: reset_env

(ARG001)

src/writers/writer_kafka.py

132-132: Unused lambda argument: msg

(ARG005)


185-185: Consider moving this statement to an else block

(TRY300)

src/writers/writer_postgres.py

78-110: Possible SQL injection vector through string-based query construction

(S608)


140-162: Possible SQL injection vector through string-based query construction

(S608)


177-199: Possible SQL injection vector through string-based query construction

(S608)


223-241: Possible SQL injection vector through string-based query construction

(S608)


293-293: Use explicit conversion flag

Replace with conversion flag

(RUF010)

tests/writers/test_writer_kafka.py

33-33: Unused method argument: timeout

(ARG002)


49-49: Unused method argument: timeout

(ARG002)


65-65: Unused method argument: timeout

(ARG002)

🔇 Additional comments (41)
tests/writers/test_writer_eventbridge.py (3)

1-21: LGTM! Well-structured test file for the refactored EventBridge writer.

The imports and file setup are clean. The tests properly instantiate WriterEventBridge with per-instance configuration, aligning with the new OOP architecture.


26-39: Good test coverage for write() happy paths.

Tests correctly verify:

  • Skipping when event_bus_arn is empty
  • Successful write with mocked client
  • Proper assertion of put_events being called

74-96: Good health check test coverage.

The health check tests properly cover:

  • Not configured state (empty event_bus_arn)
  • Success scenario with mocked boto3.client
  • Client error scenario with proper BotoCoreError subclass

The fmt attribute on line 91 is correctly added to satisfy BotoCoreError's format string requirement.

tests/handlers/test_handler_health.py (4)

23-28: Well-designed helper function for test mocking.

The _create_mock_writer helper provides a clean, reusable way to create mock writers with specific check_health return values, supporting the dependency injection pattern.


33-48: Good test for minimal healthy state.

Test correctly validates that health returns 200 when Kafka is healthy and optional writers are "not configured" (which is a healthy state per the Writer interface contract).


89-116: Good coverage for individual writer disabled states.

Tests properly verify that disabling EventBridge or Postgres (returning (True, "not configured")) still results in a healthy overall status.


137-163: Good uptime and integration tests.

The uptime test validates the response structure, and the integration test ensures the full lambda handler path works with the mocked writers from conftest.

tests/writers/test_writer_postgres.py (6)

38-89: Thorough tests for Postgres insert helper methods.

Tests properly verify:

  • _postgres_edla_write with optional and missing optional fields
  • Correct parameter positioning and JSON serialization

91-154: Good coverage for run and test write helpers.

_postgres_run_write test validates both the run insert and multiple job inserts with correct parameter handling. _postgres_test_write properly tests the test table insertion.


159-164: The reset_env fixture is intentionally used for side-effect cleanup.

The static analysis warning about unused reset_env argument is a false positive. This is a standard pytest pattern where fixtures are included for their setup/teardown effects rather than direct usage. The fixture cleans up environment variables after each test.


206-253: Good write() behavioral path coverage.

Tests properly verify:

  • Skip when no database configured
  • Skip when psycopg2 is missing
  • Error on unknown topic
  • Success on known topic
  • Exception handling returns error

255-307: Good init and topic-specific write tests.

Tests verify:

  • Secret loading from AWS Secrets Manager
  • dlchange topic write success
  • runs topic write success (validates 2 inserts: run + job)

313-331: Good check_health() coverage.

Tests cover:

  • Not configured state (empty database)
  • Success state (all config present)
  • Missing host detection

Consider adding tests for other missing config fields (user, password, port) for completeness, though this is optional since the logic is similar.

src/writers/writer.py (2)

25-48: Well-designed abstract base class.

The Writer ABC provides a clean, minimal interface for all writers with:

  • Configuration storage in __init__
  • write() with clear success/failure return semantics
  • Documented convention for skip scenarios returning (True, None)

This aligns well with the concrete implementations in WriterKafka, WriterEventBridge, and WriterPostgres.


49-59: Good health check contract documentation.

The docstring clearly defines the three possible health states:

  • (True, "ok") - configured and working
  • (True, "not configured") - not configured, skipped
  • (False, "error message") - configured but failing

This convention is consistently implemented across all concrete writers.

tests/utils/test_trace_logging.py (4)

17-23: Clean module imports for testing.

The explicit module imports allow proper access to both the writer classes and their module-level loggers for setting trace level.


25-38: Good EventBridge trace logging test.

Test properly:

  • Sets trace level on the module's logger
  • Creates writer with event_bus_arn
  • Mocks the client
  • Verifies trace payload appears in logs

41-61: FakeProducer timeout parameter is intentionally unused.

The timeout parameter on flush (line 48) matches the real Producer.flush(timeout) signature that WriterKafka._flush_with_timeout calls. The static analysis warning is a false positive - the parameter exists for interface compatibility, not for use in this mock.


64-107: Good Postgres trace logging test.

Test properly:

  • Monkeypatches psycopg2 with dummy implementation
  • Sets trace level on module's logger
  • Creates writer and sets _db_config
  • Verifies trace payload appears in logs
src/handlers/handler_health.py (3)

26-26: LGTM!

Clean import of the Writer abstract base class, aligning with the new OOP architecture.


38-40: LGTM!

The constructor now accepts a writers dictionary via dependency injection, which improves testability and decouples HandlerHealth from specific writer implementations. This is a well-designed change that aligns with the PR objectives.


55-58: LGTM!

The generalized health check loop cleanly iterates over all writers and aggregates failures. This replaces hardcoded per-writer checks with a uniform approach, making it easy to add or remove writers without modifying this code.

tests/handlers/test_handler_topic.py (5)

28-34: LGTM!

Good test setup with mock writers dictionary. This properly aligns with the new constructor signature that accepts a writers parameter via dependency injection.


143-146: LGTM!

Clean approach to mock all writers uniformly for success scenarios. Iterating over writers.values() makes the test resilient to adding new writers.


162-166: LGTM!

Good pattern for testing single writer failure - explicitly setting each writer's mock return value makes the test intent clear and verifiable.


183-187: LGTM!

Proper setup for testing multiple writer failures. The assertion on Line 198 correctly verifies that both failure types are reported.


202-205: LGTM!

Consistent mocking pattern for the lowercase bearer header test case.

src/handlers/handler_topic.py (3)

31-31: LGTM!

Clean import of the Writer abstract base class.


43-54: LGTM!

Well-structured constructor with dependency injection of the writers registry. This design cleanly separates handler logic from writer instantiation, improving testability and aligning with the OOP refactoring goals.


139-142: LGTM!

The unified writer iteration loop is a significant improvement. It:

  • Treats all writers uniformly via the Writer interface
  • Collects errors with descriptive type/message structure
  • Enables easy addition of new writers without code changes here
  • Sets the foundation for future async dispatch (per issue #94)
src/event_gate_lambda.py (2)

32-34: LGTM!

Explicit imports of concrete writer classes. This makes dependencies clear and avoids wildcard imports.


91-94: LGTM!

Clean wiring of the writers registry into both HandlerTopic and HandlerHealth via dependency injection. This enables uniform health checks and message dispatch across all writers.

src/writers/writer_kafka.py (4)

17-20: LGTM!

Clear module docstring describing the Kafka writer's purpose.


42-51: LGTM!

Clean class definition with proper inheritance from Writer. The constructor correctly defers producer creation to lazy initialization, aligning with issue #94's requirement to avoid throwing in constructors.


84-99: LGTM!

Good defensive helper method. The TypeError catch handles stubs or older client versions that don't support the timeout parameter.


169-187: LGTM!

The check_health() method properly implements the Writer contract:

  • Returns (True, "not configured") when bootstrap server is missing
  • Lazily initializes producer during health check if needed
  • Returns (False, "producer initialization failed") if creation fails
  • Catches KafkaException and returns error details

The static analysis hint (TRY300) about moving Line 185 to an else block is a minor style preference and doesn't affect correctness.

tests/writers/test_writer_kafka.py (2)

24-78: LGTM - Fake producer classes correctly implement the interface.

The static analysis warnings about unused timeout parameters (lines 33, 49, 65) are false positives. These fake/mock classes intentionally accept but ignore the timeout parameter to match the real Kafka Producer interface signature. The FakeProducerTypeError class (line 76) intentionally omits the parameter to test the TypeError fallback path.


84-183: Test coverage for the new OOP design looks comprehensive.

The tests properly exercise:

  • Lazy initialization skip when not configured
  • Success and error paths for write operations
  • Kafka exception handling
  • Flush retry logic with various producer behaviors
  • Health check scenarios (not configured, success, initialization failure)

The dependency injection pattern using writer._producer = FakeProducer*() is appropriate for unit testing.

src/writers/writer_eventbridge.py (2)

61-104: LGTM - Write method implementation is clean and follows the lazy initialization pattern.

The method properly:

  • Skips when not configured (returns True, None)
  • Lazily initializes the boto3 client on first use
  • Logs payload at trace level before publishing
  • Handles AWS-specific exceptions with clear error messages

106-122: Health check implementation is correct.

The static analysis hint (TRY300) about moving the return statement to an else block is a minor style suggestion. The current structure is functionally correct and readable.

src/writers/writer_postgres.py (1)

299-318: Health check only validates config presence, not connectivity.

Unlike the Kafka and EventBridge writers which attempt to create clients during check_health(), the Postgres writer only checks if configuration values are present. This means a health check could return (True, "ok") even if the database is unreachable.

This may be intentional to avoid establishing database connections during health checks, but it's inconsistent with the other writers' behavior.

Is this the intended behavior? If actual connectivity testing is desired, consider adding an optional connection test:

# Optional: Test actual connectivity
with psycopg2.connect(...) as conn:
    conn.cursor().execute("SELECT 1")

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines +84 to +88
writers = {
"kafka": WriterKafka(config),
"eventbridge": WriterEventBridge(config),
"postgres": WriterPostgres(config),
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

find . -name "writer_postgres.py" -o -name "event_gate_lambda.py" | head -20

Repository: AbsaOSS/EventGate

Length of output: 120


🏁 Script executed:

# Check WriterPostgres implementation
if [ -f "src/writer_postgres.py" ]; then
  echo "=== writer_postgres.py ===" 
  wc -l src/writer_postgres.py
  head -70 src/writer_postgres.py
elif [ -f "src/writers/writer_postgres.py" ]; then
  echo "=== src/writers/writer_postgres.py ===" 
  wc -l src/writers/writer_postgres.py
  head -70 src/writers/writer_postgres.py
fi

Repository: AbsaOSS/EventGate

Length of output: 2382


🏁 Script executed:

# Check event_gate_lambda.py
if [ -f "src/event_gate_lambda.py" ]; then
  echo "=== event_gate_lambda.py ===" 
  sed -n '80,95p' src/event_gate_lambda.py
fi

Repository: AbsaOSS/EventGate

Length of output: 593


Verify constructor behavior aligns with issue #94 requirements.

The writers registry structure is well-designed, but WriterPostgres.__init__ (lines 50-61 in src/writers/writer_postgres.py) violates issue #94's requirement to "avoid any initialization logic in constructors that can throw exceptions."

When POSTGRES_SECRET_NAME and POSTGRES_SECRET_REGION environment variables are set, the constructor immediately calls aws_secrets.get_secret_value(), which can raise BotoCoreError, ClientError, or KeyError. Since this instantiation occurs during Lambda handler initialization, it will cause the entire handler to fail if Secrets Manager is unavailable, resulting in cold-start failures.

Defer secrets loading to lazy initialization in write() or check_health() to ensure the handler initializes successfully regardless of Secrets Manager availability, fully complying with the stated design goal.

🤖 Prompt for AI Agents
In `@src/event_gate_lambda.py` around lines 84 - 88, WriterPostgres.__init__
performs eager secret fetching via aws_secrets.get_secret_value when
POSTGRES_SECRET_NAME/POSTGRES_SECRET_REGION are set, which can raise exceptions
during Lambda cold-start; refactor WriterPostgres so its constructor only stores
config and env values, and move any call to aws_secrets.get_secret_value (and
parsing of returned secret) into a lazy initializer invoked from
WriterPostgres.write() and WriterPostgres.check_health(), ensuring those methods
guard against repeated initialization and surface errors there instead of in
__init__.

Comment on lines +53 to +64
def __init__(self, config: Dict[str, Any]) -> None:
super().__init__(config)
secret_name = os.environ.get("POSTGRES_SECRET_NAME", "")
secret_region = os.environ.get("POSTGRES_SECRET_REGION", "")

if secret_name and secret_region:
aws_secrets = boto3.Session().client(service_name="secretsmanager", region_name=secret_region)
postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"]
self._db_config: Dict[str, Any] = json.loads(postgres_secret)
else:
self._db_config = {"database": ""}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Constructor performs AWS calls that can throw exceptions - violates PR objective.

The constructor calls boto3.Session().client() and get_secret_value() (lines 59-60), which can throw BotoCoreError or ClientError. This directly violates the requirement from Issue #94: "avoid any initialization logic in constructors that can throw exceptions."

The EventBridge writer correctly defers client creation to lazy initialization in write(). The Postgres writer should follow the same pattern for loading secrets.

🔧 Suggested fix: Defer secret loading to first write()
     def __init__(self, config: Dict[str, Any]) -> None:
         super().__init__(config)
-        secret_name = os.environ.get("POSTGRES_SECRET_NAME", "")
-        secret_region = os.environ.get("POSTGRES_SECRET_REGION", "")
-
-        if secret_name and secret_region:
-            aws_secrets = boto3.Session().client(service_name="secretsmanager", region_name=secret_region)
-            postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"]
-            self._db_config: Dict[str, Any] = json.loads(postgres_secret)
-        else:
-            self._db_config = {"database": ""}
-
+        self._db_config: Optional[Dict[str, Any]] = None
         logger.debug("Initialized PostgreSQL writer")
+
+    def _load_db_config(self) -> Dict[str, Any]:
+        """Lazily load database configuration from AWS Secrets Manager."""
+        secret_name = os.environ.get("POSTGRES_SECRET_NAME", "")
+        secret_region = os.environ.get("POSTGRES_SECRET_REGION", "")
+        if secret_name and secret_region:
+            aws_secrets = boto3.Session().client(service_name="secretsmanager", region_name=secret_region)
+            postgres_secret = aws_secrets.get_secret_value(SecretId=secret_name)["SecretString"]
+            return json.loads(postgres_secret)
+        return {"database": ""}

Then in write():

+        if self._db_config is None:
+            self._db_config = self._load_db_config()
         if not self._db_config.get("database"):
🤖 Prompt for AI Agents
In `@src/writers/writer_postgres.py` around lines 53 - 64, The constructor
(__init__) is performing AWS Secrets Manager calls (boto3.Session().client and
get_secret_value) which may raise exceptions; change it to only read and store
the environment variables (e.g., secret_name, secret_region) and initialize
_db_config to a safe default (e.g., {"database": ""}) without making network
calls. Add a lazy-initialization step in write() (or a helper like
_load_db_config_if_needed / _ensure_db_config) that, on first write, uses boto3
to fetch and json.loads the secret into self._db_config, catching and surfacing
BotoCoreError/ClientError appropriately (and caching the loaded config so
subsequent writes don't re-fetch). Ensure unique symbols referenced: __init__,
_db_config, write (or _load_db_config_if_needed).

Comment on lines +292 to +295
except (RuntimeError, PsycopgError) as e:
err_msg = f"The Postgres writer with failed unknown error: {str(e)}"
logger.exception(err_msg)
return False, err_msg
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Fix grammatical error in error message.

The error message "The Postgres writer with failed unknown error" should be "The Postgres writer failed with unknown error".

🔧 Proposed fix
-            err_msg = f"The Postgres writer with failed unknown error: {str(e)}"
+            err_msg = f"The Postgres writer failed with unknown error: {e!s}"
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
except (RuntimeError, PsycopgError) as e:
err_msg = f"The Postgres writer with failed unknown error: {str(e)}"
logger.exception(err_msg)
return False, err_msg
except (RuntimeError, PsycopgError) as e:
err_msg = f"The Postgres writer failed with unknown error: {e!s}"
logger.exception(err_msg)
return False, err_msg
🧰 Tools
🪛 Ruff (0.14.11)

293-293: Use explicit conversion flag

Replace with conversion flag

(RUF010)

🤖 Prompt for AI Agents
In `@src/writers/writer_postgres.py` around lines 292 - 295, The error message
inside the except block catching (RuntimeError, PsycopgError) as e is
grammatically incorrect; update the err_msg construction so it reads "The
Postgres writer failed with unknown error" (keep including the exception string
via str(e)), then call logger.exception with that corrected err_msg and return
the same False, err_msg tuple; locate the except block that defines err_msg and
uses logger.exception to make this change.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

refactoring Improving code quality, paying off tech debt, aligning APIs

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Writers logic needs an OOP approach

2 participants