diff --git a/components/aws/package.json b/components/aws/package.json index c30db608e61d2..d877e074bddb6 100644 --- a/components/aws/package.json +++ b/components/aws/package.json @@ -1,6 +1,6 @@ { "name": "@pipedream/aws", - "version": "0.7.7", + "version": "0.7.8", "description": "Pipedream Aws Components", "main": "aws.app.mjs", "keywords": [ diff --git a/components/aws/sources/new-dynamodb-stream-event/new-dynamodb-stream-event.mjs b/components/aws/sources/new-dynamodb-stream-event/new-dynamodb-stream-event.mjs index 6f40b856b69cc..f91f5685f2726 100644 --- a/components/aws/sources/new-dynamodb-stream-event/new-dynamodb-stream-event.mjs +++ b/components/aws/sources/new-dynamodb-stream-event/new-dynamodb-stream-event.mjs @@ -6,7 +6,7 @@ export default { name: "New DynamoDB Stream Event", description: "Emit new event when a DynamoDB stream receives new events. [See the docs here](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)", type: "source", - version: "0.0.4", + version: "0.0.5", dedupe: "unique", props: { ...common.props, @@ -36,65 +36,105 @@ export default { }, hooks: { async deploy() { - await this._getNewShardIterator(); + const { + stream, + _getShardIterators, + _setShardIterators, + listShards, + getShardIterator, + } = this; + + const shardIterators = _getShardIterators(); + + const { StreamDescription: streamDescription } = await listShards({ + StreamArn: stream, + Limit: 100, + }); + + if (streamDescription?.Shards?.length === 0) { + throw new Error("No shards found in stream"); + } + + const activeShards = + streamDescription.Shards + .filter((shard) => !shard.SequenceNumberRange.EndingSequenceNumber); + + if (activeShards.length === 0) { + throw new Error("No active shards found"); + } + + const shardIds = activeShards.map(({ ShardId }) => ShardId); + + for (const shardId of shardIds) { + const { ShardIterator: shardIterator } = await getShardIterator({ + ShardId: shardId, + StreamArn: stream, + ShardIteratorType: "LATEST", + }); + shardIterators[shardId] = shardIterator; + } + + _setShardIterators(shardIterators); }, }, methods: { ...common.methods, - async _getNewShardIterator() { - const { StreamDescription: streamDescription } = await this.listShards({ - StreamArn: this.stream, - }); - const shardId = streamDescription.Shards[streamDescription.Shards.length - 1].ShardId; - const { ShardIterator: shardIterator } = await this.getShardIterator({ - ShardId: shardId, - StreamArn: this.stream, - ShardIteratorType: "LATEST", - }); - this._setShardIterator(shardIterator); - return shardIterator; + _getShardIterators() { + return this.db.get("shardIterators") || {}; }, - _getShardIterator() { - return this.db.get("shardIterator"); - }, - _setShardIterator(shardIterator) { - this.db.set("shardIterator", shardIterator); + _setShardIterators(value) { + this.db.set("shardIterators", value); }, generateMeta({ eventID, eventName, dynamodb, }) { return { id: eventID, - summary: `New ${eventName} event`, + summary: `New Event: ${eventName}`, ts: Date.parse(dynamodb.ApproximateCreationDateTime), }; }, }, async run() { - if (!(await this.isStreamEnabled(this.stream))) { + const { + stream, + isStreamEnabled, + _getShardIterators, + _setShardIterators, + getRecords, + generateMeta, + } = this; + + if (!(await isStreamEnabled(stream))) { throw new Error("Stream is no longer enabled."); } - const shardIterator = this._getShardIterator(); + const shardIterators = _getShardIterators(); + + for (const [ + shardId, + shardIterator, + ] of Object.entries(shardIterators)) { + if (!shardIterator) { + continue; + } - try { const { Records: records, NextShardIterator: nextShardIterator, - } = await this.getRecords({ + } = await getRecords({ ShardIterator: shardIterator, + Limit: 100, }); for (const record of records) { - const meta = this.generateMeta(record); + const meta = generateMeta(record); this.$emit(record, meta); } - this._setShardIterator(nextShardIterator); - } catch (e) { - console.log("Error getting records", e); - console.log("Retrieving a new shard iterator"); - await this._getNewShardIterator(); + shardIterators[shardId] = nextShardIterator; } + + _setShardIterators(shardIterators); }, }; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 73e876321fefd..88e831b83f62f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -883,8 +883,7 @@ importers: specifier: ^1.1.1 version: 1.6.6 - components/apiverve: - specifiers: {} + components/apiverve: {} components/apollo_io: dependencies: @@ -7473,8 +7472,7 @@ importers: specifier: ^1.6.0 version: 1.6.6 - components/lucca: - specifiers: {} + components/lucca: {} components/lucid: dependencies: @@ -8072,8 +8070,7 @@ importers: specifier: ^1.4.1 version: 1.6.6 - components/mistral_ai: - specifiers: {} + components/mistral_ai: {} components/mitra: {} @@ -9299,8 +9296,7 @@ importers: specifier: ^2.0.0 version: 2.0.0 - components/pdf4me: - specifiers: {} + components/pdf4me: {} components/pdf_api_io: {} @@ -10069,8 +10065,7 @@ importers: components/proprofs_quiz_maker: {} - components/provesource: - specifiers: {} + components/provesource: {} components/proworkflow: dependencies: @@ -13688,8 +13683,7 @@ importers: specifier: 3.2.2 version: 3.2.2 - components/verifiedemail: - specifiers: {} + components/verifiedemail: {} components/verifone: {} @@ -14370,8 +14364,7 @@ importers: specifier: ^0.1.4 version: 0.1.6 - components/yepcode: - specifiers: {} + components/yepcode: {} components/yespo: dependencies: