Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion components/aws/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@pipedream/aws",
"version": "0.7.7",
"version": "0.7.8",
"description": "Pipedream Aws Components",
"main": "aws.app.mjs",
"keywords": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export default {
name: "New DynamoDB Stream Event",
description: "Emit new event when a DynamoDB stream receives new events. [See the docs here](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)",
type: "source",
version: "0.0.4",
version: "0.0.5",
dedupe: "unique",
props: {
...common.props,
Expand Down Expand Up @@ -36,65 +36,105 @@ export default {
},
hooks: {
async deploy() {
await this._getNewShardIterator();
const {
stream,
_getShardIterators,
_setShardIterators,
listShards,
getShardIterator,
} = this;

const shardIterators = _getShardIterators();

const { StreamDescription: streamDescription } = await listShards({
StreamArn: stream,
Limit: 100,
});

if (streamDescription?.Shards?.length === 0) {
throw new Error("No shards found in stream");
}

const activeShards =
streamDescription.Shards
.filter((shard) => !shard.SequenceNumberRange.EndingSequenceNumber);

if (activeShards.length === 0) {
throw new Error("No active shards found");
}

const shardIds = activeShards.map(({ ShardId }) => ShardId);

for (const shardId of shardIds) {
const { ShardIterator: shardIterator } = await getShardIterator({
ShardId: shardId,
StreamArn: stream,
ShardIteratorType: "LATEST",
});
shardIterators[shardId] = shardIterator;
}

_setShardIterators(shardIterators);
},
},
methods: {
...common.methods,
async _getNewShardIterator() {
const { StreamDescription: streamDescription } = await this.listShards({
StreamArn: this.stream,
});
const shardId = streamDescription.Shards[streamDescription.Shards.length - 1].ShardId;
const { ShardIterator: shardIterator } = await this.getShardIterator({
ShardId: shardId,
StreamArn: this.stream,
ShardIteratorType: "LATEST",
});
this._setShardIterator(shardIterator);
return shardIterator;
_getShardIterators() {
return this.db.get("shardIterators") || {};
},
_getShardIterator() {
return this.db.get("shardIterator");
},
_setShardIterator(shardIterator) {
this.db.set("shardIterator", shardIterator);
_setShardIterators(value) {
this.db.set("shardIterators", value);
},
generateMeta({
eventID, eventName, dynamodb,
}) {
return {
id: eventID,
summary: `New ${eventName} event`,
summary: `New Event: ${eventName}`,
ts: Date.parse(dynamodb.ApproximateCreationDateTime),
};
},
},
async run() {
if (!(await this.isStreamEnabled(this.stream))) {
const {
stream,
isStreamEnabled,
_getShardIterators,
_setShardIterators,
getRecords,
generateMeta,
} = this;

if (!(await isStreamEnabled(stream))) {
throw new Error("Stream is no longer enabled.");
}

const shardIterator = this._getShardIterator();
const shardIterators = _getShardIterators();

for (const [
shardId,
shardIterator,
] of Object.entries(shardIterators)) {
if (!shardIterator) {
continue;
}

try {
const {
Records: records,
NextShardIterator: nextShardIterator,
} = await this.getRecords({
} = await getRecords({
ShardIterator: shardIterator,
Limit: 100,
});

for (const record of records) {
const meta = this.generateMeta(record);
const meta = generateMeta(record);
this.$emit(record, meta);
}

this._setShardIterator(nextShardIterator);
} catch (e) {
console.log("Error getting records", e);
console.log("Retrieving a new shard iterator");
await this._getNewShardIterator();
shardIterators[shardId] = nextShardIterator;
}

_setShardIterators(shardIterators);
},
};
21 changes: 7 additions & 14 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading