Commit caa9bc2

Refactor code for consistency; update string formatting and improve readability
1 parent 3d4194c commit caa9bc2

File tree: 5 files changed, +127 -113 lines

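All hunks in this commit are formatting-only: single-quoted strings become double-quoted, long call and dict literals are re-wrapped with trailing commas, and top-level definitions gain a second separating blank line. The style is consistent with an auto-formatter such as Black run with an extended line length, e.g. black --line-length 120 src/ (a hypothetical command; the commit message does not name the tool or its settings).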

src/__init__.py
Lines changed: 0 additions & 1 deletion

@@ -1,2 +1 @@
 """Package initializer for EventGate source modules."""
-

src/event_gate_lambda.py
Lines changed: 46 additions & 53 deletions

@@ -1,18 +1,18 @@
-#
+#
 # Copyright 2024 ABSA Group Limited
-#
+#
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # You may obtain a copy of the License at
-#
+#
 #     http://www.apache.org/licenses/LICENSE-2.0
-#
+#
 # Unless required by applicable law or agreed to in writing, software
 # distributed under the License is distributed on an "AS IS" BASIS,
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-#
+#
 import base64
 import json
 import logging
@@ -28,8 +28,8 @@
 from jsonschema.exceptions import ValidationError

 # Resolve project root (parent directory of this file's directory)
-_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))
-_CONF_DIR = os.path.join(_PROJECT_ROOT, 'conf')
+_PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
+_CONF_DIR = os.path.join(_PROJECT_ROOT, "conf")

 sys.path.append(os.path.join(os.path.dirname(__file__)))

@@ -45,28 +45,28 @@
 logger.addHandler(logging.StreamHandler())
 logger.debug("Initialized LOGGER")

-with open(os.path.join(_CONF_DIR, 'api.yaml'), 'r') as file:
+with open(os.path.join(_CONF_DIR, "api.yaml"), "r") as file:
     API = file.read()
 logger.debug("Loaded API definition")

 TOPICS = {}
-with open(os.path.join(_CONF_DIR, 'topic_runs.json'), 'r') as file:
+with open(os.path.join(_CONF_DIR, "topic_runs.json"), "r") as file:
     TOPICS["public.cps.za.runs"] = json.load(file)
-with open(os.path.join(_CONF_DIR, 'topic_dlchange.json'), 'r') as file:
+with open(os.path.join(_CONF_DIR, "topic_dlchange.json"), "r") as file:
     TOPICS["public.cps.za.dlchange"] = json.load(file)
-with open(os.path.join(_CONF_DIR, 'topic_test.json'), 'r') as file:
+with open(os.path.join(_CONF_DIR, "topic_test.json"), "r") as file:
     TOPICS["public.cps.za.test"] = json.load(file)
 logger.debug("Loaded TOPICS")

-with open(os.path.join(_CONF_DIR, 'config.json'), 'r') as file:
+with open(os.path.join(_CONF_DIR, "config.json"), "r") as file:
     CONFIG = json.load(file)
 logger.debug("Loaded main CONFIG")

-aws_s3 = boto3.Session().resource('s3', verify=False)
+aws_s3 = boto3.Session().resource("s3", verify=False)
 logger.debug("Initialized AWS S3 Client")

 if CONFIG["access_config"].startswith("s3://"):
-    name_parts = CONFIG["access_config"].split('/')
+    name_parts = CONFIG["access_config"].split("/")
     bucket_name = name_parts[2]
     bucket_object = "/".join(name_parts[3:])
     ACCESS = json.loads(aws_s3.Bucket(bucket_name).Object(bucket_object).get()["Body"].read().decode("utf-8"))
@@ -84,55 +84,49 @@
 writer_kafka.init(logger, CONFIG)
 writer_postgres.init(logger)

+
 def _error_response(status, err_type, message):
     return {
         "statusCode": status,
         "headers": {"Content-Type": "application/json"},
-        "body": json.dumps({
-            "success": False,
-            "statusCode": status,
-            "errors": [{"type": err_type, "message": message}]
-        })
+        "body": json.dumps(
+            {"success": False, "statusCode": status, "errors": [{"type": err_type, "message": message}]}
+        ),
     }

+
 def get_api():
-    return {
-        "statusCode": 200,
-        "body": API
-    }
+    return {"statusCode": 200, "body": API}
+

 def get_token():
     logger.debug("Handling GET Token")
-    return {
-        "statusCode": 303,
-        "headers": {"Location": TOKEN_PROVIDER_URL}
-    }
-
+    return {"statusCode": 303, "headers": {"Location": TOKEN_PROVIDER_URL}}
+
+
 def get_topics():
     logger.debug("Handling GET Topics")
     return {
         "statusCode": 200,
         "headers": {"Content-Type": "application/json"},
-        "body": json.dumps([topicName for topicName in TOPICS])
+        "body": json.dumps([topicName for topicName in TOPICS]),
     }
-
+
+
 def get_topic_schema(topicName):
     logger.debug(f"Handling GET TopicSchema({topicName})")
     if topicName not in TOPICS:
         return _error_response(404, "topic", f"Topic '{topicName}' not found")
-
-    return {
-        "statusCode": 200,
-        "headers": {"Content-Type": "application/json"},
-        "body": json.dumps(TOPICS[topicName])
-    }
+
+    return {"statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps(TOPICS[topicName])}
+

 def post_topic_message(topicName, topicMessage, tokenEncoded):
     logger.debug(f"Handling POST {topicName}")
     try:
         token = jwt.decode(tokenEncoded, TOKEN_PUBLIC_KEY, algorithms=["RS256"])
     except Exception:
-        return _error_response(401, "auth", "Invalid or missing token")
+        return _error_response(401, "auth", "Invalid or missing token")

     if topicName not in TOPICS:
         return _error_response(404, "topic", f"Topic '{topicName}' not found")
@@ -144,8 +138,8 @@ def post_topic_message(topicName, topicMessage, tokenEncoded):
     try:
         validate(instance=topicMessage, schema=TOPICS[topicName])
     except ValidationError as e:
-        return _error_response(400, "validation", e.message)
-
+        return _error_response(400, "validation", e.message)
+
     # Run all writers independently (avoid short-circuit so failures in one don't skip others)
     kafka_ok, kafka_err = writer_kafka.write(topicName, topicMessage)
     eventbridge_ok, eventbridge_err = writer_eventbridge.write(topicName, topicMessage)
@@ -163,31 +157,26 @@ def post_topic_message(topicName, topicMessage, tokenEncoded):
         return {
             "statusCode": 500,
             "headers": {"Content-Type": "application/json"},
-            "body": json.dumps({
-                "success": False,
-                "statusCode": 500,
-                "errors": errors
-            })
+            "body": json.dumps({"success": False, "statusCode": 500, "errors": errors}),
         }

     return {
         "statusCode": 202,
         "headers": {"Content-Type": "application/json"},
-        "body": json.dumps({
-            "success": True,
-            "statusCode": 202
-        })
+        "body": json.dumps({"success": True, "statusCode": 202}),
     }

+
 def extract_token(eventHeaders):
     # Initial implementation used bearer header directly
     if "bearer" in eventHeaders:
         return eventHeaders["bearer"]
-
+
     if "Authorization" in eventHeaders and eventHeaders["Authorization"].startswith("Bearer "):
-        return eventHeaders["Authorization"][len("Bearer "):]
-
-    return ""  # Will result in 401
+        return eventHeaders["Authorization"][len("Bearer ") :]
+
+    return ""  # Will result in 401
+

 def lambda_handler(event, context):
     try:
@@ -201,7 +190,11 @@ def lambda_handler(event, context):
         if event["httpMethod"] == "GET":
             return get_topic_schema(event["pathParameters"]["topic_name"].lower())
         if event["httpMethod"] == "POST":
-            return post_topic_message(event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), extract_token(event["headers"]))
+            return post_topic_message(
+                event["pathParameters"]["topic_name"].lower(),
+                json.loads(event["body"]),
+                extract_token(event["headers"]),
+            )
         if event["resource"].lower() == "/terminate":
             sys.exit("TERMINATING")
         return _error_response(404, "route", "Resource not found")
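
For reference, a minimal sketch of the API Gateway proxy-style event that the re-wrapped lambda_handler above consumes. The field names (httpMethod, resource, pathParameters, headers, body) mirror the handler's accesses; the resource path, topic name, and token value are hypothetical:

    import json

    # Hypothetical proxy event; field names match the handler, values are illustrative.
    event = {
        "httpMethod": "POST",
        "resource": "/topics/{topic_name}",
        "pathParameters": {"topic_name": "PUBLIC.CPS.ZA.TEST"},  # handler lowercases this
        "headers": {"Authorization": "Bearer <jwt>"},
        "body": json.dumps({"example": "payload"}),
    }

    response = lambda_handler(event, context=None)
    # Expect 202 with {"success": true} when every writer succeeds; otherwise
    # 400 (schema validation), 401 (token), 404 (unknown topic), or 500 with
    # per-writer errors, all built by _error_response.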

src/writer_eventbridge.py
Lines changed: 10 additions & 8 deletions

@@ -2,17 +2,19 @@

 import boto3

+
 def init(logger, CONFIG):
     global _logger
     global EVENT_BUS_ARN
     global aws_eventbridge
-
+
     _logger = logger
-
-    aws_eventbridge = boto3.client('events')
+
+    aws_eventbridge = boto3.client("events")
     EVENT_BUS_ARN = CONFIG["event_bus_arn"] if "event_bus_arn" in CONFIG else ""
     _logger.debug("Initialized EVENTBRIDGE writer")

+
 def write(topicName, message):
     if not EVENT_BUS_ARN:
         _logger.debug("No EventBus Arn - skipping")
@@ -24,17 +26,17 @@ def write(topicName, message):
             Entries=[
                 {
                     "Source": topicName,
-                    'DetailType': 'JSON',
-                    'Detail': json.dumps(message),
-                    'EventBusName': EVENT_BUS_ARN,
+                    "DetailType": "JSON",
+                    "Detail": json.dumps(message),
+                    "EventBusName": EVENT_BUS_ARN,
                 }
             ]
         )
         if response["FailedEntryCount"] > 0:
             _logger.error(str(response))
             return False
     except Exception as e:
-        _logger.error(f'The EventBridge writer failed with unknown error: {str(e)}')
+        _logger.error(f"The EventBridge writer failed with unknown error: {str(e)}")
         return False
-
+
     return True

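A side note on the FailedEntryCount check retained in write() above: put_events reports per-entry failures in its response rather than raising, which is why the code logs str(response) on failure. A minimal sketch of extracting the per-entry error instead (response shape per the EventBridge PutEvents API; the entries variable and the loop are illustrative, not part of this commit):

    # Each input entry gets one result, in order; failed entries carry
    # ErrorCode/ErrorMessage in place of an EventId.
    response = aws_eventbridge.put_events(Entries=entries)
    if response["FailedEntryCount"] > 0:
        for entry, result in zip(entries, response["Entries"]):
            if "ErrorCode" in result:
                _logger.error(f"{entry['Source']}: {result['ErrorCode']} {result.get('ErrorMessage')}")
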
src/writer_kafka.py
Lines changed: 25 additions & 19 deletions

@@ -3,43 +3,49 @@
 import boto3
 from confluent_kafka import Producer

+
 def init(logger, CONFIG):
     global _logger
     global kafka_producer
-
+
     _logger = logger
-
+
     producer_config = {"bootstrap.servers": CONFIG["kafka_bootstrap_server"]}
     if "kafka_sasl_kerberos_principal" in CONFIG and "kafka_ssl_key_path" in CONFIG:
-        producer_config.update({
-            "security.protocol": "SASL_SSL",
-            "sasl.mechanism": "GSSAPI",
-            "sasl.kerberos.service.name": "kafka",
-            "sasl.kerberos.keytab": CONFIG["kafka_sasl_kerberos_keytab_path"],
-            "sasl.kerberos.principal": CONFIG["kafka_sasl_kerberos_principal"],
-            "ssl.ca.location": CONFIG["kafka_ssl_ca_path"],
-            "ssl.certificate.location": CONFIG["kafka_ssl_cert_path"],
-            "ssl.key.location": CONFIG["kafka_ssl_key_path"],
-            "ssl.key.password": CONFIG["kafka_ssl_key_password"]
-        })
+        producer_config.update(
+            {
+                "security.protocol": "SASL_SSL",
+                "sasl.mechanism": "GSSAPI",
+                "sasl.kerberos.service.name": "kafka",
+                "sasl.kerberos.keytab": CONFIG["kafka_sasl_kerberos_keytab_path"],
+                "sasl.kerberos.principal": CONFIG["kafka_sasl_kerberos_principal"],
+                "ssl.ca.location": CONFIG["kafka_ssl_ca_path"],
+                "ssl.certificate.location": CONFIG["kafka_ssl_cert_path"],
+                "ssl.key.location": CONFIG["kafka_ssl_key_path"],
+                "ssl.key.password": CONFIG["kafka_ssl_key_password"],
+            }
+        )
         _logger.debug("producer will use SASL_SSL")
     kafka_producer = Producer(producer_config)
     _logger.debug("Initialized KAFKA writer")

+
 def write(topicName, message):
     try:
         _logger.debug(f"Sending to kafka {topicName}")
         error = []
-        kafka_producer.produce(topicName,
-                               key="",
-                               value=json.dumps(message).encode("utf-8"),
-                               callback = lambda err, msg: error.append(err) if err is not None else None)
+        kafka_producer.produce(
+            topicName,
+            key="",
+            value=json.dumps(message).encode("utf-8"),
+            callback=lambda err, msg: error.append(err) if err is not None else None,
+        )
         kafka_producer.flush()
         if error:
             _logger.error(str(error))
             return False
     except Exception as e:
-        _logger.error(f'The Kafka writer failed with unknown error: {str(e)}')
+        _logger.error(f"The Kafka writer failed with unknown error: {str(e)}")
         return False
-
+
     return True
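
A note on the produce() call re-wrapped above: confluent_kafka delivers messages asynchronously, and the error list is only populated because flush() blocks until every outstanding delivery report has invoked the callback. The same pattern in isolation (a minimal sketch; the broker address and topic name are placeholders):

    import json
    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder broker
    errors = []

    def on_delivery(err, msg):
        # Invoked once per message during poll()/flush(); err is None on success.
        if err is not None:
            errors.append(err)

    producer.produce("example-topic", key="", value=json.dumps({"k": "v"}).encode("utf-8"),
                     callback=on_delivery)
    producer.flush()  # blocks until all delivery reports have fired
    ok = not errors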
