Skip to content

Commit 7384a7a

Browse files
authored
Merge branch 'devel' into fix/remove-cpu-features
2 parents 10e5f53 + 540d7c0 commit 7384a7a

File tree

29 files changed

+345
-79
lines changed

29 files changed

+345
-79
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/* eslint-disable no-console */
2+
3+
// eslint-disable-next-line valid-jsdoc
4+
/**
5+
* Simple test event sequence.
6+
*
7+
* @param {never} _input - unused
8+
* @param {string} inputEvent - input
9+
* @param {string} outputEvent - output
10+
* @returns {void}
11+
* @this {import("@scramjet/types").AppContext<{}, {}>} - context
12+
*/
13+
module.exports = async function(_input, inputEvent = "in", outputEvent = "out") {
14+
this.logger.info("started");
15+
return new Promise((res) => {
16+
this.on(inputEvent, async (msg) => {
17+
const ev = JSON.parse(msg);
18+
19+
console.log("event", JSON.stringify(ev));
20+
this.emit(outputEvent, JSON.stringify({ test: ev.test + 1 }));
21+
22+
await new Promise(res2 => setTimeout(res2, 100));
23+
24+
res();
25+
});
26+
});
27+
};
28+
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
{
2+
"name": "@scramjet/event-sequence",
3+
"version": "1.0.0",
4+
"description": "",
5+
"main": "index.js",
6+
"scripts": {
7+
"predeploy": "mkdir -p dist/ && cp index.js package.json dist/ && (cd dist && npm i --omit=dev)"
8+
},
9+
"engines": {
10+
"node": ">=16"
11+
},
12+
"repository": {
13+
"type": "git",
14+
"url": "git+https://github.com/scramjetorg/create-sequence.git"
15+
},
16+
"bugs": {
17+
"url": "https://github.com/scramjetorg/create-sequence/issues"
18+
},
19+
"homepage": "https://github.com/scramjetorg/create-sequence#readme",
20+
"devDependencies": {
21+
"@scramjet/types": "^0.34.4"
22+
},
23+
"author": "",
24+
"license": "ISC"
25+
}

bdd/features/hub/HUB-002-host-iac.feature

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,21 @@ Feature: HUB-002 Host started in Infrastructure as Code mode
2828
And wait for "500" ms
2929
And host is running
3030
* exit hub process
31+
32+
@ci-hub @starts-host
33+
Scenario: HUB-002 TC-004 Event forwarding works between sequences
34+
When hub process is started with random ports and parameters "--sequences-root=data/sequences/ --instance-lifetime-extension-delay=10 --identify-existing --runtime-adapter=process"
35+
And host is running
36+
And I get list of instances
37+
And start Instance by name "event-sequence" with JSON arguments '["event-one", "event-two"]'
38+
* remember last instance as "first"
39+
And start Instance by name "event-sequence" with JSON arguments '["event-two", "event-three"]'
40+
* remember last instance as "second"
41+
* switch to instance "first"
42+
And send event "event-one" to instance with message '{"test": 1}'
43+
# * wait for "100" ms
44+
Then "stdout" starts with 'event {"test":1}'
45+
* switch to instance "second"
46+
Then "stdout" starts with 'event {"test":2}'
47+
And host is running
48+
* exit hub process

bdd/lib/utils.ts

Lines changed: 31 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -218,23 +218,25 @@ export async function waitUntilStreamContains(stream: Readable, expected: string
218218
]);
219219
}
220220

221-
export async function waitUntilStreamEquals(stream: Readable, expected: string): Promise<string> {
221+
export async function waitUntilStreamEquals(stream: Readable, expected: string, timeout = 10000): Promise<string> {
222222
let response = "";
223223

224224
await Promise.race([
225225
(async () => {
226-
for await (const chunk of stream.pipe(new PassThrough({ encoding: undefined }))) {
227-
response += chunk.toString();
226+
for await (const chunk of stream.pipe(new PassThrough({ encoding: "utf-8" }))) {
227+
response += chunk;
228+
229+
// eslint-disable-next-line no-console
230+
console.log(response, chunk);
228231

229232
if (response === expected) return expected;
230233
if (response.length >= expected.length) {
231-
assert.equal(response, expected);
234+
return assert.equal(response, expected);
232235
}
233236
}
234-
assert.equal(response, expected, "End of stream reached");
235-
236-
return "passed";
237-
})()
237+
throw new Error("End of stream reached");
238+
})(),
239+
defer(timeout).then(() => { assert.equal(response, expected, "timeout"); })
238240
]);
239241

240242
return response;
@@ -361,6 +363,27 @@ export function spawnSiInit(
361363
});
362364
}
363365

