Skip to content

Commit 3d9a107

Browse files
Chris ArmstrongAdrieanKhisbe
authored andcommitted
handle Fn::GetAtt and Fn::Join in kinesis stream spec [reworked from #94]
1 parent d02f5b2 commit 3d9a107

File tree

3 files changed

+144
-6
lines changed

3 files changed

+144
-6
lines changed

packages/serverless-offline-kinesis/package.json

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,11 @@
3232
"kinesis",
3333
"serverless",
3434
"lambda"
35-
]
35+
],
36+
"devDependencies": {
37+
"jest": "^24.9.0"
38+
},
39+
"scripts": {
40+
"test": "jest"
41+
}
3642
}

packages/serverless-offline-kinesis/src/index.js

Lines changed: 61 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
const {join} = require('path');
1+
const path = require('path');
22
const {Writable} = require('stream');
33
const figures = require('figures');
44
const Kinesis = require('aws-sdk/clients/kinesis');
@@ -19,6 +19,8 @@ const {
1919
matchesProperty,
2020
omitBy,
2121
isString,
22+
isObject,
23+
isArray,
2224
pipe,
2325
startsWith
2426
} = require('lodash/fp');
@@ -36,6 +38,22 @@ const extractStreamNameFromARN = arn => {
3638
return StreamNames.join('/');
3739
};
3840

