Commit 11c76be

update

1 parent 5e94735 commit 11c76be

File tree

3 files changed: +250 -105 lines changed

src/data-archive/main.py

Lines changed: 12 additions & 2 deletions
@@ -4,11 +4,21 @@
 import logging
 from typing import Any, Callable, Dict
 from datetime import datetime, timezone
+from boto3.dynamodb.types import TypeDeserializer
+import decimal

 logger = logging.getLogger()
 logger.setLevel(logging.INFO)
 firehose_client = boto3.client("firehose")
-deserializer = boto3.dynamodb.types.TypeDeserializer()
+deserializer = TypeDeserializer()
+
+
+class DecimalEncoder(json.JSONEncoder):
+    def default(self, o):
+        if isinstance(o, decimal.Decimal):
+            return str(o)
+        return super().default(o)
+

 try:
     FIREHOSE_STREAM_NAME = os.environ["FIREHOSE_STREAM_NAME"]
@@ -78,7 +88,7 @@ def lambda_handler(event, context):
             )

             firehose_records_to_send.append(
-                {"Data": json.dumps(payload).encode("utf-8")}
+                {"Data": json.dumps(payload, cls=DecimalEncoder).encode("utf-8")}
             )

             # 6. **Send Records to Firehose**: If we found any TTL-expired records, send them.
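For context: boto3's TypeDeserializer returns DynamoDB numbers as decimal.Decimal, which json.dumps cannot serialize on its own, so the new DecimalEncoder renders them as strings. A minimal sketch of the behavior; the sample item is hypothetical, not from this repo:

import decimal
import json

from boto3.dynamodb.types import TypeDeserializer

deserializer = TypeDeserializer()

# A DynamoDB stream image in low-level attribute-value form (hypothetical data).
item = {"pk": {"S": "user#1"}, "score": {"N": "99.5"}}
payload = {k: deserializer.deserialize(v) for k, v in item.items()}

# payload["score"] is decimal.Decimal("99.5"); a plain json.dumps(payload)
# raises: TypeError: Object of type Decimal is not JSON serializable.


class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        # Render Decimals as strings so no float precision is lost.
        if isinstance(o, decimal.Decimal):
            return str(o)
        return super().default(o)


print(json.dumps(payload, cls=DecimalEncoder))  # {"pk": "user#1", "score": "99.5"}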

terraform/modules/archival/main.tf

Lines changed: 75 additions & 67 deletions
@@ -91,71 +91,6 @@ resource "aws_iam_policy" "archive_lambda_policy" {
   })
 }

-resource "aws_iam_role_policy_attachment" "archive_lambda_policy_attach" {
-  role = aws_iam_role.archive_role.name
-  policy_arn = aws_iam_policy.archive_lambda_policy.arn
-}
-
-resource "aws_iam_policy" "archive_policy" {
-  name = "${local.archive_lambda_name}-ddb-stream-policy"
-
-  policy = jsonencode({
-    Version = "2012-10-17"
-    Statement = concat(
-      [
-        for table in var.MonitorTables : {
-          Effect = "Allow"
-          Action = [
-            "dynamodb:DescribeStream",
-            "dynamodb:GetRecords",
-            "dynamodb:GetShardIterator",
-            "dynamodb:ListStreams"
-          ]
-          Resource = "arn:aws:dynamodb:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:table/${table}/stream/*"
-        }
-      ],
-      [
-        {
-          Effect = "Allow"
-          Action = [
-            "s3:AbortMultipartUpload",
-            "s3:GetBucketLocation",
-            "s3:GetObject",
-            "s3:ListBucket",
-            "s3:ListBucketMultipartUploads",
-            "s3:PutObject"
-          ]
-          Resource = ["arn:aws:s3:::${aws_s3_bucket.this.id}/*", "arn:aws:s3:::${aws_s3_bucket.this.id}"]
-        },
-      ]
-    )
-  })
-}
-
-resource "aws_iam_role_policy_attachment" "archive_attach" {
-  role = aws_iam_role.archive_role.name
-  policy_arn = aws_iam_policy.archive_policy.arn
-}
-
-resource "aws_lambda_function" "api_lambda" {
-  depends_on = [aws_cloudwatch_log_group.archive_logs]
-  function_name = local.archive_lambda_name
-  role = aws_iam_role.archive_role.arn
-  architectures = ["arm64"]
-  handler = "lambda.handler"
-  runtime = "python3.13"
-  filename = data.archive_file.api_lambda_code.output_path
-  timeout = 90
-  memory_size = 512
-  source_code_hash = data.archive_file.api_lambda_code.output_sha256
-  description = "DynamoDB stream reader to archive data."
-  environment {
-    variables = {
-      "RunEnvironment" = var.RunEnvironment
-      "FIREHOSE_STREAM_NAME" = local.firehose_stream_name
-    }
-  }
-}

 data "aws_dynamodb_table" "existing_tables" {
   for_each = toset(var.MonitorTables)
@@ -262,7 +197,7 @@ resource "aws_kinesis_firehose_delivery_stream" "dynamic_stream" {
         type = "MetadataExtraction"
         parameters {
           parameter_name  = "MetadataExtractionQuery"
-          parameter_value = "{table:.table, event_ts:.timestamp}"
+          parameter_value = "{table: .table, year: (.timestamp | strftime(\"%Y\")), month: (.timestamp | strftime(\"%m\")), day: (.timestamp | strftime(\"%d\"))}"
         }
         parameters {
           parameter_name = "JsonParsingEngine"
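The rewritten MetadataExtractionQuery computes the year/month/day partition keys in jq itself, which means .timestamp must be numeric epoch seconds for strftime to format it; the S3 prefix in the next hunk can then use plain partitionKeyFromQuery references instead of partitionKeyFromQueryAsTimestamp. A sketch of the record shape this assumes on the producer side, with hypothetical field values:

import json
import time

# Hypothetical payload as the Lambda would build it. What matters for the
# query above is that "timestamp" is epoch seconds (a number): jq's
# strftime() formats numeric timestamps, so (.timestamp | strftime("%Y"))
# yields e.g. "2024".
payload = {
    "table": "example-table",       # hypothetical table name
    "timestamp": int(time.time()),  # e.g. 1718064000 (2024-06-11 UTC)
    "item": {"pk": "user#1"},       # hypothetical archived item
}
record = {"Data": json.dumps(payload).encode("utf-8")}

# With the prefix in the next hunk, this record would land under
#   table=example-table/year=2024/month=06/day=11/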
@@ -271,7 +206,80 @@
       }
     }

-    prefix = "table=!{partitionKeyFromQuery:table}/year=!{partitionKeyFromQueryAsTimestamp:event_ts:yyyy}/month=!{partitionKeyFromQueryAsTimestamp:event_ts:MM}/day=!{partitionKeyFromQueryAsTimestamp:event_ts:dd}/"
+    prefix = "table=!{partitionKeyFromQuery:table}/year=!{partitionKeyFromQuery:year}/month=!{partitionKeyFromQuery:month}/day=!{partitionKeyFromQuery:day}/"
     error_output_prefix = "firehose-errors/!{firehose:error-output-type}/!{timestamp:yyyy/MM/dd}/"
   }
 }
+
+resource "aws_iam_role_policy_attachment" "archive_lambda_policy_attach" {
+  role = aws_iam_role.archive_role.name
+  policy_arn = aws_iam_policy.archive_lambda_policy.arn
+}
+
+resource "aws_iam_policy" "archive_policy" {
+  name = "${local.archive_lambda_name}-ddb-stream-policy"
+
+  policy = jsonencode({
+    Version = "2012-10-17"
+    Statement = concat(
+      [
+        for table in var.MonitorTables : {
+          Effect = "Allow"
+          Action = [
+            "dynamodb:DescribeStream",
+            "dynamodb:GetRecords",
+            "dynamodb:GetShardIterator",
+            "dynamodb:ListStreams"
+          ]
+          Resource = "arn:aws:dynamodb:${data.aws_region.current.name}:${data.aws_caller_identity.current.account_id}:table/${table}/stream/*"
+        }
+      ],
+      [
+        {
+          Effect = "Allow"
+          Action = [
+            "s3:AbortMultipartUpload",
+            "s3:GetBucketLocation",
+            "s3:GetObject",
+            "s3:ListBucket",
+            "s3:ListBucketMultipartUploads",
+            "s3:PutObject"
+          ]
+          Resource = ["arn:aws:s3:::${aws_s3_bucket.this.id}/*", "arn:aws:s3:::${aws_s3_bucket.this.id}"]
+        },
+        {
+          Effect = "Allow"
+          Action = [
+            "firehose:PutRecordBatch",
+          ]
+          Resource = [aws_kinesis_firehose_delivery_stream.dynamic_stream.arn]
+        },
+      ]
+    )
+  })
+}
+
+resource "aws_iam_role_policy_attachment" "archive_attach" {
+  role = aws_iam_role.archive_role.name
+  policy_arn = aws_iam_policy.archive_policy.arn
+}
+
+resource "aws_lambda_function" "api_lambda" {
+  depends_on = [aws_cloudwatch_log_group.archive_logs]
+  function_name = local.archive_lambda_name
+  role = aws_iam_role.archive_role.arn
+  architectures = ["arm64"]
+  handler = "main.lambda_handler"
+  runtime = "python3.13"
+  filename = data.archive_file.api_lambda_code.output_path
+  timeout = 90
+  memory_size = 512
+  source_code_hash = data.archive_file.api_lambda_code.output_sha256
+  description = "DynamoDB stream reader to archive data."
+  environment {
+    variables = {
+      "RunEnvironment" = var.RunEnvironment
+      "FIREHOSE_STREAM_NAME" = local.firehose_stream_name
+    }
+  }
+}
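Besides moving these resources after the delivery stream, the re-added policy now also grants firehose:PutRecordBatch on the stream, the call the archival Lambda makes when flushing records, and the handler is corrected from "lambda.handler" to "main.lambda_handler" to match src/data-archive/main.py. A minimal sketch of the batching pattern that permission enables, assuming a hypothetical send_to_firehose helper and the 500-record PutRecordBatch per-call limit:

import os

import boto3

firehose_client = boto3.client("firehose")


def send_to_firehose(records: list[dict]) -> None:
    """Send records (each {"Data": bytes}) in chunks of 500,
    the PutRecordBatch per-call limit. Hypothetical helper."""
    stream_name = os.environ["FIREHOSE_STREAM_NAME"]
    for i in range(0, len(records), 500):
        batch = records[i : i + 500]
        response = firehose_client.put_record_batch(
            DeliveryStreamName=stream_name, Records=batch
        )
        # PutRecordBatch can partially fail: FailedPutCount > 0 means some
        # entries carry an ErrorCode and should be retried or dead-lettered.
        if response.get("FailedPutCount", 0) > 0:
            failed = [
                batch[j]
                for j, r in enumerate(response["RequestResponses"])
                if "ErrorCode" in r
            ]
            # Naive handling for illustration: one retry of the failed records.
            firehose_client.put_record_batch(
                DeliveryStreamName=stream_name, Records=failed
            )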
