Skip to content

Commit 3958158

Browse files
subscribe: simplify by rewriting as async functions (#3024)
1 parent 814169b commit 3958158

File tree

1 file changed

+53
-61
lines changed

1 file changed

+53
-61
lines changed

src/subscription/subscribe.js

Lines changed: 53 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ export type SubscriptionArgs = {|
5757
*
5858
* Accepts either an object with named arguments, or individual arguments.
5959
*/
60-
export function subscribe(
60+
export async function subscribe(
6161
args: SubscriptionArgs,
6262
): Promise<AsyncGenerator<ExecutionResult, void, void> | ExecutionResult> {
6363
const {
@@ -71,7 +71,8 @@ export function subscribe(
7171
subscribeFieldResolver,
7272
} = args;
7373

74-
const sourcePromise = createSourceEventStream(
74+
// $FlowFixMe[incompatible-call]
75+
const resultOrStream = await createSourceEventStream(
7576
schema,
7677
document,
7778
rootValue,
@@ -81,6 +82,10 @@ export function subscribe(
8182
subscribeFieldResolver,
8283
);
8384

85+
if (!isAsyncIterable(resultOrStream)) {
86+
return resultOrStream;
87+
}
88+
8489
// For each payload yielded from a subscription, map it over the normal
8590
// GraphQL `execute` function, with `payload` as the rootValue.
8691
// This implements the "MapSourceToResponseEvent" algorithm described in
@@ -98,30 +103,13 @@ export function subscribe(
98103
fieldResolver,
99104
});
100105

101-
// Resolve the Source Stream, then map every source value to a
102-
// ExecutionResult value as described above.
103-
return sourcePromise.then((resultOrStream) =>
104-
// Note: Flow can't refine isAsyncIterable, so explicit casts are used.
105-
isAsyncIterable(resultOrStream)
106-
? mapAsyncIterator(
107-
resultOrStream,
108-
mapSourceToResponse,
109-
reportGraphQLError,
110-
)
111-
: ((resultOrStream: any): ExecutionResult),
112-
);
113-
}
114-
115-
/**
116-
* This function checks if the error is a GraphQLError. If it is, report it as
117-
* an ExecutionResult, containing only errors and no data. Otherwise treat the
118-
* error as a system-class error and re-throw it.
119-
*/
120-
function reportGraphQLError(error: mixed): ExecutionResult {
121-
if (error instanceof GraphQLError) {
122-
return { errors: [error] };
123-
}
124-
throw error;
106+
// Map every source value to a ExecutionResult value as described above.
107+
return mapAsyncIterator(resultOrStream, mapSourceToResponse, (error) => {
108+
if (error instanceof GraphQLError) {
109+
return { errors: [error] };
110+
}
111+
throw error;
112+
});
125113
}
126114

127115
/**
@@ -152,7 +140,7 @@ function reportGraphQLError(error: mixed): ExecutionResult {
152140
* or otherwise separating these two steps. For more on this, see the
153141
* "Supporting Subscriptions at Scale" information in the GraphQL specification.
154142
*/
155-
export function createSourceEventStream(
143+
export async function createSourceEventStream(
156144
schema: GraphQLSchema,
157145
document: DocumentNode,
158146
rootValue?: mixed,
@@ -165,9 +153,8 @@ export function createSourceEventStream(
165153
// developer mistake which should throw an early error.
166154
assertValidExecutionArguments(schema, document, variableValues);
167155

168-
return new Promise((resolve) => {
169-
// If a valid context cannot be created due to incorrect arguments,
170-
// this will throw an error.
156+
try {
157+
// If a valid context cannot be created due to incorrect arguments, this will throw an error.
171158
const exeContext = buildExecutionContext(
172159
schema,
173160
document,
@@ -178,18 +165,35 @@ export function createSourceEventStream(
178165
fieldResolver,
179166
);
180167

181-
resolve(
182-
// Return early errors if execution context failed.
183-
Array.isArray(exeContext)
184-
? { errors: exeContext }
185-
: executeSubscription(exeContext),
186-
);
187-
}).catch(reportGraphQLError);
168+
// Return early errors if execution context failed.
169+
if (Array.isArray(exeContext)) {
170+
return { errors: exeContext };
171+
}
172+
173+
const eventStream = await executeSubscription(exeContext);
174+
175+
// Assert field returned an event stream, otherwise yield an error.
176+
if (!isAsyncIterable(eventStream)) {
177+
throw new Error(
178+
'Subscription field must return Async Iterable. ' +
179+
`Received: ${inspect(eventStream)}.`,
180+
);
181+
}
182+
183+
return eventStream;
184+
} catch (error) {
185+
// If it GraphQLError, report it as an ExecutionResult, containing only errors and no data.
186+
// Otherwise treat the error as a system-class error and re-throw it.
187+
if (error instanceof GraphQLError) {
188+
return { errors: [error] };
189+
}
190+
throw error;
191+
}
188192
}
189193

190-
function executeSubscription(
194+
async function executeSubscription(
191195
exeContext: ExecutionContext,
192-
): Promise<AsyncIterable<mixed>> {
196+
): Promise<mixed> {
193197
const { schema, operation, variableValues, rootValue } = exeContext;
194198
const type = getOperationRootType(schema, operation);
195199
const fields = collectFields(
@@ -216,8 +220,7 @@ function executeSubscription(
216220
const path = addPath(undefined, responseName, type.name);
217221
const info = buildResolveInfo(exeContext, fieldDef, fieldNodes, type, path);
218222

219-
// Coerce to Promise for easier error handling and consistent return type.
220-
return new Promise((resolveResult) => {
223+
try {
221224
// Implements the "ResolveFieldEventStream" algorithm from GraphQL specification.
222225
// It differs from "ResolveFieldValue" due to providing a different `resolveFn`.
223226

@@ -233,24 +236,13 @@ function executeSubscription(
233236
// Call the `subscribe()` resolver or the default resolver to produce an
234237
// AsyncIterable yielding raw payloads.
235238
const resolveFn = fieldDef.subscribe ?? exeContext.fieldResolver;
236-
resolveResult(resolveFn(rootValue, args, contextValue, info));
237-
}).then(
238-
(eventStream) => {
239-
if (eventStream instanceof Error) {
240-
throw locatedError(eventStream, fieldNodes, pathToArray(path));
241-
}
242-
243-
// Assert field returned an event stream, otherwise yield an error.
244-
if (!isAsyncIterable(eventStream)) {
245-
throw new Error(
246-
'Subscription field must return Async Iterable. ' +
247-
`Received: ${inspect(eventStream)}.`,
248-
);
249-
}
250-
return eventStream;
251-
},
252-
(error) => {
253-
throw locatedError(error, fieldNodes, pathToArray(path));
254-
},
255-
);
239+
const eventStream = await resolveFn(rootValue, args, contextValue, info);
240+
241+
if (eventStream instanceof Error) {
242+
throw eventStream;
243+
}
244+
return eventStream;
245+
} catch (error) {
246+
throw locatedError(error, fieldNodes, pathToArray(path));
247+
}
256248
}

0 commit comments

Comments
 (0)