Skip to content

Commit 6bf90b9

Browse files
author
Guy Bedford
authored
fix: passing direct response body to requests in streaming (#954)
1 parent 72bd10c commit 6bf90b9

File tree

5 files changed

+142
-39
lines changed

5 files changed

+142
-39
lines changed

.github/workflows/main.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ jobs:
5656
path: "/home/runner/.cargo/bin/${{ matrix.crate }}"
5757
key: crate-cache-${{ matrix.crate }}-${{ matrix.version }}
5858
- name: Install ${{ matrix.crate }} ${{ matrix.version }}
59-
if: steps.cache-crate.output.cache-hit != 'true'
59+
if: steps.cache-crate.outputs.cache-hit != 'true'
6060
run: cargo install ${{ matrix.crate }} ${{ matrix.options }} --version ${{ matrix.version }} --force
6161

6262
shellcheck:

integration-tests/js-compute/compare-downstream-response.js

Lines changed: 94 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,52 @@ import compareHeaders from './compare-headers.js';
1212
}} configResponse
1313
* @param {import('undici').Dispatcher.ResponseData} actualResponse
1414
*/
15+
16+
function maybeUint8Array(body) {
17+
if (Array.isArray(body) && typeof body[0] === 'number')
18+
return new Uint8Array(body);
19+
return body;
20+
}
21+
22+
/**
23+
* @param {Uint8Array[]} buffers
24+
* @returns
25+
*/
26+
function concat(buffers) {
27+
const out = new Uint8Array(buffers.reduce((len, buf) => len + buf.length, 0));
28+
let curPos = 0;
29+
for (const buf of buffers) {
30+
if (!(buf instanceof Uint8Array)) throw new Error('not a uint8 array');
31+
out.set(buf, curPos);
32+
curPos += buf.length;
33+
}
34+
return out;
35+
}
36+
37+
function bufferToString(actualBodyChunks) {
38+
try {
39+
return new TextDecoder().decode(concat(actualBodyChunks));
40+
} catch {
41+
return concat(actualBodyChunks);
42+
}
43+
}
44+
45+
function bufferEq(a, b) {
46+
for (let i = 0; i < a.byteLength; i++) {
47+
if (a[i] !== b[i]) return false;
48+
}
49+
return true;
50+
}
51+
1552
export async function compareDownstreamResponse(
1653
configResponse,
1754
actualResponse,
1855
actualBodyChunks,
1956
) {
20-
let errors = [];
2157
// Status
2258
if (configResponse.status != actualResponse.statusCode) {
23-
errors.push(
24-
new Error(
25-
`[DownstreamResponse: Status mismatch] Expected: ${configResponse.status} - Got: ${actualResponse.statusCode}\n${actualBodyChunks.length ? `"${Buffer.concat(actualBodyChunks)}"` : ''}`,
26-
),
59+
throw new Error(
60+
`[DownstreamResponse: Status mismatch] Expected: ${configResponse.status} - Got: ${actualResponse.statusCode}\n${actualBodyChunks.length ? `\n"${bufferToString(actualBodyChunks)}"` : ''}`,
2761
);
2862
}
2963

@@ -33,8 +67,47 @@ export async function compareDownstreamResponse(
3367
}
3468

3569
// Body
36-
if (configResponse.body) {
70+
if (
71+
configResponse.body ||
72+
configResponse.body_prefix ||
73+
configResponse.body_suffix
74+
) {
75+
if (configResponse.body_prefix) {
76+
const body_prefix = maybeUint8Array(configResponse.body_prefix);
77+
const actual_prefix = concat(actualBodyChunks).slice(
78+
0,
79+
body_prefix.byteLength,
80+
);
81+
if (body_prefix.byteLength !== actual_prefix.byteLength) {
82+
throw new Error(
83+
`[DownstreamResponse: Body Prefix length mismatch] Expected: ${body_prefix.byteLength} - Got ${actual_prefix.byteLength}: \n"${bufferToString(actualBodyChunks)}"`,
84+
);
85+
}
86+
if (!bufferEq(actual_prefix, body_prefix)) {
87+
throw new Error(
88+
`[DownstreamResponse: Body Prefix mismatch] Expected: ${body_prefix} - Got ${actual_prefix}:\n"${bufferToString(actualBodyChunks)}"`,
89+
);
90+
}
91+
}
92+
if (configResponse.body_suffix) {
93+
const body_suffix = maybeUint8Array(configResponse.body_suffix);
94+
const actual_suffix = concat(actualBodyChunks).slice(
95+
0,
96+
body_suffix.byteLength,
97+
);
98+
if (body_suffix.byteLength !== actual_suffix.byteLength) {
99+
throw new Error(
100+
`[DownstreamResponse: Body Suffix length mismatch] Expected: ${body_suffix.byteLength} - Got: ${actual_suffix.byteLength}: \n"${bufferToString(actualBodyChunks)}"`,
101+
);
102+
}
103+
if (!bufferEq(actual_suffix, body_suffix)) {
104+
throw new Error(
105+
`[DownstreamResponse: Body Suffix mismatch] Expected: ${body_suffix} - Got ${actual_suffix}:\n"${bufferToString(actualBodyChunks)}"`,
106+
);
107+
}
108+
}
37109
// Check if we need to stream the response and check the chunks, or the whole body
110+
configResponse.body = maybeUint8Array(configResponse.body);
38111
if (configResponse.body instanceof Array) {
39112
// Stream down the response
40113
let chunkNumber = 0;
@@ -50,38 +123,31 @@ export async function compareDownstreamResponse(
50123
chunkNumber++;
51124
}
52125
} else {
53-
errors.push(
54-
new Error(
55-
`[DownstreamResponse: Body Chunk mismatch] Expected: ${configResponse.body[chunkNumber]} - Got: ${chunkString}`,
56-
),
126+
throw new Error(
127+
`[DownstreamResponse: Body Chunk mismatch] Expected: ${configResponse.body[chunkNumber]} - Got: ${chunkString}`,
57128
);
58129
}
59130
}
60131

61132
if (chunkNumber !== configResponse.body.length) {
62-
errors.push(
63-
new Error(
64-
`[DownstreamResponse: Body Chunk mismatch] Expected: ${configResponse.body} - Got: (Incomplete stream, Number of chunks returned: ${chunkNumber})`,
65-
),
133+
throw new Error(
134+
`[DownstreamResponse: Body Chunk mismatch] Expected: ${configResponse.body} - Got: (Incomplete stream, Number of chunks returned: ${chunkNumber})`,
66135
);
67136
}
68-
} else {
137+
} else if (configResponse.body !== undefined) {
69138
// Get the text, and check if it matches the test
70-
const downstreamBodyText = Buffer.concat(
71-
actualBodyChunks.map((chunk) => Buffer.from(chunk)),
72-
).toString('utf8');
73-
74-
if (downstreamBodyText !== configResponse.body) {
75-
errors.push(
76-
new Error(
77-
`[DownstreamResponse: Body mismatch] Expected: ${configResponse.body} - Got: ${downstreamBodyText}`,
78-
),
139+
const downstreamBodyText = new TextDecoder().decode(
140+
concat(actualBodyChunks),
141+
);
142+
const eq =
143+
typeof configResponse.body === 'string'
144+
? downstreamBodyText === configResponse.body
145+
: bufferEq(configResponse.body, concat(actualBodyChunks));
146+
if (!eq) {
147+
throw new Error(
148+
`[DownstreamResponse: Body mismatch] Expected: ${configResponse.body} - Got: ${downstreamBodyText}`,
79149
);
80150
}
81151
}
82152
}
83-
84-
if (errors.length) {
85-
throw new Error(errors.map((error) => error.message).join('\n'));
86-
}
87153
}

integration-tests/js-compute/fixtures/app/src/response.js

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
/* eslint-env serviceworker */
22

3-
import { routes } from "./routes.js";
4-
import { pass, assert, assertThrows } from "./assertions.js";
3+
import { routes } from './routes.js';
4+
import { pass, assert } from './assertions.js';
5+
import { allowDynamicBackends } from 'fastly:experimental';
56

6-
routes.set("/response/stall", async (event) => {
7+
routes.set('/response/stall', async (event) => {
78
// keep connection open 10 seconds
8-
event.waitUntil(new Promise(resolve => setTimeout(resolve, 10_000)));
9+
event.waitUntil(new Promise((resolve) => setTimeout(resolve, 10_000)));
910
return new Response(
1011
new ReadableStream({
1112
start(controller) {
@@ -15,7 +16,7 @@ routes.set("/response/stall", async (event) => {
1516
);
1617
});
1718

18-
routes.set("/response/text/guest-backed-stream", async () => {
19+
routes.set('/response/text/guest-backed-stream', async () => {
1920
let contents = new Array(10).fill(new Uint8Array(500).fill(65));
2021
contents.push(new Uint8Array([0, 66]));
2122
contents.push(new Uint8Array([1, 1, 2, 65]));
@@ -24,15 +25,16 @@ routes.set("/response/text/guest-backed-stream", async () => {
2425

2526
let error = assert(
2627
text,
27-
"A".repeat(5000) + "\x00B\x01\x01\x02A",
28+
'A'.repeat(5000) + '\x00B\x01\x01\x02A',
2829
`await res.text() === "a".repeat(5000)`,
2930
);
3031
if (error) {
3132
return error;
3233
}
3334
return pass();
3435
});
35-
routes.set("/response/json/guest-backed-stream", async () => {
36+
37+
routes.set('/response/json/guest-backed-stream', async () => {
3638
let obj = { a: 1, b: 2, c: { d: 3 } };
3739
let encoder = new TextEncoder();
3840
let contents = encoder.encode(JSON.stringify(obj));
@@ -45,7 +47,8 @@ routes.set("/response/json/guest-backed-stream", async () => {
4547
}
4648
return pass();
4749
});
48-
routes.set("/response/arrayBuffer/guest-backed-stream", async () => {
50+
51+
routes.set('/response/arrayBuffer/guest-backed-stream', async () => {
4952
let obj = { a: 1, b: 2, c: { d: 3 } };
5053
let encoder = new TextEncoder();
5154
let contents = encoder.encode(JSON.stringify(obj));
@@ -62,7 +65,8 @@ routes.set("/response/arrayBuffer/guest-backed-stream", async () => {
6265
}
6366
return pass();
6467
});
65-
routes.set("/response/ip-port-undefined", async () => {
68+
69+
routes.set('/response/ip-port-undefined', async () => {
6670
let res = new Response();
6771
let error = assert(res.ip, undefined);
6872
if (error) {
@@ -75,6 +79,25 @@ routes.set("/response/ip-port-undefined", async () => {
7579
return pass();
7680
});
7781

82+
routes.set('/response/request-body-init', async () => {
83+
allowDynamicBackends(true);
84+
// fetch an image
85+
const downloadResp = await fetch('https://httpbin.org/image', {
86+
headers: {
87+
accept: 'image/webp'
88+
}
89+
});
90+
// stream it through an echo proxy
91+
const postResp = await fetch(
92+
new Request('https://httpbin.org/anything', {
93+
method: 'POST',
94+
body: downloadResp.body,
95+
})
96+
);
97+
// finally stream back to user
98+
return postResp;
99+
});
100+
78101
function iteratableToStream(iterable) {
79102
return new ReadableStream({
80103
async pull(controller) {

integration-tests/js-compute/fixtures/app/tests.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1732,6 +1732,20 @@
17321732
"GET /response/arrayBuffer/guest-backed-stream": {},
17331733
"GET /response/json": {},
17341734
"GET /response/redirect": {},
1735+
"GET /response/request-body-init": {
1736+
"downstream_response": {
1737+
"status": 200,
1738+
"body_prefix": [
1739+
123, 10, 32, 32, 34, 97, 114, 103, 115, 34, 58, 32, 123, 125, 44, 32,
1740+
10, 32, 32, 34, 100, 97, 116, 97, 34, 58, 32, 34, 100, 97, 116, 97, 58,
1741+
97, 112, 112, 108, 105, 99, 97, 116, 105, 111, 110, 47, 111, 99, 116,
1742+
101, 116, 45, 115, 116, 114, 101, 97, 109, 59, 98, 97, 115, 101, 54, 52,
1743+
44, 85, 107, 108, 71, 82, 107, 65, 112, 65, 65, 66, 88, 82, 85, 74, 81,
1744+
86, 108, 65, 52, 87, 65, 111, 65, 65, 65, 65, 69, 65, 65, 65, 65, 69, 81
1745+
],
1746+
"body_suffix": [123, 10, 32, 32, 34, 97, 114, 103, 115, 34, 58, 32]
1747+
}
1748+
},
17351749
"GET /response/ip-port-undefined": {},
17361750
"GET /setInterval/exposed-as-global": {},
17371751
"GET /setInterval/interface": {},

0 commit comments

Comments
 (0)