Skip to content

Commit b72beaf

Browse files
Refactor code for consistency and readability; update requirements for testing and dependencies
1 parent 0dcbd53 commit b72beaf

File tree

8 files changed

+128
-135
lines changed

8 files changed

+128
-135
lines changed

requirements.txt

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,15 @@
1-
pytest==7.4.3
2-
pytest-cov==5.0.0
3-
pytest-mock==3.14.0
4-
pylint==3.2.6
5-
black==24.8.0
6-
mypy==1.15.0
7-
mypy-extensions==1.0.0
1+
pytest==8.4.2
2+
pytest-cov==6.3.0
3+
pytest-mock==3.15.0
4+
pylint==3.3.8
5+
black==25.1.0
6+
mypy==1.17.1
7+
mypy-extensions==1.1.0
8+
urllib3==2.5.0
9+
cryptography==45.0.7
10+
jsonschema==4.25.1
11+
PyJWT==2.10.1
12+
requests==2.32.5
13+
boto3==1.40.25
14+
confluent_kafka
15+
psycopg2-binary==2.9.10

src/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
"""Package initializer for EventGate source modules."""
2+

src/event_gate_lambda.py

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,18 @@
1-
#
1+
#
22
# Copyright 2024 ABSA Group Limited
3-
#
3+
#
44
# Licensed under the Apache License, Version 2.0 (the "License");
55
# you may not use this file except in compliance with the License.
66
# You may obtain a copy of the License at
7-
#
7+
#
88
# http://www.apache.org/licenses/LICENSE-2.0
9-
#
9+
#
1010
# Unless required by applicable law or agreed to in writing, software
1111
# distributed under the License is distributed on an "AS IS" BASIS,
1212
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
15-
#
15+
#
1616
import base64
1717
import json
1818
import logging
@@ -58,11 +58,11 @@
5858
CONFIG = json.load(file)
5959
logger.debug("Loaded main CONFIG")
6060

61-
aws_s3 = boto3.Session().resource("s3", verify=False)
61+
aws_s3 = boto3.Session().resource('s3', verify=False)
6262
logger.debug("Initialized AWS S3 Client")
6363

6464
if CONFIG["access_config"].startswith("s3://"):
65-
name_parts = CONFIG["access_config"].split("/")
65+
name_parts = CONFIG["access_config"].split('/')
6666
bucket_name = name_parts[2]
6767
bucket_object = "/".join(name_parts[3:])
6868
ACCESS = json.loads(aws_s3.Bucket(bucket_name).Object(bucket_object).get()["Body"].read().decode("utf-8"))
@@ -80,70 +80,81 @@
8080
writer_kafka.init(logger, CONFIG)
8181
writer_postgres.init(logger)
8282

83-
8483
def get_api():
85-
return {"statusCode": 200, "body": API}
86-
84+
return {
85+
"statusCode": 200,
86+
"body": API
87+
}
8788

8889
def get_token():
8990
logger.debug("Handling GET Token")
90-
return {"statusCode": 303, "headers": {"Location": TOKEN_PROVIDER_URL}}
91-
92-
91+
return {
92+
"statusCode": 303,
93+
"headers": {"Location": TOKEN_PROVIDER_URL}
94+
}
95+
9396
def get_topics():
9497
logger.debug("Handling GET Topics")
9598
return {
9699
"statusCode": 200,
97100
"headers": {"Content-Type": "application/json"},
98-
"body": json.dumps([topicName for topicName in TOPICS]),
101+
"body": json.dumps([topicName for topicName in TOPICS])
99102
}
100-
101-
103+
102104
def get_topic_schema(topicName):
103105
logger.debug(f"Handling GET TopicSchema({topicName})")
104106
if topicName not in TOPICS:
105-
return {"statusCode": 404}
106-
107-
return {"statusCode": 200, "headers": {"Content-Type": "application/json"}, "body": json.dumps(TOPICS[topicName])}
108-
107+
return { "statusCode": 404 }
108+
109+
return {
110+
"statusCode": 200,
111+
"headers": {"Content-Type": "application/json"},
112+
"body": json.dumps(TOPICS[topicName])
113+
}
109114

