Skip to content

Commit 4a3ce09

Browse files
committed
send it
1 parent f160c08 commit 4a3ce09

File tree

2 files changed

+26
-8
lines changed

2 files changed

+26
-8
lines changed

src/data-archive/main.py

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22
import os
33
import json
44
import logging
5+
from typing import Any, Callable, Dict
6+
from datetime import datetime
57

68
logger = logging.getLogger()
79
logger.setLevel(logging.INFO)
@@ -15,6 +17,12 @@
1517
logger.error("The 'FIREHOSE_STREAM_NAME' environment variable is not set.")
1618
raise
1719

20+
TimestampMapper = Dict[str, Callable[[Dict[str, Any]], str]]
21+
22+
ARCHIVE_TIMESTAMP_MAPPER: TimestampMapper = {
23+
"infra-core-api-room-requests-status": lambda x: x["createdAt#status"].split("#")[0]
24+
}
25+
1826

1927
def deserialize_dynamodb_item(item):
2028
"""
@@ -53,12 +61,22 @@ def lambda_handler(event, context):
5361

5462
deserialized_data = deserialize_dynamodb_item(old_image)
5563

56-
# 4. **Construct the Payload**: Create the specified {'table': ..., 'data': ...}
57-
# payload that will be sent to Firehose.
58-
payload = {"table": table_name, "data": deserialized_data}
64+
# 4. Construct the Payload
65+
payload = {
66+
"table": table_name,
67+
"data": deserialized_data,
68+
"timestamp": datetime.now().isoformat(),
69+
}
70+
if table_name in ARCHIVE_TIMESTAMP_MAPPER:
71+
try:
72+
payload["timestamp"] = ARCHIVE_TIMESTAMP_MAPPER[table_name](
73+
deserialized_data
74+
)
75+
except Exception as e:
76+
logger.error(
77+
f"Failed to extract timestamp for record from {table_name}: {str(e)}. Using now as timestamp."
78+
)
5979

60-
# 5. **Format for Firehose**: The PutRecordBatch API expects each record
61-
# to have a 'Data' key with a byte-encoded string value.
6280
firehose_records_to_send.append(
6381
{"Data": json.dumps(payload).encode("utf-8")}
6482
)

terraform/modules/archival/main.tf

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,7 +249,7 @@ resource "aws_kinesis_firehose_delivery_stream" "dynamic_stream" {
249249
bucket_arn = aws_s3_bucket.this.arn
250250
role_arn = aws_iam_role.firehose_role.arn
251251
buffering_interval = 60
252-
buffering_size = 10
252+
buffering_size = 64
253253
compression_format = "GZIP"
254254

255255
dynamic_partitioning_configuration {
@@ -262,7 +262,7 @@ resource "aws_kinesis_firehose_delivery_stream" "dynamic_stream" {
262262
type = "MetadataExtraction"
263263
parameters {
264264
parameter_name = "MetadataExtractionQuery"
265-
parameter_value = "{table:.table}"
265+
parameter_value = "{table:.table, event_ts:.timestamp}"
266266
}
267267
parameters {
268268
parameter_name = "JsonParsingEngine"
@@ -271,7 +271,7 @@ resource "aws_kinesis_firehose_delivery_stream" "dynamic_stream" {
271271
}
272272
}
273273

274-
prefix = "table=!{partitionKeyFromQuery:table}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
274+
prefix = "table=!{partitionKeyFromQuery:table}/year=!{partitionKeyFromQueryAsTimestamp:event_ts:yyyy}/month=!{partitionKeyFromQueryAsTimestamp:event_ts:MM}/day=!{partitionKeyFromQueryAsTimestamp:event_ts:dd}/"
275275
error_output_prefix = "firehose-errors/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/"
276276
}
277277
}

0 commit comments

Comments (0)