Skip to content

Commit b018614

Browse files
committed
add properties for tumbling window to dynamodb stream
1 parent a0adaee commit b018614

File tree

3 files changed

+175
-0
lines changed

3 files changed

+175
-0
lines changed

aws_lambda_powertools/utilities/data_classes/dynamo_db_stream_event.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,12 +140,25 @@ def user_identity(self) -> dict:
140140
return self.get("userIdentity") or {}
141141

142142

143+
class KinesisStreamWindow(DictWrapper):
144+
@property
145+
def start(self) -> str:
146+
"""The time window started"""
147+
return self["start"]
148+
149+
@property
150+
def end(self) -> str:
151+
"""The time window will end"""
152+
return self["end"]
153+
154+
143155
class DynamoDBStreamEvent(DictWrapper):
144156
"""Dynamo DB Stream Event
145157
146158
Documentation:
147159
-------------
148160
- https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html
161+
- https://docs.aws.amazon.com/lambda/latest/dg/services-ddb-windows.html
149162
150163
Example
151164
-------
@@ -167,3 +180,30 @@ def lambda_handler(event: DynamoDBStreamEvent, context: LambdaContext):
167180
def records(self) -> Iterator[DynamoDBRecord]:
168181
for record in self["Records"]:
169182
yield DynamoDBRecord(record)
183+
184+
@property
185+
def window(self) -> KinesisStreamWindow | None:
186+
window = self.get("window")
187+
if window:
188+
return KinesisStreamWindow(window)
189+
return window
190+
191+
@property
192+
def state(self) -> dict[str, Any]:
193+
return self.get("state") or {}
194+
195+
@property
196+
def shard_id(self) -> str | None:
197+
return self.get("shardId")
198+
199+
@property
200+
def event_source_arn(self) -> str | None:
201+
return self.get("eventSourceARN")
202+
203+
@property
204+
def is_final_invoke_for_window(self) -> bool | None:
205+
return self.get("isFinalInvokeForWindow")
206+
207+
@property
208+
def is_window_terminated_early(self) -> bool | None:
209+
return self.get("isWindowTerminatedEarly")
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
{
2+
"Records": [
3+
{
4+
"eventID": "1",
5+
"eventName": "INSERT",
6+
"eventVersion": "1.0",
7+
"eventSource": "aws:dynamodb",
8+
"awsRegion": "us-east-1",
9+
"dynamodb": {
10+
"Keys": {
11+
"Id": {
12+
"N": "101"
13+
}
14+
},
15+
"NewImage": {
16+
"Message": {
17+
"S": "New item!"
18+
},
19+
"Id": {
20+
"N": "101"
21+
}
22+
},
23+
"SequenceNumber": "111",
24+
"SizeBytes": 26,
25+
"StreamViewType": "NEW_AND_OLD_IMAGES"
26+
},
27+
"eventSourceARN": "stream-ARN"
28+
},
29+
{
30+
"eventID": "2",
31+
"eventName": "MODIFY",
32+
"eventVersion": "1.0",
33+
"eventSource": "aws:dynamodb",
34+
"awsRegion": "us-east-1",
35+
"dynamodb": {
36+
"Keys": {
37+
"Id": {
38+
"N": "101"
39+
}
40+
},
41+
"NewImage": {
42+
"Message": {
43+
"S": "This item has changed"
44+
},
45+
"Id": {
46+
"N": "101"
47+
}
48+
},
49+
"OldImage": {
50+
"Message": {
51+
"S": "New item!"
52+
},
53+
"Id": {
54+
"N": "101"
55+
}
56+
},
57+
"SequenceNumber": "222",
58+
"SizeBytes": 59,
59+
"StreamViewType": "NEW_AND_OLD_IMAGES"
60+
},
61+
"eventSourceARN": "stream-ARN"
62+
},
63+
{
64+
"eventID": "3",
65+
"eventName": "REMOVE",
66+
"eventVersion": "1.0",
67+
"eventSource": "aws:dynamodb",
68+
"awsRegion": "us-east-1",
69+
"dynamodb": {
70+
"Keys": {
71+
"Id": {
72+
"N": "101"
73+
}
74+
},
75+
"OldImage": {
76+
"Message": {
77+
"S": "This item has changed"
78+
},
79+
"Id": {
80+
"N": "101"
81+
}
82+
},
83+
"SequenceNumber": "333",
84+
"SizeBytes": 38,
85+
"StreamViewType": "NEW_AND_OLD_IMAGES"
86+
},
87+
"eventSourceARN": "stream-ARN"
88+
}
89+
],
90+
"window": {
91+
"start": "2020-07-30T17:00:00Z",
92+
"end": "2020-07-30T17:05:00Z"
93+
},
94+
"state": {
95+
"1": "state1"
96+
},
97+
"shardId": "shard123456789",
98+
"eventSourceARN": "stream-ARN",
99+
"isFinalInvokeForWindow": false,
100+
"isWindowTerminatedEarly": false
101+
}

tests/unit/data_classes/required_dependencies/test_dynamo_db_stream_event.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,40 @@ def test_dynamodb_stream_trigger_event():
4646
assert dynamodb.stream_view_type == StreamViewType.NEW_AND_OLD_IMAGES
4747

4848

49+
def test_dynamodb_stream_trigger_with_tumbling_window_event():
50+
raw_event = load_event("dynamoStreamTumblingWindowEvent.json")
51+
parsed_event = DynamoDBStreamEvent(raw_event)
52+
53+
records = list(parsed_event.records)
54+
55+
record = records[0]
56+
record_raw = raw_event["Records"][0]
57+
assert record.aws_region == record_raw["awsRegion"]
58+
assert record.event_id == record_raw["eventID"]
59+
assert record.event_name is DynamoDBRecordEventName.INSERT
60+
assert record.event_source == record_raw["eventSource"]
61+
assert record.event_source_arn == record_raw["eventSourceARN"]
62+
assert record.event_version == record_raw["eventVersion"]
63+
assert record.user_identity == {}
64+
dynamodb = record.dynamodb
65+
assert dynamodb is not None
66+
keys = dynamodb.keys
67+
assert keys is not None
68+
assert keys["Id"] == DECIMAL_CONTEXT.create_decimal(101)
69+
assert dynamodb.new_image.get("Message") == record_raw["dynamodb"]["NewImage"]["Message"]["S"]
70+
assert dynamodb.old_image == {}
71+
assert dynamodb.sequence_number == record_raw["dynamodb"]["SequenceNumber"]
72+
assert dynamodb.size_bytes == record_raw["dynamodb"]["SizeBytes"]
73+
assert dynamodb.stream_view_type == StreamViewType.NEW_AND_OLD_IMAGES
74+
75+
assert parsed_event.window.raw_event == raw_event["window"]
76+
assert parsed_event.state == raw_event["state"]
77+
assert parsed_event.shard_id == raw_event["shardId"]
78+
assert parsed_event.event_source_arn == raw_event["eventSourceARN"]
79+
assert parsed_event.is_final_invoke_for_window == raw_event["isFinalInvokeForWindow"]
80+
assert parsed_event.is_window_terminated_early == raw_event["isWindowTerminatedEarly"]
81+
82+
4983
def test_dynamodb_stream_record_deserialization_large_int():
5084
data = {
5185
"Keys": {"key1": {"attr1": "value1"}},

0 commit comments

Comments
 (0)