Skip to content

Commit d7290ba

Browse files
author
John Gilbert
committed
add-scheduler-support
1 parent 25990d0 commit d7290ba

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed

src/connectors/scheduler.js

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/* eslint import/no-extraneous-dependencies: ["error", {"devDependencies": true}] */
2+
import {
3+
CreateScheduleCommand,
4+
SchedulerClient,
5+
ActionAfterCompletion,
6+
} from '@aws-sdk/client-scheduler';
7+
import Promise from 'bluebird';
8+
import { NodeHttpHandler } from '@smithy/node-http-handler';
9+
import { ConfiguredRetryStrategy } from '@smithy/util-retry';
10+
import { omit, pick } from 'lodash';
11+
import { defaultRetryConfig, defaultBackoffDelay } from '../utils/retry';
12+
import { defaultDebugLogger } from '../utils/log';
13+
14+
class Connector {
15+
constructor({
16+
debug,
17+
pipelineId,
18+
timeout = Number(process.env.SCHEDULER_TIMEOUT)
19+
|| Number(process.env.TIMEOUT)
20+
|| 1000,
21+
busArn = process.env.BUS_ARN,
22+
roleArn = process.env.SCHEDULER_ROLE_ARN,
23+
kmsKeyArn = process.env.MASTER_KEY_ARN,
24+
retryConfig = defaultRetryConfig,
25+
additionalClientOpts = {},
26+
...opt
27+
}) {
28+
this.debug = (msg) => debug('%j', msg);
29+
this.client = Connector.getClient(pipelineId, debug, timeout, additionalClientOpts);
30+
this.busArn = busArn;
31+
this.roleArn = roleArn;
32+
this.kmsKeyArn = kmsKeyArn;
33+
this.retryConfig = retryConfig;
34+
this.opt = opt;
35+
}
36+
37+
static clients = {};
38+
39+
static getClient(pipelineId, debug, timeout, additionalClientOpts) {
40+
const addlRequestHandlerOpts = pick(additionalClientOpts, ['requestHandler']);
41+
const addlClientOpts = omit(additionalClientOpts, ['requestHandler']);
42+
43+
if (!this.clients[pipelineId]) {
44+
this.clients[pipelineId] = new SchedulerClient({
45+
requestHandler: new NodeHttpHandler({
46+
requestTimeout: timeout,
47+
connectionTimeout: timeout,
48+
...addlRequestHandlerOpts,
49+
}),
50+
retryStrategy: new ConfiguredRetryStrategy(11, defaultBackoffDelay),
51+
logger: defaultDebugLogger(debug),
52+
...addlClientOpts,
53+
});
54+
}
55+
return this.clients[pipelineId];
56+
}
57+
58+
schedule(inputParams, ctx) {
59+
const params = {
60+
...inputParams,
61+
Target: {
62+
...inputParams.Target,
63+
Arn: inputParams.Target.Arn || this.busArn,
64+
RoleArn: inputParams.Target.RoleArn || this.roleArn,
65+
},
66+
ActionAfterCompletion: ActionAfterCompletion.DELETE,
67+
KmsKeyArn: params.KmsKeyArn || this.kmsKeyArn,
68+
};
69+
70+
return this._sendCommand(new CreateScheduleCommand(params), ctx);
71+
}
72+
73+
_sendCommand(command, ctx) {
74+
this.opt.metrics?.capture(this.client, command, 'scheduler', this.opt, ctx);
75+
return Promise.resolve(this.client.send(command))
76+
.tap(this.debug)
77+
.tapCatch(this.debug);
78+
}
79+
}
80+
81+
export default Connector;

src/sinks/scheduler.js

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import _ from 'highland';
2+
3+
import Connector from '../connectors/scheduler';
4+
5+
import { rejectWithFault } from '../utils/faults';
6+
import { debug as d } from '../utils/print';
7+
import { ratelimit } from '../utils/ratelimit';
8+
9+
export const scheduleEvent = ({
10+
id: pipelineId,
11+
debug = d('scheduler'),
12+
timeout = Number(process.env.SCHEDULER_TIMEOUT) || Number(process.env.TIMEOUT) || 1000,
13+
parallel = Number(process.env.PARALLEL) || 4,
14+
scheduleRequestField = 'scheduleRequest',
15+
scheduleResponseField = 'scheduleResponse',
16+
step = 'schedule',
17+
...opt
18+
} = {}) => {
19+
const connector = new Connector({
20+
pipelineId, debug, ...opt,
21+
});
22+
23+
const invoke = (uow) => {
24+
/* istanbul ignore next */
25+
if (!uow[scheduleRequestField]) return _(Promise.resolve(uow));
26+
27+
const schedulePromise = connector
28+
.schedule(uow[scheduleRequestField])
29+
.then((scheduledResponse) => ({
30+
...uow,
31+
[scheduleResponseField]: scheduledResponse,
32+
}))
33+
.catch(rejectWithFault(uow));
34+
35+
return _(schedulePromise);
36+
};
37+
38+
return (s) => s
39+
.through(ratelimit(opt))
40+
.map(invoke)
41+
.parallel(parallel);
42+
};

0 commit comments

Comments
 (0)