@@ -25,6 +25,9 @@ const {
25
25
const functionHelper = require ( 'serverless-offline/src/functionHelper' ) ;
26
26
const LambdaContext = require ( 'serverless-offline/src/LambdaContext' ) ;
27
27
28
+ const NO_KINESIS_FOUND = 'Could not find kinesis stream' ;
29
+ const KINESIS_RETRY_DELAY = 2000 ;
30
+
28
31
const fromCallback = fun =>
29
32
new Promise ( ( resolve , reject ) => {
30
33
fun ( ( err , data ) => {
@@ -151,22 +154,36 @@ class ServerlessOfflineKinesis {
151
154
) ;
152
155
}
153
156
154
- async createKinesisReadable ( functionName , streamEvent ) {
157
+ async createKinesisReadable ( functionName , streamEvent , retry = false ) {
155
158
const client = this . getClient ( ) ;
156
159
const streamName = this . getStreamName ( streamEvent ) ;
157
160
158
161
this . serverless . cli . log ( `${ streamName } ` ) ;
159
162
160
- const {
161
- StreamDescription : { Shards : shards }
162
- } = await fromCallback ( cb =>
163
+ const kinesisStream = await fromCallback ( cb =>
163
164
client . describeStream (
164
165
{
165
166
StreamName : streamName
166
167
} ,
167
168
cb
168
169
)
169
- ) ;
170
+ ) . catch ( err => null ) ;
171
+ if ( ! kinesisStream ) {
172
+ if ( retry ) {
173
+ this . serverless . cli . log ( `${ streamName } - not Found, retrying in ${ KINESIS_RETRY_DELAY } ms` ) ;
174
+ setTimeout (
175
+ this . createKinesisReadable . bind ( this ) ,
176
+ KINESIS_RETRY_DELAY ,
177
+ functionName ,
178
+ streamEvent ,
179
+ retry
180
+ ) ;
181
+ return ;
182
+ } else throw new Error ( NO_KINESIS_FOUND ) ;
183
+ }
184
+ const {
185
+ StreamDescription : { Shards : shards }
186
+ } = kinesisStream ;
170
187
171
188
forEach ( ( { ShardId : shardId } ) => {
172
189
const readable = KinesisReadable (
@@ -222,7 +239,7 @@ class ServerlessOfflineKinesis {
222
239
this . serverless . cli . log ( `Kinesis for ${ functionName } :` ) ;
223
240
224
241
forEach ( streamEvent => {
225
- this . createKinesisReadable ( functionName , streamEvent ) ;
242
+ this . createKinesisReadable ( functionName , streamEvent , true ) ; // TMP: retry is not configurable so far
226
243
} , streams ) ;
227
244
228
245
printBlankLine ( ) ;
0 commit comments