Skip to content

Commit 38c6b53

Browse files
committed
Factorise integration test and stabilise with line processing ✂️
Several lines could be capture captured in same chunk which can cause lambda invocation to be missed. (hence the serverless hang and travis goes into timeout)
1 parent 1738921 commit 38c6b53

File tree

8 files changed

+96
-10135
lines changed

8 files changed

+96
-10135
lines changed

tests/serverless-plugins-integration/package-lock.json

Lines changed: 29 additions & 10096 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/serverless-plugins-integration/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
"aws-sdk": "^2.757.0",
2525
"lodash": "^4.17.20",
2626
"minio": "^7.0.16",
27+
"pump": "^3.0.0",
2728
"serverless": "^1.83.0",
2829
"serverless-offline": "^6.7.0",
2930
"serverless-offline-dynamodb-streams": "^4.1.0",

tests/serverless-plugins-integration/test-dynamodb-streams.js

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ const {Writable} = require('stream');
22
const {spawn} = require('child_process');
33
const onExit = require('signal-exit');
44
const {DynamoDB} = require('aws-sdk');
5+
const pump = require('pump');
6+
const {delay, getSplitLinesTransform} = require('./utils');
57

68
const client = new DynamoDB({
79
region: 'eu-west-1',
@@ -38,9 +40,7 @@ const putItems = () =>
3840
let setupInProgress = true;
3941
const populateTables = async () => {
4042
await unemptyTables();
41-
await new Promise(resolve => {
42-
setTimeout(resolve, 1200);
43-
});
43+
await delay(1200);
4444
setupInProgress = false;
4545
await putItems();
4646
};
@@ -56,19 +56,20 @@ const serverless = spawn(
5656

5757
const set = new Set();
5858
let invocationCount = 0;
59-
serverless.stdout.pipe(
59+
pump(
60+
serverless.stdout,
61+
getSplitLinesTransform(),
6062
new Writable({
61-
write(chunk, enc, cb) {
62-
const output = chunk.toString();
63-
64-
if (/Starting Offline Dynamodb Streams/.test(output)) {
63+
objectMode: true,
64+
write(line, enc, cb) {
65+
if (/Starting Offline Dynamodb Streams/.test(line)) {
6566
populateTables(); // will run in the background
6667
}
6768

6869
if (setupInProgress) return cb(); // do not consider lambda executions before we post the real items
6970

7071
const matches = /offline: \(λ: (.*)\) RequestId: .* Duration: .* ms {2}Billed Duration: .* ms/g.exec(
71-
output
72+
line
7273
);
7374

7475
if (matches) {

tests/serverless-plugins-integration/test-kinesis.js

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ const {Writable} = require('stream');
22
const {spawn} = require('child_process');
33
const onExit = require('signal-exit');
44
const {Kinesis} = require('aws-sdk');
5+
const pump = require('pump');
6+
const {delay, getSplitLinesTransform} = require('./utils');
57

68
const client = new Kinesis({
79
region: 'eu-west-1',
@@ -11,9 +13,7 @@ const client = new Kinesis({
1113
});
1214

1315
const putRecords = async () => {
14-
await new Promise(resolve => {
15-
setTimeout(resolve, 1000);
16-
});
16+
await delay(1000);
1717

1818
await Promise.all([
1919
client
@@ -52,19 +52,20 @@ const serverless = spawn('serverless', ['--config', 'serverless.kinesis.yml', 'o
5252
cwd: __dirname
5353
});
5454

55-
serverless.stdout.pipe(
55+
pump(
56+
serverless.stdout,
57+
getSplitLinesTransform(),
5658
new Writable({
57-
write(chunk, enc, cb) {
58-
const output = chunk.toString();
59-
60-
if (/Starting Offline Kinesis/.test(output)) {
59+
objectMode: true,
60+
write(line, enc, cb) {
61+
if (/Starting Offline Kinesis/.test(line)) {
6162
putRecords();
6263
}
6364

6465
this.count =
6566
(this.count || 0) +
6667
(
67-
output.match(
68+
line.match(
6869
/offline: \(λ: .*\) RequestId: .* Duration: .* ms {2}Billed Duration: .* ms/g
6970
) || []
7071
).length;

tests/serverless-plugins-integration/test-s3.js

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ const {Writable} = require('stream');
22
const {spawn} = require('child_process');
33
const onExit = require('signal-exit');
44
const Minio = require('minio');
5+
const pump = require('pump');
6+
const {delay, getSplitLinesTransform} = require('./utils');
57

68
const client = new Minio.Client({
79
region: 'eu-west-1',
@@ -14,9 +16,7 @@ const client = new Minio.Client({
1416

1517
const path = './files/test.txt';
1618
const uploadFiles = async () => {
17-
await new Promise(resolve => {
18-
setTimeout(resolve, 1000);
19-
});
19+
await delay(1000);
2020

2121
await Promise.all([
2222
client.fPutObject('documents', 'first.txt', path),
@@ -34,19 +34,20 @@ const serverless = spawn('serverless', ['--config', 'serverless.s3.yml', 'offlin
3434
cwd: __dirname
3535
});
3636

37-
serverless.stdout.pipe(
37+
pump(
38+
serverless.stdout,
39+
getSplitLinesTransform(),
3840
new Writable({
39-
write(chunk, enc, cb) {
40-
const output = chunk.toString();
41-
42-
if (/Starting Offline S3/.test(output)) {
41+
objectMode: true,
42+
write(line, enc, cb) {
43+
if (/Starting Offline S3/.test(line)) {
4344
uploadFiles();
4445
}
4546

4647
this.count =
4748
(this.count || 0) +
4849
(
49-
output.match(
50+
line.match(
5051
/offline: \(λ: .*\) RequestId: .* Duration: .* ms {2}Billed Duration: .* ms/g
5152
) || []
5253
).length;

tests/serverless-plugins-integration/test-sqs-autocreate.js

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ const {Writable} = require('stream');
22
const {spawn} = require('child_process');
33
const onExit = require('signal-exit');
44
const {SQS} = require('aws-sdk');
5+
const pump = require('pump');
6+
const {getSplitLinesTransform} = require('./utils');
57

68
const client = new SQS({
79
region: 'eu-west-1',
@@ -43,12 +45,13 @@ const serverless = spawn(
4345
}
4446
);
4547

46-
serverless.stdout.pipe(
48+
pump(
49+
serverless.stdout,
50+
getSplitLinesTransform(),
4751
new Writable({
48-
write(chunk, enc, cb) {
49-
const output = chunk.toString();
50-
51-
if (/Starting Offline SQS/.test(output)) {
52+
objectMode: true,
53+
write(line, enc, cb) {
54+
if (/Starting Offline SQS/.test(line)) {
5255
sendMessages()
5356
.then(() => console.log('sucessfully send messages'))
5457
.catch(err => {
@@ -59,7 +62,7 @@ serverless.stdout.pipe(
5962
this.count =
6063
(this.count || 0) +
6164
(
62-
output.match(
65+
line.match(
6366
/offline: \(λ: .*\) RequestId: .* Duration: .* ms {2}Billed Duration: .* ms/g
6467
) || []
6568
).length;

tests/serverless-plugins-integration/test-sqs.js

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ const {Writable} = require('stream');
22
const {spawn} = require('child_process');
33
const onExit = require('signal-exit');
44
const {SQS} = require('aws-sdk');
5+
const pump = require('pump');
6+
const {getSplitLinesTransform} = require('./utils');
57

68
const client = new SQS({
79
region: 'eu-west-1',
@@ -47,19 +49,20 @@ const serverless = spawn('serverless', ['--config', 'serverless.sqs.yml', 'offli
4749
cwd: __dirname
4850
});
4951

50-
serverless.stdout.pipe(
52+
pump(
53+
serverless.stdout,
54+
getSplitLinesTransform(),
5155
new Writable({
52-
write(chunk, enc, cb) {
53-
const output = chunk.toString();
54-
55-
if (/Starting Offline SQS/.test(output)) {
56+
objectMode: true,
57+
write(line, enc, cb) {
58+
if (/Starting Offline SQS/.test(line)) {
5659
sendMessages();
5760
}
5861

5962
this.count =
6063
(this.count || 0) +
6164
(
62-
output.match(
65+
line.match(
6366
/offline: \(λ: .*\) RequestId: .* Duration: .* ms {2}Billed Duration: .* ms/g
6467
) || []
6568
).length;
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
const {Transform} = require('stream');
2+
3+
const getSplitLinesTransform = () =>
4+
new Transform({
5+
objectMode: true,
6+
transform(chunk, encoding, callback) {
7+
const lines = chunk.toString().trim().split('\n');
8+
lines.forEach(line => this.push(line));
9+
callback();
10+
}
11+
});
12+
13+
const delay = duration =>
14+
new Promise(resolve => {
15+
setTimeout(resolve, duration);
16+
});
17+
18+
module.exports = {getSplitLinesTransform, delay};

0 commit comments

Comments
 (0)