Skip to content

Commit 540d7c0

Browse files
authored
Merge pull request #1012 from scramjetorg/feat/event-hub-clean
Event Hub
2 parents 7eada46 + 0396019 commit 540d7c0

File tree

9 files changed

+198
-26
lines changed

9 files changed

+198
-26
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/* eslint-disable no-console */
2+
3+
// eslint-disable-next-line valid-jsdoc
4+
/**
5+
* Simple test event sequence.
6+
*
7+
* @param {never} _input - unused
8+
* @param {string} inputEvent - input
9+
* @param {string} outputEvent - output
10+
* @returns {void}
11+
* @this {import("@scramjet/types").AppContext<{}, {}>} - context
12+
*/
13+
module.exports = async function(_input, inputEvent = "in", outputEvent = "out") {
14+
this.logger.info("started");
15+
return new Promise((res) => {
16+
this.on(inputEvent, async (msg) => {
17+
const ev = JSON.parse(msg);
18+
19+
console.log("event", JSON.stringify(ev));
20+
this.emit(outputEvent, JSON.stringify({ test: ev.test + 1 }));
21+
22+
await new Promise(res2 => setTimeout(res2, 100));
23+
24+
res();
25+
});
26+
});
27+
};
28+
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"name": "@scramjet/event-sequence",
3+
"version": "1.0.0",
4+
"description": "",
5+
"main": "index.js",
6+
"scripts": {
7+
"predeploy": "mkdir -p dist/ && cp index.js package.json dist/ && (cd dist && npm i --omit=dev)"
8+
},
9+
"engines": {
10+
"node": ">=16"
11+
},
12+
"repository": {
13+
"type": "git",
14+
"url": "git+https://github.com/scramjetorg/create-sequence.git"
15+
},
16+
"bugs": {
17+
"url": "https://github.com/scramjetorg/create-sequence/issues"
18+
},
19+
"homepage": "https://github.com/scramjetorg/create-sequence#readme",
20+
"devDependencies": {
21+
"@scramjet/types": "^0.34.4"
22+
},
23+
"author": "",
24+
"license": "ISC"
25+
}

bdd/features/hub/HUB-002-host-iac.feature

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,21 @@ Feature: HUB-002 Host started in Infrastructure as Code mode
2828
And wait for "500" ms
2929
And host is running
3030
* exit hub process
31+
32+
@ci-hub @starts-host
33+
Scenario: HUB-002 TC-004 Event forwarding works between sequences
34+
When hub process is started with random ports and parameters "--sequences-root=data/sequences/ --instance-lifetime-extension-delay=10 --identify-existing --runtime-adapter=process"
35+
And host is running
36+
And I get list of instances
37+
And start Instance by name "event-sequence" with JSON arguments '["event-one", "event-two"]'
38+
* remember last instance as "first"
39+
And start Instance by name "event-sequence" with JSON arguments '["event-two", "event-three"]'
40+
* remember last instance as "second"
41+
* switch to instance "first"
42+
And send event "event-one" to instance with message '{"test": 1}'
43+
# * wait for "100" ms
44+
Then "stdout" starts with 'event {"test":1}'
45+
* switch to instance "second"
46+
Then "stdout" starts with 'event {"test":2}'
47+
And host is running
48+
* exit hub process

bdd/lib/utils.ts

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -218,23 +218,25 @@ export async function waitUntilStreamContains(stream: Readable, expected: string
218218
]);
219219
}
220220

221-
export async function waitUntilStreamEquals(stream: Readable, expected: string): Promise<string> {
221+
export async function waitUntilStreamEquals(stream: Readable, expected: string, timeout = 10000): Promise<string> {
222222
let response = "";
223223

224224
await Promise.race([
225225
(async () => {
226-
for await (const chunk of stream.pipe(new PassThrough({ encoding: undefined }))) {
227-
response += chunk.toString();
226+
for await (const chunk of stream.pipe(new PassThrough({ encoding: "utf-8" }))) {
227+
response += chunk;
228+
229+
// eslint-disable-next-line no-console
230+
console.log(response, chunk);
228231

229232
if (response === expected) return expected;
230233
if (response.length >= expected.length) {
231-
assert.equal(response, expected);
234+
return assert.equal(response, expected);
232235
}
233236
}
234-
assert.equal(response, expected, "End of stream reached");
235-
236-
return "passed";
237-
})()
237+
throw new Error("End of stream reached");
238+
})(),
239+
defer(timeout).then(() => { assert.equal(response, expected, "timeout"); })
238240
]);
239241

240242
return response;
@@ -361,6 +363,27 @@ export function spawnSiInit(
361363
});
362364
}
363365

