Skip to content

Commit 9a002eb

Browse files
committed
Handle errors and clean up after disconnection in client API
1 parent e47c3d7 commit 9a002eb

File tree

4 files changed

+65
-3
lines changed

4 files changed

+65
-3
lines changed

src/api/api-model.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ export class ApiModel {
199199
sendRequest(
200200
requestDefinition: Client.RequestDefinition,
201201
requestOptions: Client.RequestOptions
202-
): stream.Readable {
202+
) {
203203
return Client.sendRequest(requestDefinition, requestOptions);
204204
}
205205

src/api/rest-api.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,18 +88,25 @@ export function exposeRestAPI(
8888
if (!request) throw new StatusError(400, "No request definition provided");
8989
if (!options) throw new StatusError(400, "No request options provided");
9090

91+
const abortController = new AbortController();
92+
9193
const resultStream = apiModel.sendRequest({
9294
...request,
9395
// Body buffers are serialized as base64 (for both requests & responses)
9496
rawBody: Buffer.from(request.rawBody ?? '', 'base64')
9597
}, {
96-
...options
98+
...options,
99+
abortSignal: abortController.signal
97100
});
98101

102+
// If the client closes the connection, abort the outgoing request:
103+
res.on('close', () => abortController.abort());
104+
99105
res.writeHead(200, {
100106
'content-type': 'application/x-ndjson'
101107
});
102108

109+
// Write all upstream events as JSON data to the client here:
103110
resultStream.on('data', (evt: Client.ResponseStreamEvents) => {
104111
if (evt.type === 'response-body-part') {
105112
res.write(JSON.stringify({

src/client/client.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import * as stream from 'stream';
22
import * as net from 'net';
33
import * as http from 'http';
44
import * as https from 'https';
5-
import { streamToBuffer } from '../util/stream';
65

76
export type RawHeaders = Array<[key: string, value: string]>;
87

@@ -23,6 +22,11 @@ export interface RequestDefinition {
2322
}
2423

2524
export interface RequestOptions {
25+
/**
26+
* An abort signal, which can be used to cancel the in-process request if
27+
* required.
28+
*/
29+
abortSignal?: AbortSignal;
2630
}
2731

2832
export type ResponseStreamEvents =
@@ -50,6 +54,12 @@ export function sendRequest(
5054

5155
const request = (url.protocol === 'https' ? https : http).request(requestDefn.url, {
5256
method: requestDefn.method,
57+
signal: options.abortSignal
58+
});
59+
60+
options.abortSignal?.addEventListener('abort', () => {
61+
// In older Node versions, this seems to be required to _actually_ abort the request:
62+
request.abort();
5363
});
5464

5565
// Node supports sending raw headers via [key, value, key, value] array, but if we do
@@ -93,6 +103,7 @@ export function sendRequest(
93103
}));
94104

95105
response.on('end', () => resultsStream.push(null));
106+
response.on('error', (error) => resultsStream.destroy(error));
96107
}).catch((error) => {
97108
resultsStream.destroy(error);
98109
request.destroy();

test/client/send-request.spec.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as mockttp from 'mockttp';
33

44
import { ResponseStreamEvents, sendRequest } from '../../src/client/client';
55
import { streamToArray } from '../../src/util/stream';
6+
import { delay } from '../../src/util/promise';
67

78
describe("The HTTP client API", () => {
89

@@ -55,5 +56,48 @@ describe("The HTTP client API", () => {
5556

5657
expect(responseEvents[1].type).equal('response-body-part');
5758
expect(responseEvents[1].rawBody.toString()).to.equal('Mock response body');
59+
});
60+
61+
it("should stop requests if cancelled", async () => {
62+
await mockServer.forAnyRequest().thenTimeout()
63+
64+
const requests: any[] = [];
65+
mockServer.on('request', (req) => requests.push(req));
66+
const aborts: any[] = [];
67+
mockServer.on('abort', (req) => aborts.push(req));
68+
69+
const abortController = new AbortController();
70+
71+
const responseStream = sendRequest({
72+
url: mockServer.url,
73+
method: 'GET',
74+
headers: [['host', `localhost:${mockServer.port}`]]
75+
}, {
76+
abortSignal: abortController.signal
77+
});
78+
79+
const responseEvents: any[] = [];
80+
responseStream.on('data', (d) => responseEvents.push(d));
81+
responseStream.on('error', (e) => responseEvents.push(e));
82+
await delay(10);
83+
84+
expect(requests.length).to.equal(1);
85+
expect(aborts.length).to.equal(0);
86+
expect(responseEvents.length).to.equal(0);
87+
88+
abortController.abort();
89+
await delay(10);
90+
91+
expect(requests.length).to.equal(1);
92+
expect(aborts.length).to.equal(1); // <-- Server sees the request cancelled
93+
94+
// Only emitted event is a thrown error:
95+
expect(responseEvents.length).to.equal(1);
96+
expect(responseEvents[0]).to.be.instanceOf(Error);
97+
expect(responseEvents[0].code).to.be.oneOf([
98+
// Depends on the Node version you're testing with:
99+
'ECONNRESET',
100+
'ABORT_ERR'
101+
]);
58102
})
59103
});

0 commit comments

Comments
 (0)