Skip to content

Commit 77c9cfd

Browse files
committed
init: ingestion logging
1 parent 3c684b7 commit 77c9cfd

File tree

3 files changed

+46
-1
lines changed

3 files changed

+46
-1
lines changed

recordprocessor/src/batch_processor.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,14 @@
88
from send_to_kinesis import send_to_kinesis
99
from clients import logger
1010
from file_level_validation import file_level_validation
11+
from logging_decorator import ingestion_logging_decorator
1112
from errors import NoOperationPermissions, InvalidHeaders
1213

1314

15+
@ingestion_logging_decorator
16+
def ingestion_progress(finished: bool, message_body: dict, row_count: int = 0, start_time: float = 0.0) -> str:
17+
return("Ingestion finished" if finished else "Ingestion started")
18+
1419
def process_csv_to_fhir(incoming_message_body: dict) -> None:
1520
"""
1621
For each row of the csv, attempts to transform into FHIR format, sends a message to kinesis,
@@ -31,8 +36,12 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
3136
csv_reader = interim_message_body.get("csv_dict_reader")
3237

3338
target_disease = map_target_disease(vaccine)
34-
3539
row_count = 0
40+
ingestion_start_time = time.time()
41+
logger.info(ingestion_progress(
42+
finished=False,
43+
message_body=interim_message_body))
44+
3645
for row in csv_reader:
3746
row_count += 1
3847
row_id = f"{file_id}^{row_count}"
@@ -55,6 +64,12 @@ def process_csv_to_fhir(incoming_message_body: dict) -> None:
5564

5665
logger.info("Total rows processed: %s", row_count)
5766

67+
logger.info(ingestion_progress(
68+
finished=True,
69+
message_body=interim_message_body,
70+
row_count=row_count,
71+
start_time=ingestion_start_time))
72+
5873

5974
def main(event: str) -> None:
6075
"""Process each row of the file"""

recordprocessor/src/logging_decorator.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,3 +69,31 @@ def wrapper(*args, **kwargs):
6969
raise
7070

7171
return wrapper
72+
73+
def ingestion_logging_decorator(func):
74+
"""
75+
Sends the appropriate logs to Cloudwatch and Firehose based on ingestion started/finished.
76+
"""
77+
78+
@wraps(func)
79+
def wrapper(*args, **kwargs):
80+
incoming_message_body = kwargs.get("message_body")
81+
base_log_data = {
82+
"function_name": f"record_processor_{func.__name__}",
83+
"date_time": str(datetime.now()),
84+
"file_key": incoming_message_body.get("file_key"),
85+
"message_id": incoming_message_body.get("message_id"),
86+
"vaccine_type": incoming_message_body.get("vaccine"),
87+
"supplier": incoming_message_body.get("supplier"),
88+
}
89+
finished = kwargs.get("finished")
90+
91+
message = func(*args, **kwargs)
92+
additional_log_data = {"statusCode": 200, "message": message}
93+
if finished:
94+
additional_log_data["row_count"] = kwargs.get("row_count")
95+
start_time = kwargs.get("start_time") if finished else time.time()
96+
generate_and_send_logs(start_time, base_log_data, additional_log_data=additional_log_data)
97+
return message
98+
99+
return wrapper

recordprocessor/tests/test_logging_decorator.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,3 +253,5 @@ def test_splunk_logger_unhandled_failure(self):
253253
mock_firehose_client.put_record.assert_called_once_with(
254254
DeliveryStreamName=Firehose.STREAM_NAME, Record=expected_firehose_record
255255
)
256+
257+
# TODO: unit tests for ingestion decorator

0 commit comments

Comments
 (0)