Skip to content

Commit 3e8d114

Browse files
committed
defer stream cancelation
1 parent ce185de commit 3e8d114

File tree

8 files changed

+325
-58
lines changed

8 files changed

+325
-58
lines changed

examples/defer-stream/__integration-tests__/defer-stream.spec.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ import { yoga } from '../src/yoga';
22

33
describe('Defer / Stream', () => {
44
it('stream', async () => {
5-
const start = Date.now();
65
const response = await yoga.fetch('/graphql', {
76
method: 'POST',
87
headers: {
@@ -21,14 +20,9 @@ describe('Defer / Stream', () => {
2120
const contentType = response.headers.get('Content-Type');
2221
expect(contentType).toEqual('multipart/mixed; boundary="-"');
2322
const responseText = await response.text();
24-
const end = Date.now();
2523
expect(responseText).toMatchSnapshot('stream');
26-
const diff = end - start;
27-
expect(diff).toBeLessThan(2650);
28-
expect(diff > 2550).toBeTruthy();
2924
});
3025
it('defer', async () => {
31-
const start = Date.now();
3226
const response = await yoga.fetch('/graphql', {
3327
method: 'POST',
3428
headers: {
@@ -50,10 +44,6 @@ describe('Defer / Stream', () => {
5044
const contentType = response.headers.get('Content-Type');
5145
expect(contentType).toEqual('multipart/mixed; boundary="-"');
5246
const responseText = await response.text();
53-
const end = Date.now();
5447
expect(responseText).toMatchSnapshot('defer');
55-
const diff = end - start;
56-
expect(diff).toBeLessThan(1600);
57-
expect(diff > 1450).toBeTruthy();
5848
});
5949
});

packages/graphql-yoga/__tests__/request-cancellation.spec.ts

Lines changed: 225 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,47 @@
1-
import { createSchema, createYoga } from '../src/index';
1+
import { useDeferStream } from '@graphql-yoga/plugin-defer-stream';
2+
import { createLogger, createSchema, createYoga, FetchAPI } from '../src/index';
23

3-
describe('request cancellation', () => {
4-
it('request cancellation stops invocation of subsequent resolvers', async () => {
4+
const variants: Array<[name: string, fetchAPI: undefined | FetchAPI]> = [
5+
['Ponyfilled WhatWG Fetch', undefined],
6+
];
7+
8+
const [major] = globalThis?.process?.versions?.node.split('.') ?? [];
9+
10+
if (major === '21' && process.env.LEAKS_TEST !== 'true') {
11+
variants.push([
12+
'Node.js 21',
13+
{
14+
fetch: globalThis.fetch,
15+
Blob: globalThis.Blob,
16+
btoa: globalThis.btoa,
17+
FormData: globalThis.FormData,
18+
Headers: globalThis.Headers,
19+
Request: globalThis.Request,
20+
crypto: globalThis.crypto,
21+
File: globalThis.File,
22+
ReadableStream: globalThis.ReadableStream,
23+
// @ts-expect-error json function signature
24+
Response: globalThis.Response,
25+
TextDecoder: globalThis.TextDecoder,
26+
TextEncoder: globalThis.TextEncoder,
27+
URL: globalThis.URL,
28+
TransformStream: globalThis.TransformStream,
29+
// URLPattern: globalThis.URLPattern,
30+
URLSearchParams: globalThis.URLSearchParams,
31+
WritableStream: globalThis.WritableStream,
32+
},
33+
]);
34+
}
35+
36+
function waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground() {
37+
return new Promise(res => setTimeout(res, 5));
38+
}
39+
40+
describe.each(variants)('request cancellation (%s)', (_, fetchAPI) => {
41+
it('request cancellation stops invocation of subsequent resolvers (GraphQL over HTTP)', async () => {
542
const rootResolverGotInvokedD = createDeferred();
643
const requestGotCancelledD = createDeferred();
744
let aResolverGotInvoked = false;
8-
let rootResolverGotInvoked = false;
945
const schema = createSchema({
1046
typeDefs: /* GraphQL */ `
1147
type Query {
@@ -18,7 +54,6 @@ describe('request cancellation', () => {
1854
resolvers: {
1955
Query: {
2056
async root() {
21-
rootResolverGotInvoked = true;
2257
rootResolverGotInvokedD.resolve();
2358
await requestGotCancelledD.promise;
2459
return { a: 'a' };
@@ -32,7 +67,10 @@ describe('request cancellation', () => {
3267
},
3368
},
3469
});
35-
const yoga = createYoga({ schema });
70+
const logger = createLogger('silent');
71+
const debugLogs = jest.fn();
72+
logger.debug = debugLogs;
73+
const yoga = createYoga({ schema, fetchAPI, logging: logger });
3674
const abortController = new AbortController();
3775
const promise = Promise.resolve(
3876
yoga.fetch('http://yoga/graphql', {
@@ -48,9 +86,188 @@ describe('request cancellation', () => {
4886
abortController.abort();
4987
requestGotCancelledD.resolve();
5088
await expect(promise).rejects.toThrow('This operation was aborted');
51-
expect(rootResolverGotInvoked).toBe(true);
89+
await waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground();
5290
expect(aResolverGotInvoked).toBe(false);
53-
await requestGotCancelledD.promise;
91+
expect(debugLogs.mock.calls).toEqual([
92+
['Parsing request to extract GraphQL parameters'],
93+
['Processing GraphQL Parameters'],
94+
['Request aborted'],
95+
]);
96+
});
97+
98+
it('request cancellation stops invocation of subsequent resolvers (GraphQL over SSE with Subscription)', async () => {
99+
const rootResolverGotInvokedD = createDeferred();
100+
const requestGotCancelledD = createDeferred();
101+
let aResolverGotInvoked = false;
102+
const schema = createSchema({
103+
typeDefs: /* GraphQL */ `
104+
type Query {
105+
root: A!
106+
}
107+
type Subscription {
108+
root: A!
109+
}
110+
type A {
111+
a: String!
112+
}
113+
`,
114+
resolvers: {
115+
Subscription: {
116+
root: {
117+
async *subscribe() {
118+
yield 1;
119+
},
120+
async resolve() {
121+
rootResolverGotInvokedD.resolve();
122+
await requestGotCancelledD.promise;
123+
return { a: 'a' };
124+
},
125+
},
126+
},
127+
A: {
128+
a() {
129+
aResolverGotInvoked = true;
130+
return 'a';
131+
},
132+
},
133+
},
134+
});
135+
const logger = createLogger('silent');
136+
const debugLogs = jest.fn();
137+
logger.debug = debugLogs;
138+
const yoga = createYoga({ schema, fetchAPI, logging: logger });
139+
const abortController = new AbortController();
140+
const response = await yoga.fetch('http://yoga/graphql', {
141+
method: 'POST',
142+
body: JSON.stringify({ query: 'subscription { root { a } }' }),
143+
headers: {
144+
'Content-Type': 'application/json',
145+
Accept: 'text/event-stream',
146+
},
147+
signal: abortController.signal,
148+
});
149+
expect(response.status).toBe(200);
150+
const iterator = response.body![Symbol.asyncIterator]();
151+
// first we will always get a ping/keep alive for flushed headers
152+
const next = await iterator.next();
153+
expect(Buffer.from(next.value).toString('utf-8')).toMatchInlineSnapshot(`
154+
":
155+
156+
"
157+
`);
158+
159+
await rootResolverGotInvokedD.promise;
160+
const next$ = iterator.next().then(({ done, value }) => {
161+
// in case it resolves, parse the buffer to string for easier debugging.
162+
return { done, value: Buffer.from(value).toString('utf-8') };
163+
});
164+
165+
abortController.abort();
166+
requestGotCancelledD.resolve();
167+
168+
await expect(next$).rejects.toThrow('This operation was aborted');
169+
await waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground();
170+
expect(aResolverGotInvoked).toBe(false);
171+
172+
expect(debugLogs.mock.calls).toEqual([
173+
['Parsing request to extract GraphQL parameters'],
174+
['Processing GraphQL Parameters'],
175+
['Processing GraphQL Parameters done.'],
176+
['Request aborted'],
177+
]);
178+
});
179+
180+
it('request cancellation stops invocation of subsequent resolvers (GraphQL over Multipart with defer/stream)', async () => {
181+
const aResolverGotInvokedD = createDeferred();
182+
const requestGotCancelledD = createDeferred();
183+
let bResolverGotInvoked = false;
184+
const schema = createSchema({
185+
typeDefs: /* GraphQL */ `
186+
type Query {
187+
root: A!
188+
}
189+
type A {
190+
a: B!
191+
}
192+
type B {
193+
b: String
194+
}
195+
`,
196+
resolvers: {
197+
Query: {
198+
async root() {
199+
return { a: 'a' };
200+
},
201+
},
202+
A: {
203+
async a() {
204+
aResolverGotInvokedD.resolve();
205+
await requestGotCancelledD.promise;
206+
return { b: 'b' };
207+
},
208+
},
209+
B: {
210+
b: obj => {
211+
bResolverGotInvoked = true;
212+
return obj.b;
213+
},
214+
},
215+
},
216+
});
217+
const logger = createLogger('silent');
218+
const debugLogs = jest.fn();
219+
logger.debug = debugLogs;
220+
const yoga = createYoga({ schema, plugins: [useDeferStream()], fetchAPI, logging: logger });
221+
222+
const abortController = new AbortController();
223+
const response = await yoga.fetch('http://yoga/graphql', {
224+
method: 'POST',
225+
body: JSON.stringify({
226+
query: /* GraphQL */ `
227+
query {
228+
root {
229+
... @defer {
230+
a {
231+
b
232+
}
233+
}
234+
}
235+
}
236+
`,
237+
}),
238+
headers: {
239+
'content-type': 'application/json',
240+
accept: 'multipart/mixed',
241+
},
242+
signal: abortController.signal,
243+
});
244+
expect(response.status).toEqual(200);
245+
const iterator = response.body![Symbol.asyncIterator]();
246+
let payload = '';
247+
248+
// Shitty wait condition, but it works lol
249+
while (payload.split('\r\n').length < 6 || !payload.endsWith('---')) {
250+
const next = await iterator.next();
251+
payload += Buffer.from(next.value).toString('utf-8');
252+
}
253+
254+
const next$ = iterator.next().then(({ done, value }) => {
255+
// in case it resolves, parse the buffer to string for easier debugging.
256+
return { done, value: Buffer.from(value).toString('utf-8') };
257+
});
258+
259+
await aResolverGotInvokedD.promise;
260+
abortController.abort();
261+
requestGotCancelledD.resolve();
262+
await expect(next$).rejects.toThrow('This operation was aborted');
263+
await waitAFewMillisecondsToMakeSureGraphQLExecutionIsNotResumingInBackground();
264+
expect(bResolverGotInvoked).toBe(false);
265+
expect(debugLogs.mock.calls).toEqual([
266+
['Parsing request to extract GraphQL parameters'],
267+
['Processing GraphQL Parameters'],
268+
['Processing GraphQL Parameters done.'],
269+
['Request aborted'],
270+
]);
54271
});
55272
});
56273

packages/graphql-yoga/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
},
5151
"dependencies": {
5252
"@envelop/core": "^5.0.0",
53-
"@graphql-tools/executor": "^1.2.3",
53+
"@graphql-tools/executor": "^1.2.5",
5454
"@graphql-tools/schema": "^10.0.0",
5555
"@graphql-tools/utils": "^10.1.0",
5656
"@graphql-yoga/logger": "^2.0.0",

packages/graphql-yoga/src/error.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ export function isOriginalGraphQLError(
3737
return false;
3838
}
3939

40+
export function isAbortError(error: unknown): error is DOMException {
41+
return (
42+
typeof error === 'object' &&
43+
error?.constructor?.name === 'DOMException' &&
44+
(error as Record<string, unknown>).name === 'AbortError'
45+
);
46+
}
47+
4048
export function handleError(
4149
error: unknown,
4250
maskedErrorsOpts: YogaMaskedErrorOpts | null,
@@ -50,10 +58,7 @@ export function handleError(
5058
errors.add(handledError);
5159
}
5260
}
53-
} else if (
54-
error?.constructor?.name === 'DOMException' &&
55-
(error as Record<string, unknown>).name === 'AbortError'
56-
) {
61+
} else if (isAbortError(error)) {
5762
logger.debug('Request aborted');
5863
} else if (maskedErrorsOpts) {
5964
const maskedError = maskedErrorsOpts.maskError(

packages/graphql-yoga/src/plugins/result-processor/multipart.ts

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -37,28 +37,33 @@ export function processMultipartResult(result: ResultProcessorInput, fetchAPI: F
3737
controller.enqueue(textEncoder.encode(`---`));
3838
},
3939
async pull(controller) {
40-
const { done, value } = await iterator.next();
41-
if (value != null) {
42-
controller.enqueue(textEncoder.encode('\r\n'));
40+
try {
41+
const { done, value } = await iterator.next();
42+
if (value != null) {
43+
controller.enqueue(textEncoder.encode('\r\n'));
4344

44-
controller.enqueue(textEncoder.encode('Content-Type: application/json; charset=utf-8'));
45-
controller.enqueue(textEncoder.encode('\r\n'));
45+
controller.enqueue(textEncoder.encode('Content-Type: application/json; charset=utf-8'));
46+
controller.enqueue(textEncoder.encode('\r\n'));
4647

47-
const chunk = jsonStringifyResultWithoutInternals(value);
48-
const encodedChunk = textEncoder.encode(chunk);
48+
const chunk = jsonStringifyResultWithoutInternals(value);
49+
const encodedChunk = textEncoder.encode(chunk);
4950

50-
controller.enqueue(textEncoder.encode('Content-Length: ' + encodedChunk.byteLength));
51-
controller.enqueue(textEncoder.encode('\r\n'));
51+
controller.enqueue(textEncoder.encode('Content-Length: ' + encodedChunk.byteLength));
52+
controller.enqueue(textEncoder.encode('\r\n'));
5253

53-
controller.enqueue(textEncoder.encode('\r\n'));
54-
controller.enqueue(encodedChunk);
55-
controller.enqueue(textEncoder.encode('\r\n'));
54+
controller.enqueue(textEncoder.encode('\r\n'));
55+
controller.enqueue(encodedChunk);
56+
controller.enqueue(textEncoder.encode('\r\n'));
5657

57-
controller.enqueue(textEncoder.encode('---'));
58-
}
59-
if (done) {
60-
controller.enqueue(textEncoder.encode('--\r\n'));
61-
controller.close();
58+
controller.enqueue(textEncoder.encode('---'));
59+
}
60+
61+
if (done) {
62+
controller.enqueue(textEncoder.encode('--\r\n'));
63+
controller.close();
64+
}
65+
} catch (err) {
66+
controller.error(err);
6267
}
6368
},
6469
async cancel(e) {

0 commit comments

Comments
 (0)