diff --git a/src/execution/IncrementalPublisher.ts b/src/execution/IncrementalPublisher.ts index d060ad2463..c5f9c4c2ca 100644 --- a/src/execution/IncrementalPublisher.ts +++ b/src/execution/IncrementalPublisher.ts @@ -5,6 +5,7 @@ import { pathToArray } from '../jsutils/Path.js'; import type { GraphQLError } from '../error/GraphQLError.js'; import { IncrementalGraph } from './IncrementalGraph.js'; +import type { PromiseCanceller } from './PromiseCanceller.js'; import type { CancellableStreamRecord, CompletedExecutionGroup, @@ -43,6 +44,7 @@ export function buildIncrementalResponse( } interface IncrementalPublisherContext { + promiseCanceller: PromiseCanceller | undefined; cancellableStreams: Set | undefined; } @@ -125,6 +127,7 @@ class IncrementalPublisher { IteratorResult > => { if (isDone) { + this._context.promiseCanceller?.disconnect(); await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; } @@ -171,6 +174,9 @@ class IncrementalPublisher { batch = await this._incrementalGraph.nextCompletedBatch(); } while (batch !== undefined); + // TODO: add test for this case + /* c8 ignore next */ + this._context.promiseCanceller?.disconnect(); await this._returnAsyncIteratorsIgnoringErrors(); return { value: undefined, done: true }; }; diff --git a/src/execution/PromiseCanceller.ts b/src/execution/PromiseCanceller.ts new file mode 100644 index 0000000000..60c3e3b6a3 --- /dev/null +++ b/src/execution/PromiseCanceller.ts @@ -0,0 +1,53 @@ +import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js'; + +/** + * A PromiseCanceller object can be used to cancel multiple promises + * using a single AbortSignal. + * + * @internal + */ +export class PromiseCanceller { + abortSignal: AbortSignal; + abort: () => void; + + private _aborts: Set<() => void>; + + constructor(abortSignal: AbortSignal) { + this.abortSignal = abortSignal; + this._aborts = new Set<() => void>(); + this.abort = () => { + for (const abort of this._aborts) { + abort(); + } + }; + + abortSignal.addEventListener('abort', this.abort); + } + + disconnect(): void { + this.abortSignal.removeEventListener('abort', this.abort); + } + + withCancellation(originalPromise: Promise): Promise { + if (this.abortSignal.aborted) { + // eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors + return Promise.reject(this.abortSignal.reason); + } + + const { promise, resolve, reject } = promiseWithResolvers(); + const abort = () => reject(this.abortSignal.reason); + this._aborts.add(abort); + originalPromise.then( + (resolved) => { + this._aborts.delete(abort); + resolve(resolved); + }, + (error: unknown) => { + this._aborts.delete(abort); + reject(error); + }, + ); + + return promise; + } +} diff --git a/src/execution/__tests__/PromiseCanceller-test.ts b/src/execution/__tests__/PromiseCanceller-test.ts new file mode 100644 index 0000000000..91fe6c40e5 --- /dev/null +++ b/src/execution/__tests__/PromiseCanceller-test.ts @@ -0,0 +1,56 @@ +import { describe, it } from 'mocha'; + +import { expectPromise } from '../../__testUtils__/expectPromise.js'; + +import { PromiseCanceller } from '../PromiseCanceller.js'; + +describe('PromiseCanceller', () => { + it('works to cancel an already resolved promise', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = Promise.resolve(1); + + const withCancellation = promiseCanceller.withCancellation(promise); + + abortController.abort(new Error('Cancelled!')); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); + + it('works to cancel a hanging promise', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = new Promise(() => { + /* never resolves */ + }); + + const withCancellation = promiseCanceller.withCancellation(promise); + + abortController.abort(new Error('Cancelled!')); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); + + it('works to cancel a hanging promise created after abort signal triggered', async () => { + const abortController = new AbortController(); + const abortSignal = abortController.signal; + + abortController.abort(new Error('Cancelled!')); + + const promiseCanceller = new PromiseCanceller(abortSignal); + + const promise = new Promise(() => { + /* never resolves */ + }); + + const withCancellation = promiseCanceller.withCancellation(promise); + + await expectPromise(withCancellation).toRejectWith('Cancelled!'); + }); +}); diff --git a/src/execution/__tests__/abort-signal-test.ts b/src/execution/__tests__/abort-signal-test.ts index ad9ba6c332..d12253b517 100644 --- a/src/execution/__tests__/abort-signal-test.ts +++ b/src/execution/__tests__/abort-signal-test.ts @@ -9,7 +9,11 @@ import { parse } from '../../language/parser.js'; import { buildSchema } from '../../utilities/buildASTSchema.js'; -import { execute, experimentalExecuteIncrementally } from '../execute.js'; +import { + execute, + experimentalExecuteIncrementally, + subscribe, +} from '../execute.js'; import type { InitialIncrementalExecutionResult, SubsequentIncrementalExecutionResult, @@ -41,7 +45,7 @@ async function complete( const schema = buildSchema(` type Todo { id: ID - text: String + items: [String] author: User } @@ -52,12 +56,17 @@ const schema = buildSchema(` type Query { todo: Todo + nonNullableTodo: Todo! } type Mutation { foo: String bar: String } + + type Subscription { + foo: String + } `); describe('Execute: Cancellation', () => { @@ -82,7 +91,6 @@ describe('Execute: Cancellation', () => { todo: async () => Promise.resolve({ id: '1', - text: 'Hello, World!', /* c8 ignore next */ author: () => expect.fail('Should not be called'), }), @@ -177,7 +185,6 @@ describe('Execute: Cancellation', () => { todo: async () => Promise.resolve({ id: '1', - text: 'Hello, World!', /* c8 ignore next */ author: () => expect.fail('Should not be called'), }), @@ -226,7 +233,6 @@ describe('Execute: Cancellation', () => { todo: async () => Promise.resolve({ id: '1', - text: 'Hello, World!', /* c8 ignore next */ author: () => expect.fail('Should not be called'), }), @@ -271,7 +277,6 @@ describe('Execute: Cancellation', () => { rootValue: { todo: { id: '1', - text: 'Hello, World!', /* c8 ignore next 3 */ author: async () => Promise.resolve(() => expect.fail('Should not be called')), @@ -300,6 +305,146 @@ describe('Execute: Cancellation', () => { }); }); + it('should stop the execution when aborted despite a hanging resolver', async () => { + const abortController = new AbortController(); + const document = parse(` + query { + todo { + id + author { + id + } + } + } + `); + + const resultPromise = execute({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + todo: () => + new Promise(() => { + /* will never resolve */ + }), + }, + }); + + abortController.abort(); + + const result = await resultPromise; + + expect(result.errors?.[0].originalError?.name).to.equal('AbortError'); + + expectJSON(result).toDeepEqual({ + data: { + todo: null, + }, + errors: [ + { + message: 'This operation was aborted', + path: ['todo'], + locations: [{ line: 3, column: 9 }], + }, + ], + }); + }); + + it('should stop the execution when aborted despite a hanging item', async () => { + const abortController = new AbortController(); + const document = parse(` + query { + todo { + id + items + } + } + `); + + const resultPromise = execute({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + todo: () => ({ + id: '1', + items: [ + new Promise(() => { + /* will never resolve */ + }), + ], + }), + }, + }); + + abortController.abort(); + + const result = await resultPromise; + + expect(result.errors?.[0].originalError?.name).to.equal('AbortError'); + + expectJSON(result).toDeepEqual({ + data: { + todo: { + id: '1', + items: [null], + }, + }, + errors: [ + { + message: 'This operation was aborted', + path: ['todo', 'items', 0], + locations: [{ line: 5, column: 11 }], + }, + ], + }); + }); + + it('should stop the execution when aborted with proper null bubbling', async () => { + const abortController = new AbortController(); + const document = parse(` + query { + nonNullableTodo { + id + author { + id + } + } + } + `); + + const resultPromise = execute({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + nonNullableTodo: async () => + Promise.resolve({ + id: '1', + /* c8 ignore next */ + author: () => expect.fail('Should not be called'), + }), + }, + }); + + abortController.abort(); + + const result = await resultPromise; + + expect(result.errors?.[0].originalError?.name).to.equal('AbortError'); + + expectJSON(result).toDeepEqual({ + data: null, + errors: [ + { + message: 'This operation was aborted', + path: ['nonNullableTodo'], + locations: [{ line: 3, column: 9 }], + }, + ], + }); + }); + it('should stop deferred execution when aborted', async () => { const abortController = new AbortController(); const document = parse(` @@ -307,7 +452,6 @@ describe('Execute: Cancellation', () => { todo { id ... on Todo @defer { - text author { id } @@ -323,7 +467,6 @@ describe('Execute: Cancellation', () => { todo: async () => Promise.resolve({ id: '1', - text: 'hello world', /* c8 ignore next */ author: () => expect.fail('Should not be called'), }), @@ -353,14 +496,11 @@ describe('Execute: Cancellation', () => { const abortController = new AbortController(); const document = parse(` query { - todo { - id - ... on Todo @defer { - text + ... on Query @defer { + todo { + id author { - ... on Author @defer { - id - } + id } } } @@ -373,7 +513,6 @@ describe('Execute: Cancellation', () => { todo: async () => Promise.resolve({ id: '1', - text: 'hello world', /* c8 ignore next 2 */ author: async () => Promise.resolve(() => expect.fail('Should not be called')), @@ -382,9 +521,59 @@ describe('Execute: Cancellation', () => { abortController.signal, ); - await resolveOnNextTick(); - await resolveOnNextTick(); - await resolveOnNextTick(); + abortController.abort(); + + const result = await resultPromise; + + expectJSON(result).toDeepEqual([ + { + data: {}, + pending: [{ id: '0', path: [] }], + hasNext: true, + }, + { + incremental: [ + { + data: { + todo: null, + }, + errors: [ + { + message: 'This operation was aborted', + path: ['todo'], + locations: [{ line: 4, column: 11 }], + }, + ], + id: '0', + }, + ], + completed: [{ id: '0' }], + hasNext: false, + }, + ]); + }); + + it('should stop streamed execution when aborted', async () => { + const abortController = new AbortController(); + const document = parse(` + query { + todo { + id + items @stream + } + } + `); + + const resultPromise = complete( + document, + { + todo: { + id: '1', + items: [Promise.resolve('item')], + }, + }, + abortController.signal, + ); abortController.abort(); @@ -395,28 +584,21 @@ describe('Execute: Cancellation', () => { data: { todo: { id: '1', + items: [], }, }, - pending: [{ id: '0', path: ['todo'] }], + pending: [{ id: '0', path: ['todo', 'items'] }], hasNext: true, }, { incremental: [ { - data: { - text: 'hello world', - author: null, - }, + items: [null], errors: [ { - locations: [ - { - column: 13, - line: 7, - }, - ], message: 'This operation was aborted', - path: ['todo', 'author'], + path: ['todo', 'items', 0], + locations: [{ line: 5, column: 11 }], }, ], id: '0', @@ -448,6 +630,10 @@ describe('Execute: Cancellation', () => { }, }); + await resolveOnNextTick(); + await resolveOnNextTick(); + await resolveOnNextTick(); + abortController.abort(); const result = await resultPromise; @@ -498,4 +684,39 @@ describe('Execute: Cancellation', () => { ], }); }); + + it('should stop the execution when aborted during subscription', async () => { + const abortController = new AbortController(); + const document = parse(` + subscription { + foo + } + `); + + const resultPromise = subscribe({ + document, + schema, + abortSignal: abortController.signal, + rootValue: { + foo: async () => + new Promise(() => { + /* will never resolve */ + }), + }, + }); + + abortController.abort(); + + const result = await resultPromise; + + expectJSON(result).toDeepEqual({ + errors: [ + { + message: 'This operation was aborted', + path: ['foo'], + locations: [{ line: 3, column: 9 }], + }, + ], + }); + }); }); diff --git a/src/execution/execute.ts b/src/execution/execute.ts index 7c06624414..5e2f718176 100644 --- a/src/execution/execute.ts +++ b/src/execution/execute.ts @@ -63,6 +63,7 @@ import { import { getVariableSignature } from './getVariableSignature.js'; import { buildIncrementalResponse } from './IncrementalPublisher.js'; import { mapAsyncIterable } from './mapAsyncIterable.js'; +import { PromiseCanceller } from './PromiseCanceller.js'; import type { CancellableStreamRecord, CompletedExecutionGroup, @@ -163,6 +164,7 @@ export interface ValidatedExecutionArgs { export interface ExecutionContext { validatedExecutionArgs: ValidatedExecutionArgs; errors: Array | undefined; + promiseCanceller: PromiseCanceller | undefined; cancellableStreams: Set | undefined; } @@ -310,9 +312,13 @@ export function executeQueryOrMutationOrSubscriptionEvent( export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( validatedExecutionArgs: ValidatedExecutionArgs, ): PromiseOrValue { + const abortSignal = validatedExecutionArgs.abortSignal; const exeContext: ExecutionContext = { validatedExecutionArgs, errors: undefined, + promiseCanceller: abortSignal + ? new PromiseCanceller(abortSignal) + : undefined, cancellableStreams: undefined, }; try { @@ -364,14 +370,20 @@ export function experimentalExecuteQueryOrMutationOrSubscriptionEvent( if (isPromise(graphqlWrappedResult)) { return graphqlWrappedResult.then( (resolved) => buildDataResponse(exeContext, resolved), - (error: unknown) => ({ - data: null, - errors: withError(exeContext.errors, error as GraphQLError), - }), + (error: unknown) => { + exeContext.promiseCanceller?.disconnect(); + return { + data: null, + errors: withError(exeContext.errors, error as GraphQLError), + }; + }, ); } return buildDataResponse(exeContext, graphqlWrappedResult); } catch (error) { + // TODO: add test case for synchronous null bubbling to root with cancellation + /* c8 ignore next */ + exeContext.promiseCanceller?.disconnect(); return { data: null, errors: withError(exeContext.errors, error) }; } } @@ -462,6 +474,7 @@ function buildDataResponse( const { rawResult: data, incrementalDataRecords } = graphqlWrappedResult; const errors = exeContext.errors; if (incrementalDataRecords === undefined) { + exeContext.promiseCanceller?.disconnect(); return errors !== undefined ? { errors, data } : { data }; } @@ -660,11 +673,12 @@ function executeFieldsSerially( incrementalContext: IncrementalContext | undefined, deferMap: ReadonlyMap | undefined, ): PromiseOrValue>> { + const abortSignal = exeContext.validatedExecutionArgs.abortSignal; return promiseReduce( groupedFieldSet, (graphqlWrappedResult, [responseName, fieldDetailsList]) => { const fieldPath = addPath(path, responseName, parentType.name); - const abortSignal = exeContext.validatedExecutionArgs.abortSignal; + if (abortSignal?.aborted) { handleFieldError( abortSignal.reason, @@ -811,7 +825,7 @@ function executeField( incrementalContext: IncrementalContext | undefined, deferMap: ReadonlyMap | undefined, ): PromiseOrValue> | undefined { - const validatedExecutionArgs = exeContext.validatedExecutionArgs; + const { validatedExecutionArgs, promiseCanceller } = exeContext; const { schema, contextValue, variableValues, hideSuggestions, abortSignal } = validatedExecutionArgs; const fieldName = fieldDetailsList[0].node.name.value; @@ -856,7 +870,7 @@ function executeField( fieldDetailsList, info, path, - result, + promiseCanceller?.withCancellation(result) ?? result, incrementalContext, deferMap, ); @@ -1564,7 +1578,7 @@ function completeListItemValue( } async function completePromisedListItemValue( - item: unknown, + item: Promise, parent: GraphQLWrappedResult>, exeContext: ExecutionContext, itemType: GraphQLOutputType, @@ -1575,7 +1589,9 @@ async function completePromisedListItemValue( deferMap: ReadonlyMap | undefined, ): Promise { try { - const resolved = await item; + const resolved = await (exeContext.promiseCanceller?.withCancellation( + item, + ) ?? item); let completed = completeValue( exeContext, itemType, @@ -1745,23 +1761,13 @@ function completeObjectValue( incrementalContext: IncrementalContext | undefined, deferMap: ReadonlyMap | undefined, ): PromiseOrValue>> { - const validatedExecutionArgs = exeContext.validatedExecutionArgs; - const abortSignal = validatedExecutionArgs.abortSignal; - if (abortSignal?.aborted) { - throw locatedError( - abortSignal.reason, - toNodes(fieldDetailsList), - pathToArray(path), - ); - } - // If there is an isTypeOf predicate function, call it with the // current result. If isTypeOf returns false, then raise an error rather // than continuing execution. if (returnType.isTypeOf) { const isTypeOf = returnType.isTypeOf( result, - validatedExecutionArgs.contextValue, + exeContext.validatedExecutionArgs.contextValue, info, ); @@ -2201,11 +2207,22 @@ function executeSubscription( const result = resolveFn(rootValue, args, contextValue, info, abortSignal); if (isPromise(result)) { - return result - .then(assertEventStream) - .then(undefined, (error: unknown) => { + const promiseCanceller = abortSignal + ? new PromiseCanceller(abortSignal) + : undefined; + const promise = promiseCanceller?.withCancellation(result) ?? result; + return promise.then(assertEventStream).then( + (resolved) => { + // TODO: add test case + /* c8 ignore next */ + promiseCanceller?.disconnect(); + return resolved; + }, + (error: unknown) => { + promiseCanceller?.disconnect(); throw locatedError(error, fieldNodes, pathToArray(path)); - }); + }, + ); } return assertEventStream(result); @@ -2566,7 +2583,7 @@ function completeStreamItem( fieldDetailsList, info, itemPath, - item, + exeContext.promiseCanceller?.withCancellation(item) ?? item, incrementalContext, new Map(), ).then(