Skip to content

Commit 1d8a832

Browse files
authored
[BUG] AWS DynamoDB Stream Event not emitting all events (#16109)
1 parent 5654d23 commit 1d8a832

File tree

3 files changed

+73
-35
lines changed

3 files changed

+73
-35
lines changed

components/aws/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@pipedream/aws",
3-
"version": "0.7.7",
3+
"version": "0.7.8",
44
"description": "Pipedream Aws Components",
55
"main": "aws.app.mjs",
66
"keywords": [

components/aws/sources/new-dynamodb-stream-event/new-dynamodb-stream-event.mjs

Lines changed: 70 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ export default {
66
name: "New DynamoDB Stream Event",
77
description: "Emit new event when a DynamoDB stream receives new events. [See the docs here](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)",
88
type: "source",
9-
version: "0.0.4",
9+
version: "0.0.5",
1010
dedupe: "unique",
1111
props: {
1212
...common.props,
@@ -36,65 +36,105 @@ export default {
3636
},
3737
hooks: {
3838
async deploy() {
39-
await this._getNewShardIterator();
39+
const {
40+
stream,
41+
_getShardIterators,
42+
_setShardIterators,
43+
listShards,
44+
getShardIterator,
45+
} = this;
46+
47+
const shardIterators = _getShardIterators();
48+
49+
const { StreamDescription: streamDescription } = await listShards({
50+
StreamArn: stream,
51+
Limit: 100,
52+
});
53+
54+
if (streamDescription?.Shards?.length === 0) {
55+
throw new Error("No shards found in stream");
56+
}
57+
58+
const activeShards =
59+
streamDescription.Shards
60+
.filter((shard) => !shard.SequenceNumberRange.EndingSequenceNumber);
61+
62+
if (activeShards.length === 0) {
63+
throw new Error("No active shards found");
64+
}
65+
66+
const shardIds = activeShards.map(({ ShardId }) => ShardId);
67+
68+
for (const shardId of shardIds) {
69+
const { ShardIterator: shardIterator } = await getShardIterator({
70+
ShardId: shardId,
71+
StreamArn: stream,
72+
ShardIteratorType: "LATEST",
73+
});
74+
shardIterators[shardId] = shardIterator;
75+
}
76+
77+
_setShardIterators(shardIterators);
4078
},
4179
},
4280
methods: {
4381
...common.methods,
44-
async _getNewShardIterator() {
45-
const { StreamDescription: streamDescription } = await this.listShards({
46-
StreamArn: this.stream,
47-
});
48-
const shardId = streamDescription.Shards[streamDescription.Shards.length - 1].ShardId;
49-
const { ShardIterator: shardIterator } = await this.getShardIterator({
50-
ShardId: shardId,
51-
StreamArn: this.stream,
52-
ShardIteratorType: "LATEST",
53-
});
54-
this._setShardIterator(shardIterator);
55-
return shardIterator;
82+
_getShardIterators() {
83+
return this.db.get("shardIterators") || {};
5684
},
57-
_getShardIterator() {
58-
return this.db.get("shardIterator");
59-
},
60-
_setShardIterator(shardIterator) {
61-
this.db.set("shardIterator", shardIterator);
85+
_setShardIterators(value) {
86+
this.db.set("shardIterators", value);
6287
},
6388
generateMeta({
6489
eventID, eventName, dynamodb,
6590
}) {
6691
return {
6792
id: eventID,
68-
summary: `New ${eventName} event`,
93+
summary: `New Event: ${eventName}`,
6994
ts: Date.parse(dynamodb.ApproximateCreationDateTime),
7095
};
7196
},
7297
},
7398
async run() {
74-
if (!(await this.isStreamEnabled(this.stream))) {
99+
const {
100+
stream,
101+
isStreamEnabled,
102+
_getShardIterators,
103+
_setShardIterators,
104+
getRecords,
105+
generateMeta,
106+
} = this;
107+
108+
if (!(await isStreamEnabled(stream))) {
75109
throw new Error("Stream is no longer enabled.");
76110
}
77111

78-
const shardIterator = this._getShardIterator();
112+
const shardIterators = _getShardIterators();
113+
114+
for (const [
115+
shardId,
116+
shardIterator,
117+
] of Object.entries(shardIterators)) {
118+
if (!shardIterator) {
119+
continue;
120+
}
79121

80-
try {
81122
const {
82123
Records: records,
83124
NextShardIterator: nextShardIterator,
84-
} = await this.getRecords({
125+
} = await getRecords({
85126
ShardIterator: shardIterator,
127+
Limit: 100,
86128
});
87129

88130
for (const record of records) {
89-
const meta = this.generateMeta(record);
131+
const meta = generateMeta(record);
90132
this.$emit(record, meta);
91133
}
92134

93-
this._setShardIterator(nextShardIterator);
94-
} catch (e) {
95-
console.log("Error getting records", e);
96-
console.log("Retrieving a new shard iterator");
97-
await this._getNewShardIterator();
135+
shardIterators[shardId] = nextShardIterator;
98136
}
137+
138+
_setShardIterators(shardIterators);
99139
},
100140
};

pnpm-lock.yaml

Lines changed: 2 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)