110115
def post_topic_message(topicName, topicMessage, tokenEncoded):
111116
logger.debug(f"Handling POST {topicName}")
112117
try:
113118
token = jwt.decode(tokenEncoded, TOKEN_PUBLIC_KEY, algorithms=["RS256"])
114119
except Exception as e:
115-
return {"statusCode": 401, "headers": {"Content-Type": "text/plain"}, "body": str(e)}
120+
return {
121+
"statusCode": 401,
122+
"headers": {"Content-Type": "text/plain"},
123+
"body": str(e)
124+
}
116125

117126
if topicName not in TOPICS:
118-
return {"statusCode": 404}
127+
return { "statusCode": 404 }
119128

120129
user = token["sub"]
121130
if topicName not in ACCESS or user not in ACCESS[topicName]:
122-
return {"statusCode": 403}
131+
return { "statusCode": 403 }
123132

124133
try:
125134
validate(instance=topicMessage, schema=TOPICS[topicName])
126135
except ValidationError as e:
127-
return {"statusCode": 400, "headers": {"Content-Type": "text/plain"}, "body": e.message}
128-
136+
return {
137+
"statusCode": 400,
138+
"headers": {"Content-Type": "text/plain"},
139+
"body": e.message
140+
}
141+
129142
success = (
130-
writer_kafka.write(topicName, topicMessage)
131-
and writer_eventbridge.write(topicName, topicMessage)
132-
and writer_postgres.write(topicName, topicMessage)
143+
writer_kafka.write(topicName, topicMessage) and
144+
writer_eventbridge.write(topicName, topicMessage) and
145+
writer_postgres.write(topicName, topicMessage)
133146
)
134147
return {"statusCode": 202} if success else {"statusCode": 500}
135148

136-
137149
def extract_token(eventHeaders):
138150
# Initial implementation used bearer header directly
139151
if "bearer" in eventHeaders:
140152
return eventHeaders["bearer"]
141-
153+
142154
if "Authorization" in eventHeaders and eventHeaders["Authorization"].startswith("Bearer "):
143-
return eventHeaders["Authorization"][len("Bearer ") :]
144-
145-
return "" # Will result in 401
146-
155+
return eventHeaders["Authorization"][len("Bearer "):]
156+
157+
return "" # Will result in 401
147158

148159
def lambda_handler(event, context):
149160
try:
@@ -157,11 +168,7 @@ def lambda_handler(event, context):
157168
if event["httpMethod"] == "GET":
158169
return get_topic_schema(event["pathParameters"]["topic_name"].lower())
159170
if event["httpMethod"] == "POST":
160-
return post_topic_message(
161-
event["pathParameters"]["topic_name"].lower(),
162-
json.loads(event["body"]),
163-
extract_token(event["headers"]),
164-
)
171+
return post_topic_message(event["pathParameters"]["topic_name"].lower(), json.loads(event["body"]), extract_token(event["headers"]))
165172
if event["resource"].lower() == "/terminate":
166173
sys.exit("TERMINATING")
167174
return {"statusCode": 404}

src/requirements.txt

Lines changed: 0 additions & 8 deletions
This file was deleted.

src/writer_eventbridge.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,17 @@
22

33
import boto3
44

5-
65
def init(logger, CONFIG):
76
global _logger
87
global EVENT_BUS_ARN
98
global aws_eventbridge
10-
9+
1110
_logger = logger
12-
13-
aws_eventbridge = boto3.client("events")
11+
12+
aws_eventbridge = boto3.client('events')
1413
EVENT_BUS_ARN = CONFIG["event_bus_arn"] if "event_bus_arn" in CONFIG else ""
1514
_logger.debug("Initialized EVENTBRIDGE writer")
1615

