@@ -6,7 +6,7 @@ export default {
66 name : "New DynamoDB Stream Event" ,
77 description : "Emit new event when a DynamoDB stream receives new events. [See the docs here](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)" ,
88 type : "source" ,
9- version : "0.0.4 " ,
9+ version : "0.0.5 " ,
1010 dedupe : "unique" ,
1111 props : {
1212 ...common . props ,
@@ -36,65 +36,105 @@ export default {
3636 } ,
3737 hooks : {
3838 async deploy ( ) {
39- await this . _getNewShardIterator ( ) ;
39+ const {
40+ stream,
41+ _getShardIterators,
42+ _setShardIterators,
43+ listShards,
44+ getShardIterator,
45+ } = this ;
46+
47+ const shardIterators = _getShardIterators ( ) ;
48+
49+ const { StreamDescription : streamDescription } = await listShards ( {
50+ StreamArn : stream ,
51+ Limit : 100 ,
52+ } ) ;
53+
54+ if ( streamDescription ?. Shards ?. length === 0 ) {
55+ throw new Error ( "No shards found in stream" ) ;
56+ }
57+
58+ const activeShards =
59+ streamDescription . Shards
60+ . filter ( ( shard ) => ! shard . SequenceNumberRange . EndingSequenceNumber ) ;
61+
62+ if ( activeShards . length === 0 ) {
63+ throw new Error ( "No active shards found" ) ;
64+ }
65+
66+ const shardIds = activeShards . map ( ( { ShardId } ) => ShardId ) ;
67+
68+ for ( const shardId of shardIds ) {
69+ const { ShardIterator : shardIterator } = await getShardIterator ( {
70+ ShardId : shardId ,
71+ StreamArn : stream ,
72+ ShardIteratorType : "LATEST" ,
73+ } ) ;
74+ shardIterators [ shardId ] = shardIterator ;
75+ }
76+
77+ _setShardIterators ( shardIterators ) ;
4078 } ,
4179 } ,
4280 methods : {
4381 ...common . methods ,
44- async _getNewShardIterator ( ) {
45- const { StreamDescription : streamDescription } = await this . listShards ( {
46- StreamArn : this . stream ,
47- } ) ;
48- const shardId = streamDescription . Shards [ streamDescription . Shards . length - 1 ] . ShardId ;
49- const { ShardIterator : shardIterator } = await this . getShardIterator ( {
50- ShardId : shardId ,
51- StreamArn : this . stream ,
52- ShardIteratorType : "LATEST" ,
53- } ) ;
54- this . _setShardIterator ( shardIterator ) ;
55- return shardIterator ;
82+ _getShardIterators ( ) {
83+ return this . db . get ( "shardIterators" ) || { } ;
5684 } ,
57- _getShardIterator ( ) {
58- return this . db . get ( "shardIterator" ) ;
59- } ,
60- _setShardIterator ( shardIterator ) {
61- this . db . set ( "shardIterator" , shardIterator ) ;
85+ _setShardIterators ( value ) {
86+ this . db . set ( "shardIterators" , value ) ;
6287 } ,
6388 generateMeta ( {
6489 eventID, eventName, dynamodb,
6590 } ) {
6691 return {
6792 id : eventID ,
68- summary : `New ${ eventName } event ` ,
93+ summary : `New Event: ${ eventName } ` ,
6994 ts : Date . parse ( dynamodb . ApproximateCreationDateTime ) ,
7095 } ;
7196 } ,
7297 } ,
7398 async run ( ) {
74- if ( ! ( await this . isStreamEnabled ( this . stream ) ) ) {
99+ const {
100+ stream,
101+ isStreamEnabled,
102+ _getShardIterators,
103+ _setShardIterators,
104+ getRecords,
105+ generateMeta,
106+ } = this ;
107+
108+ if ( ! ( await isStreamEnabled ( stream ) ) ) {
75109 throw new Error ( "Stream is no longer enabled." ) ;
76110 }
77111
78- const shardIterator = this . _getShardIterator ( ) ;
112+ const shardIterators = _getShardIterators ( ) ;
113+
114+ for ( const [
115+ shardId ,
116+ shardIterator ,
117+ ] of Object . entries ( shardIterators ) ) {
118+ if ( ! shardIterator ) {
119+ continue ;
120+ }
79121
80- try {
81122 const {
82123 Records : records ,
83124 NextShardIterator : nextShardIterator ,
84- } = await this . getRecords ( {
125+ } = await getRecords ( {
85126 ShardIterator : shardIterator ,
127+ Limit : 100 ,
86128 } ) ;
87129
88130 for ( const record of records ) {
89- const meta = this . generateMeta ( record ) ;
131+ const meta = generateMeta ( record ) ;
90132 this . $emit ( record , meta ) ;
91133 }
92134
93- this . _setShardIterator ( nextShardIterator ) ;
94- } catch ( e ) {
95- console . log ( "Error getting records" , e ) ;
96- console . log ( "Retrieving a new shard iterator" ) ;
97- await this . _getNewShardIterator ( ) ;
135+ shardIterators [ shardId ] = nextShardIterator ;
98136 }
137+
138+ _setShardIterators ( shardIterators ) ;
99139 } ,
100140} ;
0 commit comments