Skip to content

Commit 971c38b

Browse files
Refactor event writers to improve type hinting and logging; enhance error handling in write methods
1 parent 6f5b665 commit 971c38b

File tree

5 files changed

+50
-21
lines changed

5 files changed

+50
-21
lines changed

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,8 @@ omit = ["tests/*"]
99
[tool.mypy]
1010
check_untyped_defs = true
1111
exclude = "tests"
12+
ignore_missing_imports = true
13+
python_version = "3.11"
14+
packages = ["src"]
15+
explicit_package_bases = true
16+
disable_error_code = ["import-untyped"]

src/event_gate_lambda.py

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import os
2020
import sys
2121
import urllib3
22+
from typing import Any
2223

2324
import boto3
2425
import jwt
@@ -31,11 +32,7 @@
3132
_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
3233
_CONF_DIR = os.path.join(_PROJECT_ROOT, "conf")
3334

34-
sys.path.append(os.path.join(os.path.dirname(__file__)))
35-
36-
import writer_eventbridge
37-
import writer_kafka
38-
import writer_postgres
35+
from . import writer_eventbridge, writer_kafka, writer_postgres
3936

4037
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
4138

@@ -77,7 +74,7 @@
7774

7875
TOKEN_PROVIDER_URL = CONFIG["token_provider_url"]
7976
token_public_key_encoded = requests.get(CONFIG["token_public_key_url"], verify=False).json()["key"]
80-
TOKEN_PUBLIC_KEY = serialization.load_der_public_key(base64.b64decode(token_public_key_encoded))
77+
TOKEN_PUBLIC_KEY: Any = serialization.load_der_public_key(base64.b64decode(token_public_key_encoded))
8178
logger.debug("Loaded TOKEN_PUBLIC_KEY")
8279

8380
writer_eventbridge.init(logger, CONFIG)

src/writer_eventbridge.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,16 @@
1414
# limitations under the License.
1515
#
1616
import json
17+
import logging
18+
from typing import Optional, Tuple
1719

1820
import boto3
1921

22+
# Module globals for typing
23+
_logger: logging.Logger = logging.getLogger(__name__)
24+
EVENT_BUS_ARN: str = ""
25+
aws_eventbridge = None # will hold boto3 client
26+
2027

2128
def init(logger, CONFIG):
2229
global _logger
@@ -30,10 +37,13 @@ def init(logger, CONFIG):
3037
_logger.debug("Initialized EVENTBRIDGE writer")
3138

3239

33-
def write(topicName, message):
40+
def write(topicName, message) -> Tuple[bool, Optional[str]]:
3441
if not EVENT_BUS_ARN:
3542
_logger.debug("No EventBus Arn - skipping")
3643
return True, None
44+
if aws_eventbridge is None: # defensive
45+
_logger.debug("EventBridge client not initialized - skipping")
46+
return True, None
3747

3848
try:
3949
_logger.debug(f"Sending to eventBridge {topicName}")

src/writer_kafka.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,16 @@
1414
# limitations under the License.
1515
#
1616
import json
17+
import logging
18+
from typing import Any, Dict, Optional, Tuple
1719

1820
import boto3
1921
from confluent_kafka import Producer
2022

23+
# Module globals for typing
24+
_logger: logging.Logger = logging.getLogger(__name__)
25+
kafka_producer: Optional[Producer] = None
26+
2127

2228
def init(logger, CONFIG):
2329
global _logger
@@ -45,10 +51,13 @@ def init(logger, CONFIG):
4551
_logger.debug("Initialized KAFKA writer")
4652

4753

48-
def write(topicName, message):
54+
def write(topicName, message) -> Tuple[bool, Optional[str]]:
4955
try:
56+
if kafka_producer is None:
57+
_logger.debug("Kafka producer not initialized - skipping")
58+
return True, None
5059
_logger.debug(f"Sending to kafka {topicName}")
51-
error = []
60+
error: list[Any] = []
5261
kafka_producer.produce(
5362
topicName,
5463
key="",
@@ -57,10 +66,12 @@ def write(topicName, message):
5766
)
5867
kafka_producer.flush()
5968
if error:
60-
_logger.error(str(error))
61-
return False
69+
msg = str(error)
70+
_logger.error(msg)
71+
return False, msg
6272
except Exception as e:
63-
_logger.error(f"The Kafka writer failed with unknown error: {str(e)}")
64-
return False
73+
msg = f"The Kafka writer failed with unknown error: {str(e)}"
74+
_logger.error(msg)
75+
return False, msg
6576

66-
return True
77+
return True, None

src/writer_postgres.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,20 @@
1515
#
1616
import json
1717
import os
18+
import logging
19+
from typing import Any, Dict, Tuple, Optional
1820

1921
import boto3
2022
from botocore.exceptions import ClientError
2123

2224
try:
2325
import psycopg2 # noqa: F401
2426
except ImportError: # pragma: no cover - environment without psycopg2
25-
psycopg2 = None
27+
psycopg2 = None # type: ignore
28+
29+
# Module level globals for typing
30+
_logger: logging.Logger = logging.getLogger(__name__)
31+
POSTGRES: Dict[str, Any] = {"database": ""}
2632

2733

2834
def init(logger):
@@ -201,23 +207,23 @@ def postgres_test_write(cursor, table, message):
201207
)
202208

203209

204-
def write(topicName, message):
210+
def write(topicName, message) -> Tuple[bool, Optional[str]]:
205211
try:
206-
if not POSTGRES["database"]:
212+
if not POSTGRES.get("database"):
207213
_logger.debug("No Postgres - skipping")
208214
return True, None
209-
if psycopg2 is None:
215+
if psycopg2 is None: # type: ignore
210216
_logger.debug("psycopg2 not available - skipping actual Postgres write")
211217
return True, None
212218

213-
with psycopg2.connect(
219+
with psycopg2.connect( # type: ignore[attr-defined]
214220
database=POSTGRES["database"],
215221
host=POSTGRES["host"],
216222
user=POSTGRES["user"],
217223
password=POSTGRES["password"],
218224
port=POSTGRES["port"],
219225
) as connection:
220-
with connection.cursor() as cursor:
226+
with connection.cursor() as cursor: # type: ignore
221227
if topicName == "public.cps.za.dlchange":
222228
postgres_edla_write(cursor, "public_cps_za_dlchange", message)
223229
elif topicName == "public.cps.za.runs":
@@ -229,7 +235,7 @@ def write(topicName, message):
229235
_logger.error(msg)
230236
return False, msg
231237

232-
connection.commit()
238+
connection.commit() # type: ignore
233239
except Exception as e: # pragma: no cover - defensive (still tested though)
234240
err_msg = f"The Postgres writer with failed unknown error: {str(e)}"
235241
_logger.error(err_msg)

0 commit comments

Comments
 (0)