366+
export async function waitUntilStreamStartsWith(stream: Readable, expected: string, timeout = 10000): Promise<string> {
367+
let response = "";
368+
369+
await Promise.race([
370+
(async () => {
371+
for await (const chunk of stream.pipe(new PassThrough({ encoding: undefined }))) {
372+
response += chunk.toString();
373+
374+
if (response === expected) return expected;
375+
if (response.length >= expected.length) {
376+
return assert.equal(response.substring(0, expected.length), expected);
377+
}
378+
}
379+
throw new Error("End of stream reached");
380+
})(),
381+
defer(timeout).then(() => { assert.equal(response, expected, "timeout"); })
382+
]);
383+
384+
return response;
385+
}
386+
364387
export function isTemplateCreated(templateType: string, workingDirectory: string) {
365388
return new Promise<boolean>((resolve, reject) => {
366389
// eslint-disable-next-line complexity

bdd/step-definitions/e2e/host-steps.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
removeBoundaryQuotes,
88
defer,
99
waitUntilStreamEquals,
10+
waitUntilStreamStartsWith,
1011
waitUntilStreamContains,
1112
removeProfile,
1213
createProfile,
@@ -307,6 +308,37 @@ When(
307308

308309
When("instance started with arguments {string}", { timeout: 25000 }, startWith);
309310

311+
When("start Instance by name {string}", async function(this: CustomWorld, name: string) {
312+
this.resources.sequence = hostClient.getSequenceClient(name);
313+
this.resources.instance = await this.resources.sequence!.start({
314+
appConfig: {}
315+
});
316+
});
317+
318+
When("start Instance by name {string} with JSON arguments {string}", async function(this: CustomWorld, name: string, args: string) {
319+
const instanceArgs: any = JSON.parse(args);
320+
321+
if (!Array.isArray(instanceArgs)) throw new Error("Args must be an array");
322+
323+
this.resources.sequence = hostClient.getSequenceClient(name);
324+
this.resources.instance = await this.resources.sequence!.start({
325+
appConfig: {},
326+
args: instanceArgs
327+
});
328+
});
329+
330+
When("remember last instance as {string}", function(this: CustomWorld, seq: string) {
331+
if (!this.resources.instance) throw new Error("No instance client set");
332+
333+
this.resources.instanceList[seq] = this.resources.instance;
334+
});
335+
336+
When("switch to instance {string}", function(this: CustomWorld, seq: string) {
337+
if (!this.resources.instanceList[seq]) throw new Error(`No instance "${seq}"`);
338+
339+
this.resources.instance = this.resources.instanceList[seq];
340+
});
341+
310342
When("start Instance with output topic name {string}", async function(this: CustomWorld, topicOut: string) {
311343
this.resources.instance = await this.resources.sequence!.start({
312344
appConfig: {},
@@ -754,6 +786,13 @@ When("send {string} to stdin", async function(this: CustomWorld, str) {
754786
await this.resources.instance?.sendStream("stdin", Readable.from(str));
755787
});
756788

789+
Then("{string} starts with {string}", async function(this: CustomWorld, stream, text) {
790+
const result = await this.resources.instance?.getStream(stream);
791+
792+
await waitUntilStreamStartsWith(result!, text);
793+
if (!result) assert.fail(`No data in ${stream}!`);
794+
});
795+
757796
Then("{string} is {string}", async function(this: CustomWorld, stream, text) {
758797
const result = await this.resources.instance?.getStream(stream);
759798
const response = await waitUntilStreamEquals(result!, text);

bdd/step-definitions/world.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ export class CustomWorld implements World {
1616
resources: {
1717
[key: string]: any;
1818
hub?: ChildProcess;
19+
instanceList: {[key: string]: InstanceClient};
1920
instance?: InstanceClient;
2021
instance1?: InstanceClient;
2122
instance2?: InstanceClient;
2223
sequence?: SequenceClient;
2324
sequence1?: SequenceClient;
2425
sequence2?: SequenceClient;
2526
outStream?: Readable;
26-
} = {};
27+
} = {
28+
instanceList: {}
29+
};
2730

2831
cliResources: {
2932
stdio?: [stdout: string, stderr: string, statusCode: any];
@@ -48,7 +51,6 @@ export class CustomWorld implements World {
4851
if (setDefaultResultOrder) {
4952
setDefaultResultOrder("ipv4first");
5053
}
51-
5254
this.attach = attach;
5355
this.log = log;
5456
this.parameters = parameters;

packages/host/src/lib/csi-controller.ts

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ const runnerExitDelay = 15000;
5656

5757
type Events = {
5858
pang: (payload: MessageDataType<RunnerMessageCode.PANG>) => void;
59+
event: (payload: EventMessageData) => void;
5960
hourChime: () => void;
6061
error: (error: any) => void;
6162
stop: (code: number) => void;
@@ -164,6 +165,8 @@ export class CSIController extends TypedEmitter<Events> {
164165
private downStreams?: DownstreamStreamsConfig;
165166
private upStreams: PassThroughStreamsConfig;
166167

168+
public localEmitter: EventEmitter & { lastEvents: { [evname: string]: any } };
169+
167170
communicationHandler: ICommunicationHandler;
168171

169172
constructor(
@@ -195,6 +198,10 @@ export class CSIController extends TypedEmitter<Events> {
195198
this.communicationHandler = communicationHandler;
196199

197200
this.logger = new ObjLogger(this, { id });
201+
this.localEmitter = Object.assign(
202+
new EventEmitter(),
203+
{ lastEvents: {} }
204+
);
198205

199206
this.logger.debug("Constructor executed");
200207
this.info.created = new Date();
@@ -652,18 +659,15 @@ export class CSIController extends TypedEmitter<Events> {
652659
// We are not able to obtain all necessary information for this endpoint yet, disabling it for now
653660
// router.get("/status", RunnerMessageCode.STATUS, this.communicationHandler);
654661

655-
const localEmitter = Object.assign(
656-
new EventEmitter(),
657-
{ lastEvents: {} } as { lastEvents: { [evname: string]: any } }
658-
);
659-
660662
this.communicationHandler.addMonitoringHandler(RunnerMessageCode.EVENT, (data) => {
661663
const event = data[1];
662664

663665
if (!event.eventName) return;
664666

665-
localEmitter.lastEvents[event.eventName] = event.message;
666-
localEmitter.emit(event.eventName, event);
667+
this.emit("event", event);
668+
669+
this.localEmitter.lastEvents[event.eventName] = event.message;
670+
this.localEmitter.emit(event.eventName, event);
667671
});
668672

669673
this.router.upstream("/events/:name", async (req: ParsedMessage, res: ServerResponse) => {
@@ -688,12 +692,12 @@ export class CSIController extends TypedEmitter<Events> {
688692
const clean = () => {
689693
this.logger.debug(`Event stream "${name}" disconnected`);
690694

691-
localEmitter.off(name, handler);
695+
this.localEmitter.off(name, handler);
692696
};
693697

694698
this.logger.debug("Event stream connected", name);
695699

696-
localEmitter.on(name, handler);
700+
this.localEmitter.on(name, handler);
697701

698702
res.on("error", clean);
699703
res.on("end", clean);
@@ -704,16 +708,15 @@ export class CSIController extends TypedEmitter<Events> {
704708
const awaitEvent = async (req: ParsedMessage): Promise<unknown> => new Promise((res) => {
705709
const name = req.params?.name;
706710

707-
if (!name) {
711+
if (!name)
708712
throw new HostError("EVENT_NAME_MISSING");
709-
}
710713

711-
localEmitter.once(name, (data) => res(data.message));
714+
this.localEmitter.once(name, (data) => res(data.message));
712715
});
713716

714717
this.router.get("/event/:name", async (req) => {
715-
if (req.params?.name && localEmitter.lastEvents[req.params.name]) {
716-
return localEmitter.lastEvents[req.params.name];
718+
if (req.params?.name && this.localEmitter.lastEvents[req.params.name]) {
719+
return this.localEmitter.lastEvents[req.params.name];
717720
}
718721

719722
return awaitEvent(req);
@@ -722,12 +725,30 @@ export class CSIController extends TypedEmitter<Events> {
722725

723726
// operations
724727
this.router.op("post", "/_monitoring_rate", RunnerMessageCode.MONITORING_RATE, this.communicationHandler);
725-
this.router.op("post", "/_event", RunnerMessageCode.EVENT, this.communicationHandler);
728+
this.router.op("post", "/_event", (req) => this.handleEvent(req), this.communicationHandler);
726729

727730
this.router.op("post", "/_stop", (req) => this.handleStop(req), this.communicationHandler);
728731
this.router.op("post", "/_kill", (req) => this.handleKill(req), this.communicationHandler);
729732
}
730733

734+
private async handleEvent(event: ParsedMessage): Promise<OpResponse<STHRestAPI.SendEventResponse>> {
735+
const [, { eventName, message }] = event.body;
736+
737+
if (typeof eventName !== "string")
738+
return { opStatus: ReasonPhrases.BAD_REQUEST, error: "Invalid format, eventName missing." };
739+
740+
await this.emitEvent({ eventName, source: "api", message });
741+
return { opStatus: ReasonPhrases.OK, accepted: ReasonPhrases.OK };
742+
}
743+
744+
public async emitEvent({ source, eventName, message }: EventMessageData) {
745+
await this.communicationHandler.sendControlMessage(RunnerMessageCode.EVENT, {
746+
eventName,
747+
source,
748+
message
749+
});
750+
}
751+
731752
private async handleStop(req: ParsedMessage): Promise<OpResponse<STHRestAPI.SendStopInstanceResponse>> {
732753
const { body: { timeout = 7000, canCallKeepalive = false } = { timeout: 7000, canCallKeepalive: false } } = req;
733754

packages/host/src/lib/host.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
APIExpose,
1010
ContentType,
1111
CPMConnectorOptions,
12+
EventMessageData,
1213
HostProxy,
1314
IComponent,
1415
IMonitoringServerConstructor,
@@ -1020,6 +1021,9 @@ export class Host implements IComponent {
10201021

10211022
this.instancesStore[id] = csic;
10221023

1024+
csic.on("event", async (event: EventMessageData) => {
1025+
await this.eventBus({ source: id, ...event });
1026+
});
10231027
csic.on("error", (err) => {
10241028
this.pushTelemetry("Instance error", { ...err }, "error");
10251029
this.logger.error("CSIController errored", err.message, err.exitcode);
@@ -1132,6 +1136,16 @@ export class Host implements IComponent {
11321136
return csic;
11331137
}
11341138

1139+
async eventBus(event: EventMessageData) {
1140+
this.logger.debug("Got event", event);
1141+
1142+
// Send the event to all instances except the source of the event.
1143+
await Promise.all(
1144+
Object.values(this.instancesStore)
1145+
.map(inst => event.source !== inst.id ? inst.emitEvent(event) : true)
1146+
);
1147+
}
1148+
11351149
/**
11361150
* Returns list of all Sequences.
11371151
*

packages/types/src/messages/event.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@ export type EventMessageData = {
55
/** Name of the event. */
66
eventName: string;
77

8+
source?: string;
9+
810
/** TODO update Informs if keepAlive can be called to prolong the running of the Sequence. */
9-
message: any
11+
message: any;
1012
}
1113

1214
/**

0 commit comments

Comments
 (0)