17-
1816
def write(topicName, message):
1917
if not EVENT_BUS_ARN:
2018
_logger.debug("No EventBus Arn - skipping")
@@ -26,17 +24,17 @@ def write(topicName, message):
2624
Entries=[
2725
{
2826
"Source": topicName,
29-
"DetailType": "JSON",
30-
"Detail": json.dumps(message),
31-
"EventBusName": EVENT_BUS_ARN,
27+
'DetailType': 'JSON',
28+
'Detail': json.dumps(message),
29+
'EventBusName': EVENT_BUS_ARN,
3230
}
3331
]
3432
)
3533
if response["FailedEntryCount"] > 0:
3634
_logger.error(str(response))
3735
return False
3836
except Exception as e:
39-
_logger.error(f"The EventBridge writer failed with unknown error: {str(e)}")
37+
_logger.error(f'The EventBridge writer failed with unknown error: {str(e)}')
4038
return False
41-
39+
4240
return True

src/writer_kafka.py

Lines changed: 19 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,49 +3,43 @@
33
import boto3
44
from confluent_kafka import Producer
55

6-
76
def init(logger, CONFIG):
87
global _logger
98
global kafka_producer
10-
9+
1110
_logger = logger
12-
11+
1312
producer_config = {"bootstrap.servers": CONFIG["kafka_bootstrap_server"]}
1413
if "kafka_sasl_kerberos_principal" in CONFIG and "kafka_ssl_key_path" in CONFIG:
15-
producer_config.update(
16-
{
17-
"security.protocol": "SASL_SSL",
18-
"sasl.mechanism": "GSSAPI",
19-
"sasl.kerberos.service.name": "kafka",
20-
"sasl.kerberos.keytab": CONFIG["kafka_sasl_kerberos_keytab_path"],
21-
"sasl.kerberos.principal": CONFIG["kafka_sasl_kerberos_principal"],
22-
"ssl.ca.location": CONFIG["kafka_ssl_ca_path"],
23-
"ssl.certificate.location": CONFIG["kafka_ssl_cert_path"],
24-
"ssl.key.location": CONFIG["kafka_ssl_key_path"],
25-
"ssl.key.password": CONFIG["kafka_ssl_key_password"],
26-
}
27-
)
14+
producer_config.update({
15+
"security.protocol": "SASL_SSL",
16+
"sasl.mechanism": "GSSAPI",
17+
"sasl.kerberos.service.name": "kafka",
18+
"sasl.kerberos.keytab": CONFIG["kafka_sasl_kerberos_keytab_path"],
19+
"sasl.kerberos.principal": CONFIG["kafka_sasl_kerberos_principal"],
20+
"ssl.ca.location": CONFIG["kafka_ssl_ca_path"],
21+
"ssl.certificate.location": CONFIG["kafka_ssl_cert_path"],
22+
"ssl.key.location": CONFIG["kafka_ssl_key_path"],
23+
"ssl.key.password": CONFIG["kafka_ssl_key_password"]
24+
})
2825
_logger.debug("producer will use SASL_SSL")
2926
kafka_producer = Producer(producer_config)
3027
_logger.debug("Initialized KAFKA writer")
3128

32-
3329
def write(topicName, message):
3430
try:
3531
_logger.debug(f"Sending to kafka {topicName}")
3632
error = []
37-
kafka_producer.produce(
38-
topicName,
39-
key="",
40-
value=json.dumps(message).encode("utf-8"),
41-
callback=lambda err, msg: error.append(err) if err is not None else None,
42-
)
33+
kafka_producer.produce(topicName,
34+
key="",
35+
value=json.dumps(message).encode("utf-8"),
36+
callback = lambda err, msg: error.append(err) if err is not None else None)
4337
kafka_producer.flush()
4438
if error:
4539
_logger.error(str(error))
4640
return False
4741
except Exception as e:
48-
_logger.error(f"The Kafka writer failed with unknown error: {str(e)}")
42+
_logger.error(f'The Kafka writer failed with unknown error: {str(e)}')
4943
return False
50-
44+
5145
return True

0 commit comments

Comments
 (0)