366+
export async function waitUntilStreamStartsWith(stream: Readable, expected: string, timeout = 10000): Promise<string> {
367+
let response = "";
368+
369+
await Promise.race([
370+
(async () => {
371+
for await (const chunk of stream.pipe(new PassThrough({ encoding: undefined }))) {
372+
response += chunk.toString();
373+
374+
if (response === expected) return expected;
375+
if (response.length >= expected.length) {
376+
return assert.equal(response.substring(0, expected.length), expected);
377+
}
378+
}
379+
throw new Error("End of stream reached");
380+
})(),
381+
defer(timeout).then(() => { assert.equal(response, expected, "timeout"); })
382+
]);
383+
384+
return response;
385+
}
386+
364387
export function isTemplateCreated(templateType: string, workingDirectory: string) {
365388
return new Promise<boolean>((resolve, reject) => {
366389
// eslint-disable-next-line complexity

bdd/step-definitions/e2e/host-steps.ts

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import {
77
removeBoundaryQuotes,
88
defer,
99
waitUntilStreamEquals,
10+
waitUntilStreamStartsWith,
1011
waitUntilStreamContains,
1112
removeProfile,
1213
createProfile,
@@ -307,6 +308,37 @@ When(
307308

308309
When("instance started with arguments {string}", { timeout: 25000 }, startWith);
309310

311+
When("start Instance by name {string}", async function(this: CustomWorld, name: string) {
312+
this.resources.sequence = hostClient.getSequenceClient(name);
313+
this.resources.instance = await this.resources.sequence!.start({
314+
appConfig: {}
315+
});
316+
});
317+
318+
When("start Instance by name {string} with JSON arguments {string}", async function(this: CustomWorld, name: string, args: string) {
319+
const instanceArgs: any = JSON.parse(args);
320+
321+
if (!Array.isArray(instanceArgs)) throw new Error("Args must be an array");
322+
323+
this.resources.sequence = hostClient.getSequenceClient(name);
324+
this.resources.instance = await this.resources.sequence!.start({
325+
appConfig: {},
326+
args: instanceArgs
327+
});
328+
});
329+
330+
When("remember last instance as {string}", function(this: CustomWorld, seq: string) {
331+
if (!this.resources.instance) throw new Error("No instance client set");
332+
333+
this.resources.instanceList[seq] = this.resources.instance;
334+
});
335+
336+
When("switch to instance {string}", function(this: CustomWorld, seq: string) {
337+
if (!this.resources.instanceList[seq]) throw new Error(`No instance "${seq}"`);
338+
339+
this.resources.instance = this.resources.instanceList[seq];
340+
});
341+
310342
When("start Instance with output topic name {string}", async function(this: CustomWorld, topicOut: string) {
311343
this.resources.instance = await this.resources.sequence!.start({
312344
appConfig: {},
@@ -754,6 +786,13 @@ When("send {string} to stdin", async function(this: CustomWorld, str) {
754786
await this.resources.instance?.sendStream("stdin", Readable.from(str));
755787
});
756788

789+
Then("{string} starts with {string}", async function(this: CustomWorld, stream, text) {
790+
const result = await this.resources.instance?.getStream(stream);
791+
792+
await waitUntilStreamStartsWith(result!, text);
793+
if (!result) assert.fail(`No data in ${stream}!`);
794+
});
795+
757796
Then("{string} is {string}", async function(this: CustomWorld, stream, text) {
758797
const result = await this.resources.instance?.getStream(stream);
759798
const response = await waitUntilStreamEquals(result!, text);

bdd/step-definitions/world.ts

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,17 @@ export class CustomWorld implements World {
1616
resources: {
1717
[key: string]: any;
1818
hub?: ChildProcess;
19+
instanceList: {[key: string]: InstanceClient};
1920
instance?: InstanceClient;
2021
instance1?: InstanceClient;
2122
instance2?: InstanceClient;
2223
sequence?: SequenceClient;
2324
sequence1?: SequenceClient;
2425
sequence2?: SequenceClient;
2526
outStream?: Readable;
26-
} = {};
27+
} = {
28+
instanceList: {}
29+
};
2730

2831
cliResources: {
2932
stdio?: [stdout: string, stderr: string, statusCode: any];
@@ -48,7 +51,6 @@ export class CustomWorld implements World {
4851
if (setDefaultResultOrder) {
4952
setDefaultResultOrder("ipv4first");
5053
}
51-
5254
this.attach = attach;
5355
this.log = log;
5456
this.parameters = parameters;

packages/api-client/src/host-client.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { ClientProvider, ClientUtils, HttpClient } from "@scramjet/client-utils";
1+
import { ClientProvider, ClientUtils, Headers, HttpClient } from "@scramjet/client-utils";
22
import { STHRestAPI } from "@scramjet/types";
33
import { InstanceClient } from "./instance-client";
44
import { SequenceClient } from "./sequence-client";
@@ -193,10 +193,13 @@ export class HostClient implements ClientProvider {
193193
async sendTopic<T>(
194194
topic: string,
195195
stream: Parameters<HttpClient["sendStream"]>[1],
196-
requestInit?: RequestInit,
196+
requestInit: RequestInit = {},
197197
contentType: string = "application/x-ndjson",
198198
end?: boolean
199199
) {
200+
requestInit.headers ||= {} as Headers;
201+
(requestInit.headers as Headers).expect = "100-continue";
202+
200203
return this.client.sendStream<T>(`topic/${topic}`, stream, requestInit, { type: contentType, end: end });
201204
}
202205

packages/api-server/src/handlers/op.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,10 @@ export function createOperationHandler(router: SequentialCeroRouter): APIRoute["
182182
const handler: Middleware = async (req, res, next) => {
183183
logger.trace("Request", req.method, req.url);
184184

185+
if (req.headers.expect === "100-continue") {
186+
res.writeContinue();
187+
}
188+
185189
try {
186190
if (typeof message === "function") {
187191
return await opDataHandler(req, res, message, { rawBody });

packages/api-server/src/handlers/stream.ts

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { StreamConfig, StreamInput, StreamOutput } from "@scramjet/types";
1+
import { ParsedMessage, StreamConfig, StreamInput, StreamOutput } from "@scramjet/types";
22
import { IncomingHttpHeaders, IncomingMessage, ServerResponse } from "http";
33
import { Writable, Readable, Duplex } from "stream";
44
import { DuplexStream } from "../lib/duplex-stream";
@@ -106,16 +106,27 @@ export function createStreamHandlers(router: SequentialCeroRouter) {
106106
const downstream = (
107107
path: string | RegExp,
108108
stream: StreamOutput,
109-
{ json = false, text = false, end: _end = false, encoding = "utf-8", checkContentType = true, checkEndHeader = true, method = "post" }: StreamConfig = {}
109+
{ json = false, text = false, end: _end = false, encoding = "utf-8", checkContentType = true, checkEndHeader = true, method = "post", postponeContinue = false }: StreamConfig = {}
110110
): void => {
111-
router[method](path, async (req, res, next) => {
111+
// eslint-disable-next-line complexity
112+
router[method](path, async (req: ParsedMessage, res, next) => {
112113
try {
113114
if (checkContentType) {
114115
checkAccepts(req.headers["content-type"], text, json);
115116
}
116117

117118
if (req.headers.expect === "100-continue") {
118-
res.writeContinue();
119+
if (!postponeContinue) {
120+
res.writeContinue();
121+
}
122+
}
123+
124+
if (postponeContinue && req.headers.expect !== "100-continue") {
125+
res.writeHead(400, "Bad Request", { "Content-type": "application/json" });
126+
res.write(JSON.stringify({ error: "Missing 'expect' header" }));
127+
res.end();
128+
129+
return;
119130
}
120131

121132
// Explicit pause causes next `on('data')` not to resume stream automatically.
@@ -177,11 +188,14 @@ export function createStreamHandlers(router: SequentialCeroRouter) {
177188
};
178189
const duplex = (
179190
path: string | RegExp,
180-
callback: (stream: Duplex, headers: IncomingHttpHeaders) => void
191+
callback: (stream: Duplex, headers: IncomingHttpHeaders) => void,
192+
{ postponeContinue = false }: StreamConfig = {}
181193
): void => {
182194
router.post(path, (req, res, next) => {
183195
if (req.headers.expect === "100-continue") {
184-
res.writeContinue();
196+
if (!postponeContinue) {
197+
res.writeContinue();
198+
}
185199
}
186200

187201
try {

packages/api-server/src/index.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,20 @@ export function createServer(conf: ServerConfig = {}): APIExpose {
100100
};
101101
const { server: srv, router } = cero({ server: createCeroServerConfig(conf), router: sequentialRouter(config) });
102102

103+
// Disable auto sending "100 Continue".
104+
srv.on("checkContinue", (request, response) => {
105+
request.writeContinue = () => {
106+
response.writeContinue();
107+
};
108+
109+
srv.emit("request", request, response);
110+
});
111+
103112
router.use("/", async (req, res, next) => {
113+
req.writeContinue ||= () => {};
114+
104115
next();
116+
105117
// TODO: fix - this should log on errors.
106118
log.write({ date: Date.now(), method: req.method, url: req.url, status: await new Promise(s => res.on("finish", () => s(res.statusCode))) } as any);
107119
});

0 commit comments

Comments
 (0)