41+
const extractStreamNameFromGetAtt = getAtt => {
42+
if (isArray(getAtt)) return getAtt[0];
43+
if (isString(getAtt) && getAtt.endsWith('.Arn')) return getAtt.replace(/\.Arn$/, '');
44+
throw new Error('Unable to parse Fn::GetAtt for stream cross-reference');
45+
};
46+
47+
const extractStreamNameFromJoin = ([delimiter, parts]) => {
48+
const resolvedParts = parts.map(part => {
49+
if (isString(part)) return part;
50+
// TODO maybe handle getAtt in Join?
51+
if (isObject(part)) return ''; // empty string as placeholder
52+
return '';
53+
});
54+
return extractStreamNameFromARN(resolvedParts.join(delimiter));
55+
};
56+
3957
class ServerlessOfflineKinesis {
4058
constructor(serverless, options) {
4159
this.serverless = serverless;
@@ -87,7 +105,7 @@ class ServerlessOfflineKinesis {
87105
process.env = functionEnv;
88106

89107
const serviceRuntime = this.service.provider.runtime;
90-
const servicePath = join(this.serverless.config.servicePath, location);
108+
const servicePath = path.join(this.serverless.config.servicePath, location);
91109
const funOptions = functionHelper.getFunctionOptions(
92110
__function,
93111
functionName,
@@ -134,23 +152,59 @@ class ServerlessOfflineKinesis {
134152
if (isString(streamEvent.arn)) return extractStreamNameFromARN(streamEvent.arn);
135153
if (isString(streamEvent.streamName)) return streamEvent.streamName;
136154

137-
if (streamEvent.arn['Fn::GetAtt']) {
138-
const [ResourceName] = streamEvent.arn['Fn::GetAtt'];
155+
const {'Fn::GetAtt': getAtt, 'Fn::Join': join} = streamEvent.arn;
156+
if (getAtt) {
157+
const [ResourceName] = streamEvent.arn[getAtt];
158+
// const logicalResourceName = extractStreamNameFromGetAtt(getAtt);
159+
// const physicalResourceName = get(['service', 'resources', 'Resources', logicalResourceName, 'Properties', 'Name'])(this);
139160

140161
const name = get(`resources.Resources.${ResourceName}.Properties.Name`, this.service);
141162
if (isString(name)) return name;
142163
}
164+
if (join) {
165+
const physicalResourceName = extractStreamNameFromJoin(join); // Fixme name
166+
if (isString(physicalResourceName)) return physicalResourceName;
167+
}
143168

144169
throw new Error(
145170
`StreamName not found. See https://github.com/CoorpAcademy/serverless-plugins/tree/master/packages/serverless-offline-kinesis#functions`
146171
);
147172
}
148173

174+
// FIXME: to really incorporate [to be done after conflict resolving]
175+
pollStreamUntilActive(streamName, timeout) {
176+
const client = this.getClient();
177+
const lastTime = Date.now() + timeout;
178+
return new Promise((resolve, reject) => {
179+
const poll = async () => {
180+
const {
181+
StreamDescription: {StreamStatus}
182+
} = await client.describeStream({StreamName: streamName}).promise();
183+
if (StreamStatus === 'ACTIVE') {
184+
resolve();
185+
} else if (Date.now() > lastTime) {
186+
reject(
187+
new Error(
188+
`Stream ${streamName} did not become active within timeout of ${Math.floor(
189+
timeout / 1000
190+
)}s`
191+
)
192+
);
193+
} else {
194+
setTimeout(poll, 1000);
195+
}
196+
};
197+
poll();
198+
});
199+
}
200+
149201
async createKinesisReadable(functionName, streamEvent, retry = false) {
150202
const client = this.getClient();
151203
const streamName = this.getStreamName(streamEvent);
152204

153-
this.serverless.cli.log(`${streamName}`);
205+
this.serverless.cli.log(`Waiting for ${streamName} to become active`);
206+
207+
await this.pollStreamUntilActive(streamName, this.getConfig().waitForActiveTimeout || 30000); // FIXME
154208

155209
const kinesisStream = await client
156210
.describeStream({
@@ -175,6 +229,7 @@ class ServerlessOfflineKinesis {
175229
const {
176230
StreamDescription: {Shards: shards}
177231
} = kinesisStream;
232+
this.serverless.cli.log(`${streamName} - creating listeners for ${shards.length} shards`);
178233

179234
forEach(({ShardId: shardId}) => {
180235
const readable = KinesisReadable(
@@ -244,3 +299,4 @@ class ServerlessOfflineKinesis {
244299
}
245300

246301
module.exports = ServerlessOfflineKinesis;
302+
module.exports.extractStreamNameFromGetAtt = extractStreamNameFromGetAtt;
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
const ServerlessOfflineKinesis = require('./index');
2+
const { extractStreamNameFromGetAtt } = ServerlessOfflineKinesis;
3+
4+
test('extractStreamNameFromGetAtt handles array Fn::GetAtt', () => {
5+
expect(extractStreamNameFromGetAtt(['MyResource', 'Arn'])).toEqual('MyResource');
6+
});
7+
test('extractStreamNameFromGetAtt handles string Fn::GetAtt', () => {
8+
expect(extractStreamNameFromGetAtt('MyResource.Arn')).toEqual('MyResource');
9+
});
10+
test('extractStreamNameFromGetAtt throws on other cases', () => {
11+
expect(() => extractStreamNameFromGetAtt({ MyResource: 'Arn' })).toThrow();
12+
});
13+
14+
const baseServerless = {
15+
service: {
16+
resources: {
17+
Resources: {
18+
TestStream: {
19+
Type: 'AWS::Kinesis::Stream',
20+
Properties: {
21+
Name: 'test-stream-dev'
22+
},
23+
},
24+
},
25+
},
26+
},
27+
};
28+
29+
test('getStreamName handles a directly specified ARN', () => {
30+
const plugin = new ServerlessOfflineKinesis(baseServerless);
31+
expect(plugin.getStreamName('arn:aws:kinesis:us-east-1:123456789012:stream/TestStream')).toEqual('TestStream');
32+
})
33+
34+
test('getStreamName handles an object with a arn string property', () => {
35+
const plugin = new ServerlessOfflineKinesis(baseServerless);
36+
expect(plugin.getStreamName({
37+
arn: 'arn:aws:kinesis:us-east-1:123456789012:stream/TestStream'
38+
})).toEqual('TestStream');
39+
});
40+
41+
test('getStreamName handles an object with a streamName property', () => {
42+
const plugin = new ServerlessOfflineKinesis(baseServerless);
43+
expect(plugin.getStreamName({
44+
streamName: 'TestStream',
45+
})).toEqual('TestStream');
46+
});
47+
48+
test('getStreamName handles an object with an arn Fn::GetAtt lookup (array form)', () => {
49+
const plugin = new ServerlessOfflineKinesis(baseServerless);
50+
expect(plugin.getStreamName({
51+
arn: {
52+
'Fn::GetAtt': ['TestStream', 'Arn'],
53+
}
54+
})).toEqual('test-stream-dev');
55+
});
56+
57+
test('getStreamName handles an object with an arn Fn::GetAtt lookup (string form)', () => {
58+
const plugin = new ServerlessOfflineKinesis(baseServerless);
59+
expect(plugin.getStreamName({
60+
arn: {
61+
'Fn::GetAtt': 'TestStream.Arn',
62+
}
63+
})).toEqual('test-stream-dev');
64+
});
65+
test('getStreamName makes a naïve attempt to parse an arn Fn::Join lookup', () => {
66+
const plugin = new ServerlessOfflineKinesis(baseServerless);
67+
expect(plugin.getStreamName({
68+
arn: {
69+
'Fn::Join': [
70+
':',
71+
['arn', 'aws', 'kinesis', { Ref: 'AWS::Region'}, { Ref: 'AWS::AccountId' }, 'stream/my-stream-dev']
72+
]
73+
}
74+
})).toEqual('my-stream-dev');
75+
});
76+

0 commit comments

Comments
 (0)