Skip to content

Commit 118683d

Browse files
authored
Support of serverless-offline v6 (CoorpAcademy#129)
1 parent fc19ef6 commit 118683d

33 files changed

+1218
-731
lines changed

.nvmrc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
v8.15.1
1+
v10.21.0

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ language: node_js
22
sudo: required
33

44
node_js:
5+
- "14"
56
- "12"
67
- "10"
7-
- "8"
88

99
services:
1010
- docker

.vscode/launch.json

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,39 +8,39 @@
88
"type": "node",
99
"request": "launch",
1010
"name": "SQS",
11-
"program": "${workspaceFolder}/packages/serverless-offline-sqs-integration/node_modules/.bin/serverless",
12-
"cwd": "${workspaceFolder}/packages/serverless-offline-sqs-integration",
13-
"env": {
14-
"PATH": "${env:PATH}:${workspaceFolder}/packages/serverless-offline-sqs-integration/node_modules/.bin"
15-
},
11+
"program": "${workspaceFolder}/tests/serverless-plugins-integration/node_modules/.bin/sls",
12+
"cwd": "${workspaceFolder}/tests/serverless-plugins-integration",
1613
"args": [
17-
"offline"
14+
"--config",
15+
"serverless.sqs.autocreate.yml",
16+
"offline",
17+
"start"
1818
]
1919
},
2020
{
2121
"type": "node",
2222
"request": "launch",
2323
"name": "DynamoDB Streams",
24-
"program": "${workspaceFolder}/packages/serverless-offline-dynamodb-streams-integration/node_modules/.bin/serverless",
25-
"cwd": "${workspaceFolder}/packages/serverless-offline-dynamodb-streams-integration",
26-
"env": {
27-
"PATH": "${env:PATH}:${workspaceFolder}/packages/serverless-offline-dynamodb-streams-integration/node_modules/.bin"
28-
},
24+
"program": "${workspaceFolder}/tests/serverless-plugins-integration/node_modules/.bin/sls",
25+
"cwd": "${workspaceFolder}/tests/serverless-plugins-integration",
2926
"args": [
30-
"offline"
27+
"--config",
28+
"serverless.dynamodb-streams.yml",
29+
"offline",
30+
"start"
3131
]
3232
},
3333
{
3434
"type": "node",
3535
"request": "launch",
3636
"name": "Kinesis",
37-
"program": "${workspaceFolder}/packages/serverless-offline-kinesis-integration/node_modules/.bin/serverless",
38-
"cwd": "${workspaceFolder}/packages/serverless-offline-kinesis-integration",
39-
"env": {
40-
"PATH": "${env:PATH}:${workspaceFolder}/packages/serverless-offline-kinesis-integration/node_modules/.bin"
41-
},
37+
"program": "${workspaceFolder}/tests/serverless-plugins-integration/node_modules/.bin/sls",
38+
"cwd": "${workspaceFolder}/tests/serverless-plugins-integration",
4239
"args": [
43-
"offline"
40+
"--config",
41+
"serverless.kinesis.yml",
42+
"offline",
43+
"start"
4444
]
4545
}
4646
]

