Skip to content

Commit 16d7ff7

Browse files
committed
Make the client API stream response data instead of waiting
1 parent 03f1829 commit 16d7ff7

File tree

5 files changed

+101
-33
lines changed

5 files changed

+101
-33
lines changed

src/api/api-model.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import * as _ from 'lodash';
2+
import * as stream from 'stream';
23
import * as os from 'os';
34

45
import { generateSPKIFingerprint } from 'mockttp';
@@ -195,10 +196,10 @@ export class ApiModel {
195196
return { success: !interceptor.isActive(proxyPort) };
196197
}
197198

198-
async sendRequest(
199+
sendRequest(
199200
requestDefinition: Client.RequestDefinition,
200201
requestOptions: Client.RequestOptions
201-
): Promise<Client.ResponseDefinition> {
202+
): stream.Readable {
202203
return Client.sendRequest(requestDefinition, requestOptions);
203204
}
204205

src/api/rest-api.ts

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,47 @@ export function exposeRestAPI(
8888
if (!request) throw new StatusError(400, "No request definition provided");
8989
if (!options) throw new StatusError(400, "No request options provided");
9090

91-
const result = await apiModel.sendRequest({
91+
const resultStream = apiModel.sendRequest({
9292
...request,
9393
// Body buffers are serialized as base64 (for both requests & responses)
9494
rawBody: Buffer.from(request.rawBody ?? '', 'base64')
9595
}, {
9696
...options
9797
});
9898

99-
res.send({
100-
type: 'response', // Later we may send ND-JSON with chunked data & other types
101-
response: {
102-
...result,
103-
rawBody: result.rawBody?.toString('base64') ?? ''
99+
res.writeHead(200, {
100+
'content-type': 'application/x-ndjson'
101+
});
102+
103+
resultStream.on('data', (evt: Client.ResponseStreamEvents) => {
104+
if (evt.type === 'response-body-part') {
105+
res.write(JSON.stringify({
106+
...evt,
107+
data: evt.data.toString('base64')
108+
}));
109+
} else {
110+
res.write(JSON.stringify(evt));
104111
}
112+
res.write('\n');
113+
});
114+
115+
resultStream.on('end', () => {
116+
res.write(JSON.stringify({
117+
'type': 'response-end'
118+
}) + '\n');
119+
res.end();
120+
});
121+
122+
resultStream.on('error', (error: ErrorLike) => {
123+
res.write(JSON.stringify({
124+
type: 'error',
125+
error: {
126+
code: error.code,
127+
message: error.message,
128+
stack: error.stack
129+
}
130+
}) + '\n');
131+
res.end();
105132
});
106133
}));
107134
}

src/client/client.ts

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import * as stream from 'stream';
12
import * as net from 'net';
23
import * as http from 'http';
34
import * as https from 'https';
@@ -24,17 +25,27 @@ export interface RequestDefinition {
2425
export interface RequestOptions {
2526
}
2627

27-
export interface ResponseDefinition {
28+
export type ResponseStreamEvents =
29+
| ResponseHead
30+
| ResponseBodyPart;
31+
// Other notable events: errors (via 'error' event) and clean closure (via 'end').
32+
33+
export interface ResponseHead {
34+
type: 'response-head';
2835
statusCode: number;
2936
statusMessage?: string;
3037
headers: RawHeaders;
31-
rawBody?: Buffer;
3238
}
3339

34-
export async function sendRequest(
40+
export interface ResponseBodyPart {
41+
type: 'response-body-part';
42+
data: Buffer;
43+
}
44+
45+
export function sendRequest(
3546
requestDefn: RequestDefinition,
3647
options: RequestOptions
37-
): Promise<ResponseDefinition> {
48+
): stream.Readable {
3849
const url = new URL(requestDefn.url);
3950

4051
const request = (url.protocol === 'https' ? https : http).request(requestDefn.url, {
@@ -60,19 +71,34 @@ export async function sendRequest(
6071
request.end();
6172
}
6273

63-
const response = await new Promise<http.IncomingMessage>((resolve, reject) => {
74+
const resultsStream = new stream.Readable({
75+
objectMode: true,
76+
read() {} // Can't pull data - we manually fill this with .push() instead.
77+
});
78+
79+
new Promise<http.IncomingMessage>((resolve, reject) => {
6480
request.on('error', reject);
6581
request.on('response', resolve);
82+
}).then((response) => {
83+
resultsStream.push({
84+
type: 'response-head',
85+
statusCode: response.statusCode!,
86+
statusMessage: response.statusMessage,
87+
headers: pairFlatRawHeaders(response.rawHeaders)
88+
});
89+
90+
response.on('data', (data) => resultsStream.push({
91+
type: 'response-body-part',
92+
data
93+
}));
94+
95+
response.on('end', () => resultsStream.push(null));
96+
}).catch((error) => {
97+
resultsStream.destroy(error);
98+
request.destroy();
6699
});
67100

68-
const body = await streamToBuffer(response);
69-
70-
return {
71-
statusCode: response.statusCode!,
72-
statusMessage: response.statusMessage,
73-
headers: pairFlatRawHeaders(response.rawHeaders),
74-
rawBody: body
75-
}
101+
return resultsStream;
76102
}
77103

78104
/**

src/util/stream.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,15 @@
11
import * as stream from 'stream';
22

3-
export function streamToBuffer(input: stream.Readable) {
4-
return new Promise<Buffer>((resolve, reject) => {
5-
const chunks: Uint8Array[] = [];
3+
export function streamToArray<T extends unknown>(input: stream.Readable) {
4+
return new Promise<T[]>((resolve, reject) => {
5+
const chunks: T[] = [];
66
input.on('data', (d) => chunks.push(d));
7-
input.on('end', () => resolve(Buffer.concat(chunks)));
7+
input.on('end', () => resolve(chunks));
88
input.on('error', reject);
99
});
10+
};
11+
12+
export async function streamToBuffer(input: stream.Readable) {
13+
const chunks = await streamToArray<Uint8Array>(input);
14+
return Buffer.concat(chunks);
1015
};

test/client/send-request.spec.ts

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import { expect } from 'chai';
22
import * as mockttp from 'mockttp';
33

4-
import { sendRequest } from '../../src/client/client';
4+
import { ResponseStreamEvents, sendRequest } from '../../src/client/client';
5+
import { streamToArray } from '../../src/util/stream';
56

67
describe("The HTTP client API", () => {
78

@@ -29,7 +30,7 @@ describe("The HTTP client API", () => {
2930
};
3031
});
3132

32-
const response = await sendRequest({
33+
const responseStream = sendRequest({
3334
url: mockServer.urlFor('/path?qwe=asd'),
3435
method: 'POST',
3536
headers: [
@@ -40,11 +41,19 @@ describe("The HTTP client API", () => {
4041
rawBody: Buffer.from('Request body')
4142
}, {});
4243

43-
expect(response.statusCode).to.equal(200);
44-
expect(response.statusMessage).to.equal('Custom status message');
45-
expect(response.headers).to.deep.equal([
46-
['custom-HEADER', 'custom-VALUE']
47-
]);
48-
expect(response.rawBody!.toString()).to.equal('Mock response body');
44+
const responseParts = await streamToArray<any>(responseStream);
45+
46+
expect(responseParts.length).to.equal(2);
47+
expect(responseParts[0]).to.deep.equal({
48+
type: 'response-head',
49+
statusCode: 200,
50+
statusMessage: 'Custom status message',
51+
headers: [
52+
['custom-HEADER', 'custom-VALUE']
53+
]
54+
});
55+
56+
expect(responseParts[1].type).equal('response-body-part');
57+
expect(responseParts[1].data.toString()).to.equal('Mock response body');
4958
})
5059
});

0 commit comments

Comments
 (0)