Skip to content

Commit 42e1c3a

Browse files
committed
Fully integrates feature from #94 with modifications from #89
1 parent 51366fc commit 42e1c3a

File tree

1 file changed

+17
-34
lines changed
  • packages/serverless-offline-kinesis/src

1 file changed

+17
-34
lines changed

packages/serverless-offline-kinesis/src/index.js

Lines changed: 17 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const LambdaContext = require('serverless-offline/src/LambdaContext');
2929

3030
const NO_KINESIS_FOUND = 'Could not find kinesis stream';
3131
const KINESIS_RETRY_DELAY = 200;
32+
const KINESIS_RETRY_TIMEOUT = 30000;
3233

3334
const printBlankLine = () => console.log();
3435

@@ -166,65 +167,45 @@ class ServerlessOfflineKinesis {
166167
if (isString(physicalResourceName)) return physicalResourceName;
167168
}
168169

169-
this.serverless.cli.log(`Could not resolve stream name for spec: ${JSON.stringify(streamEvent, null, 2)}`);
170+
this.serverless.cli.log(
171+
`Could not resolve stream name for spec: ${JSON.stringify(streamEvent, null, 2)}`
172+
);
170173

171174
throw new Error(
172175
`StreamName not found. See https://github.com/CoorpAcademy/serverless-plugins/tree/master/packages/serverless-offline-kinesis#functions`
173176
);
174177
}
175178

176-
// FIXME: to really incorporate [to be done after conflict resolving]
177-
pollStreamUntilActive(streamName, timeout) {
178-
const client = this.getClient();
179-
const lastTime = Date.now() + timeout;
180-
return new Promise((resolve, reject) => {
181-
const poll = async () => {
182-
const {
183-
StreamDescription: {StreamStatus}
184-
} = await client.describeStream({StreamName: streamName}).promise();
185-
if (StreamStatus === 'ACTIVE') {
186-
resolve();
187-
} else if (Date.now() > lastTime) {
188-
reject(
189-
new Error(
190-
`Stream ${streamName} did not become active within timeout of ${Math.floor(
191-
timeout / 1000
192-
)}s`
193-
)
194-
);
195-
} else {
196-
setTimeout(poll, 1000);
197-
}
198-
};
199-
poll();
200-
});
201-
}
202-
203-
async createKinesisReadable(functionName, streamEvent, retry = false) {
179+
async createKinesisReadable(functionName, streamEvent, delay = null) {
204180
const client = this.getClient();
205181
const streamName = this.getStreamName(streamEvent);
206182

207183
this.serverless.cli.log(`Waiting for ${streamName} to become active`);
208184

209-
await this.pollStreamUntilActive(streamName, this.getConfig().waitForActiveTimeout || 30000); // FIXME
210-
211185
const kinesisStream = await client
212186
.describeStream({
213187
StreamName: streamName
214188
})
215189
.promise()
190+
.then(({StreamDescription}) => {
191+
if (StreamDescription.StreamStatus !== 'ACTIVE')
192+
throw new Error('Stream found but not yet active');
193+
return {StreamDescription};
194+
})
216195
.catch(err => err);
217196

218197
if (kinesisStream instanceof Error) {
219-
if (!retry) throw new Error(NO_KINESIS_FOUND);
198+
if (delay === null) throw new Error(NO_KINESIS_FOUND);
199+
if (delay < KINESIS_RETRY_DELAY)
200+
throw new Error(`Stream ${streamName} did not become active within specified timeout`);
220201

221202
this.serverless.cli.log(
222203
`${streamName} - not found because of ${
223204
kinesisStream.code
224205
}, retrying in ${KINESIS_RETRY_DELAY}ms`
225206
);
226207
return setTimeout(() => {
227-
this.createKinesisReadable(functionName, streamEvent, retry);
208+
this.createKinesisReadable(functionName, streamEvent, delay - KINESIS_RETRY_DELAY);
228209
}, KINESIS_RETRY_DELAY);
229210
}
230211

@@ -286,8 +267,10 @@ class ServerlessOfflineKinesis {
286267
printBlankLine();
287268
this.serverless.cli.log(`Kinesis for ${functionName}:`);
288269

270+
const waitForStreamDelay = this.getConfig().waitForActiveTimeout || KINESIS_RETRY_TIMEOUT;
271+
// ! FIXME: probably rename (and document the variable name)
289272
forEach(streamEvent => {
290-
this.createKinesisReadable(functionName, streamEvent, true); // TMP: retry is not configurable so far
273+
this.createKinesisReadable(functionName, streamEvent, waitForStreamDelay);
291274
}, streams);
292275

293276
printBlankLine();

0 commit comments

Comments
 (0)