packages/dynamodb-streams-readable/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"author": "godu",
1818
"license": "MIT",
1919
"devDependencies": {
20-
"uuid": "^7.0.2"
20+
"uuid": "^8.1.0"
2121
},
2222
"keywords": [
2323
"dynamodb",

packages/serverless-apigateway-access-logs/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
"author": "Adrien Becchis @Coorpacademy (https://github.com/AdrieanKhisbe)",
1818
"license": "MIT",
1919
"dependencies": {
20-
"aws-sdk": "^2.642.0",
20+
"aws-sdk": "^2.696.0",
2121
"lodash": "^4.17.15"
2222
},
2323
"keywords": [

packages/serverless-offline-dynamodb-streams/package.json

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,19 @@
1212
},
1313
"homepage": "https://github.com/CoorpAcademy/serverless-plugins/tree/master/packages/serverless-offline-dynamodb-streams#readme",
1414
"files": [
15-
"src/index.js"
15+
"src/index.js",
16+
"src/dynamodb-streams.js",
17+
"src/dynamodb-streams-event.js",
18+
"src/dynamodb-streams-event-definition.js"
1619
],
1720
"author": "godu",
1821
"license": "MIT",
1922
"peerDependencies": {
2023
"serverless-offline": "^5.12.1"
2124
},
2225
"dependencies": {
23-
"aws-sdk": "^2.642.0",
26+
"aws-sdk": "^2.696.0",
2427
"dynamodb-streams-readable": "^1.0.5",
25-
"figures": "^3.2.0",
2628
"lodash": "^4.17.15"
2729
},
2830
"devDependencies": {
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
const {isNil, omit} = require('lodash/fp');
2+
3+
const extractTableNameFromARN = arn => {
4+
const [, , , , , TableURI] = arn.split(':');
5+
const [, TableName] = TableURI.split('/');
6+
return TableName;
7+
};
8+
9+
class DynamodbStreamsEventDefinition {
10+
constructor(rawSqsEventDefinition, region, accountId) {
11+
this.batchSize = 100;
12+
this.maximumRetryAttempts = 10;
13+
this.startingPosition = 'LATEST';
14+
15+
let enabled;
16+
let tableName;
17+
18+
if (typeof rawSqsEventDefinition === 'string') {
19+
tableName = extractTableNameFromARN(rawSqsEventDefinition);
20+
} else if (typeof rawSqsEventDefinition.arn === 'string') {
21+
tableName = extractTableNameFromARN(rawSqsEventDefinition.arn);
22+
} else if (typeof rawSqsEventDefinition.tableName === 'string') {
23+
tableName = rawSqsEventDefinition.tableName;
24+
}
25+
26+
this.enabled = isNil(enabled) ? true : enabled;
27+
28+
this.arn = `arn:aws:dynamodb:${region}:${accountId}:${tableName}`;
29+
this.tableName = tableName;
30+
31+
if (typeof rawSqsEventDefinition !== 'string') {
32+
Object.assign(this, omit(['arn', 'tableName', 'enabled'], rawSqsEventDefinition));
33+
}
34+
}
35+
}
36+
37+
module.exports = DynamodbStreamsEventDefinition;
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
const {assign} = require('lodash/fp');
2+
3+
class KinesisEvent {
4+
constructor(Records, region, streamArn) {
5+
this.Records = Records.map(
6+
assign({
7+
eventSourceARN: streamArn,
8+
awsRegion: region
9+
})
10+
);
11+
}
12+
}
13+
14+
module.exports = KinesisEvent;
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
const {Writable} = require('stream');
2+
const DynamodbClient = require('aws-sdk/clients/dynamodb');
3+
const DynamodbStreamsClient = require('aws-sdk/clients/dynamodbstreams');
4+
const DynamodbStreamsReadable = require('dynamodb-streams-readable');
5+
const {assign} = require('lodash/fp');
6+
const DynamodbStreamsEventDefinition = require('./dynamodb-streams-event-definition');
7+
const DynamodbStreamsEvent = require('./dynamodb-streams-event');
8+
9+
const delay = timeout => new Promise(resolve => setTimeout(resolve, timeout));
10+
11+
class DynamodbStreams {
12+
constructor(lambda, options) {
13+
this.lambda = null;
14+
this.options = null;
15+
16+
this.lambda = lambda;
17+
this.options = options;
18+
19+
this.client = new DynamodbClient(this.options);
20+
this.streamsClient = new DynamodbStreamsClient(this.options);
21+
22+
this.readables = [];
23+
}
24+
25+
create(events) {
26+
return Promise.all(
27+
events.map(({functionKey, dynamodbStreams}) => this._create(functionKey, dynamodbStreams))
28+
);
29+
}
30+
31+
start() {
32+
this.readables.forEach(readable => readable.resume());
33+
}
34+
35+
stop(timeout) {
36+
this.readables.forEach(readable => readable.pause());
37+
}
38+
39+
_create(functionKey, rawDynamodbStreamsEventDefinition) {
40+
const dynamodbStreamsEvent = new DynamodbStreamsEventDefinition(
41+
rawDynamodbStreamsEventDefinition,
42+
this.options.region,
43+
this.options.accountId
44+
);
45+
46+
return this._dynamodbStreamsEvent(functionKey, dynamodbStreamsEvent);
47+
}
48+
49+
async _dynamodbStreamsEvent(functionKey, dynamodbStreamsEvent) {
50+
const {
51+
enabled,
52+
tableName,
53+
arn,
54+
batchSize,
55+
startingPosition,
56+
maximumRetryAttempts
57+
} = dynamodbStreamsEvent;
58+
59+
if (!enabled) return;
60+
61+
const {
62+
Table: {LatestStreamArn}
63+
} = await this.client
64+
.describeTable({
65+
TableName: tableName
66+
})
67+
.promise();
68+
69+
const {
70+
StreamDescription: {Shards: shards}
71+
} = await this.streamsClient
72+
.describeStream({
73+
StreamArn: LatestStreamArn
74+
})
75+
.promise();
76+
77+
shards.forEach(({ShardId: shardId}) => {
78+
const readable = DynamodbStreamsReadable(
79+
this.streamsClient,
80+
LatestStreamArn,
81+
assign(dynamodbStreamsEvent, {
82+
shardId,
83+
limit: batchSize,
84+
iterator: startingPosition
85+
})
86+
);
87+
88+
const writable = new Writable({
89+
objectMode: true,
90+
write: (chunk, _, cb) => {
91+
const task = async remainingAttempts => {
92+
try {
93+
const lambdaFunction = this.lambda.get(functionKey);
94+
95+
const event = new DynamodbStreamsEvent(chunk, this.region, arn);
96+
lambdaFunction.setEvent(event);
97+
98+
await lambdaFunction.runHandler();
99+
} catch (err) {
100+
if (remainingAttempts > 0) {
101+
await delay(500);
102+
return task(remainingAttempts - 1);
103+
}
104+
}
105+
};
106+
107+
task(maximumRetryAttempts - 1)
108+
.then(() => cb())
109+
.catch(cb);
110+
}
111+
});
112+
113+
readable.pipe(writable);
114+
readable.pause();
115+
116+
this.readables.push(readable);
117+
});
118+
}
119+
}
120+
121+
module.exports = DynamodbStreams;

0 commit comments

Comments
 (0)