Skip to content

Commit 8892c1a

Browse files
authored
Merge pull request #3333 from SeedCompany/graphql/ws
2 parents a0526e7 + 1dc4695 commit 8892c1a

File tree

7 files changed

+150
-4
lines changed

7 files changed

+150
-4
lines changed

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
"@fastify/compress": "^8.0.1",
3636
"@fastify/cookie": "^11.0.1",
3737
"@fastify/cors": "^11.0.1",
38+
"@fastify/websocket": "^11.0.2",
3839
"@ffprobe-installer/ffprobe": "^2.1.2",
3940
"@golevelup/nestjs-discovery": "^4.0.0",
4041
"@graphql-hive/yoga": ">=0.41.0",
@@ -76,6 +77,7 @@
7677
"graphql": "^16.9.0",
7778
"graphql-parse-resolve-info": "^4.14.0",
7879
"graphql-scalars": "^1.22.4",
80+
"graphql-ws": "^6.0.4",
7981
"graphql-yoga": "^5.13.4",
8082
"human-format": "^1.2.0",
8183
"image-size": "^2.0.2",

src/core/graphql/driver.ts

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@ import {
55
type GqlModuleOptions,
66
} from '@nestjs/graphql';
77
import type { RouteOptions as FastifyRoute } from 'fastify';
8+
import type { ExecutionArgs } from 'graphql';
9+
import { makeHandler as makeGqlWSHandler } from 'graphql-ws/use/@fastify/websocket';
810
import {
911
createYoga,
12+
type envelop,
1013
type YogaServerInstance,
1114
type YogaServerOptions,
1215
} from 'graphql-yoga';
16+
import { AsyncResource } from 'node:async_hooks';
17+
import type { WebSocket } from 'ws';
1318
import { type GqlContextType } from '~/common';
1419
import { HttpAdapter, type IRequest } from '../http';
1520
import { type IResponse } from '../http/types';
@@ -55,7 +60,14 @@ export class Driver extends AbstractDriver<DriverConfig> {
5560
});
5661

5762
fastify.route({
58-
method: ['GET', 'POST', 'OPTIONS'],
63+
method: 'GET',
64+
url: this.yoga.graphqlEndpoint,
65+
handler: this.httpHandler,
66+
// Allow this same path to handle websocket upgrade requests.
67+
wsHandler: this.makeWsHandler(options),
68+
});
69+
fastify.route({
70+
method: ['POST', 'OPTIONS'],
5971
url: this.yoga.graphqlEndpoint,
6072
handler: this.httpHandler,
6173
});
@@ -77,6 +89,97 @@ export class Driver extends AbstractDriver<DriverConfig> {
7789
.send(res.body);
7890
};
7991

92+
/**
93+
* This code ties fastify, yoga, and graphql-ws together.
94+
* Execution layers in order:
95+
* 1. fastify route (http path matching)
96+
* 2. fastify's websocket plugin (http upgrade request & websocket open/close)
97+
* This allows our fastify hooks to be executed.
98+
* And provides a consistent Fastify `Request` type,
99+
* instead of a raw `IncomingMessage`.
100+
* 3. `graphql-ws`'s fastify handler (adapts #2 to graphql-ws)
101+
* 4. `graphql-ws` (handles specific gql protocol over websockets)
102+
* 5. `graphql-yoga` is unwrapped to `envelop`.
103+
* Yoga just wraps `envelop` and handles more of the http layer.
104+
* We really just reference `envelop` hooks with our "Yoga Plugins".
105+
* So this allows our "yoga" plugins to be executed.
106+
*/
107+
private makeWsHandler(options: DriverConfig) {
108+
const asyncContextBySocket = new WeakMap<WebSocket, AsyncResource>();
109+
interface WsExecutionArgs extends ExecutionArgs {
110+
socket: WebSocket;
111+
envelop: ReturnType<ReturnType<typeof envelop>>;
112+
}
113+
114+
// The graphql-ws handler which accepts the fastify websocket/request and
115+
// orchestrates the subscription setup & execution.
116+
// This forwards to yoga/envelop.
117+
// This was adapted from yoga's graphql-ws example.
118+
// https://github.com/dotansimha/graphql-yoga/tree/main/examples/graphql-ws
119+
const fastifyWsHandler = makeGqlWSHandler<
120+
Record<string, unknown>,
121+
{ socket: WebSocket; request: IRequest }
122+
>({
123+
schema: options.schema!,
124+
// Custom execute/subscribe functions that really just defer to a
125+
// unique envelop (yoga) instance per request.
126+
execute: (wsArgs) => {
127+
const { envelop, socket, ...args } = wsArgs as WsExecutionArgs;
128+
return asyncContextBySocket.get(socket)!.runInAsyncScope(() => {
129+
return envelop.execute(args);
130+
});
131+
},
132+
subscribe: (wsArgs) => {
133+
const { envelop, socket, ...args } = wsArgs as WsExecutionArgs;
134+
// Because this is called via socket.onmessage, we don't have
135+
// the same async context we started with.
136+
// Grab and resume it.
137+
return asyncContextBySocket.get(socket)!.runInAsyncScope(() => {
138+
return envelop.subscribe(args);
139+
});
140+
},
141+
// Create a unique envelop/yoga instance for each subscription.
142+
// This allows "yoga" plugins that are really just envelop hooks
143+
// to be executed.
144+
onSubscribe: async (ctx, id, payload) => {
145+
const {
146+
extra: { request, socket },
147+
} = ctx;
148+
const envelop = this.yoga.getEnveloped({
149+
req: request,
150+
socket,
151+
params: payload, // Same(ish?) shape as YogaInitialContext.params
152+
});
153+
154+
const args: WsExecutionArgs = {
155+
schema: envelop.schema,
156+
operationName: payload.operationName,
157+
document: envelop.parse(payload.query),
158+
variableValues: payload.variables,
159+
contextValue: await envelop.contextFactory(),
160+
// These are needed in our execute()/subscribe() declared above.
161+
// Public examples put these functions in the context, but I don't
162+
// like exposing that implementation detail to the rest of the app.
163+
envelop,
164+
socket,
165+
};
166+
167+
const errors = envelop.validate(args.schema, args.document);
168+
if (errors.length) {
169+
return errors;
170+
}
171+
return args;
172+
},
173+
});
174+
175+
const wsHandler: FastifyRoute['wsHandler'] = function (socket, req) {
176+
// Save a reference to the current async context, so we can resume it.
177+
asyncContextBySocket.set(socket, new AsyncResource('graphql-ws'));
178+
return fastifyWsHandler.call(this, socket, req);
179+
};
180+
return wsHandler;
181+
}
182+
80183
async stop() {
81184
// noop
82185
}

src/core/graphql/gql-context.host.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,17 @@ export class GqlContextHostImpl implements GqlContextHost, OnModuleDestroy {
5252
});
5353
};
5454

55+
onSubscribe: Plugin['onSubscribe'] = ({
56+
subscribeFn,
57+
setSubscribeFn,
58+
args,
59+
}) => {
60+
const ctx = args.contextValue;
61+
setSubscribeFn((...args) => {
62+
return this.als.run(ctx, subscribeFn, ...args);
63+
});
64+
};
65+
5566
onModuleDestroy() {
5667
this.als.disable();
5768
}

src/core/graphql/graphql-error-formatter.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,13 @@ export class GraphqlErrorFormatter {
5151
}),
5252
});
5353

54+
onSubscribe: Plugin['onSubscribe'] = () => ({
55+
onSubscribeError: ({ error, setError }) => {
56+
const formatted = this.formatError(error);
57+
setError(formatted);
58+
},
59+
});
60+
5461
formatError = (error: unknown) => {
5562
if (!(error instanceof GraphQLError)) {
5663
// I don't think this happens.

src/core/graphql/graphql-session.plugin.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,11 @@ export class GraphqlSessionPlugin {
99
args.contextValue.session$.complete();
1010
},
1111
});
12+
onSubscribe: Plugin['onSubscribe'] = () => ({
13+
onSubscribeResult: ({ args }) => ({
14+
onEnd: () => {
15+
args.contextValue.session$.complete();
16+
},
17+
}),
18+
});
1219
}

src/core/http/http.adapter.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import compression from '@fastify/compress';
22
import cookieParser from '@fastify/cookie';
33
import cors from '@fastify/cors';
4+
import websocket from '@fastify/websocket';
45
import { DiscoveryService } from '@golevelup/nestjs-discovery';
56
import {
67
VERSION_NEUTRAL,
@@ -65,6 +66,8 @@ export class HttpAdapter extends PatchedFastifyAdapter {
6566
// Only on routes we've decorated.
6667
await app.register(rawBody, { global: false });
6768

69+
await app.register(websocket);
70+
6871
app.setGlobalPrefix(config.hostUrl$.value.pathname.slice(1));
6972

7073
config.applyTimeouts(app.getHttpServer(), config.httpTimeouts);

yarn.lock

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1425,6 +1425,17 @@ __metadata:
14251425
languageName: node
14261426
linkType: hard
14271427

1428+
"@fastify/websocket@npm:^11.0.2":
1429+
version: 11.0.2
1430+
resolution: "@fastify/websocket@npm:11.0.2"
1431+
dependencies:
1432+
duplexify: "npm:^4.1.3"
1433+
fastify-plugin: "npm:^5.0.0"
1434+
ws: "npm:^8.16.0"
1435+
checksum: 10c0/ccdaf645d768f427542d5133d3a0f8b4ce49e4bc2496543e1fb6f1d4b8c4e07af6fb80bde7bf50acf7536cb35c839c54d94a0a6b6f1aec76d413f0e68b92e04e
1436+
languageName: node
1437+
linkType: hard
1438+
14281439
"@ffprobe-installer/darwin-arm64@npm:5.0.1":
14291440
version: 5.0.1
14301441
resolution: "@ffprobe-installer/darwin-arm64@npm:5.0.1"
@@ -5816,6 +5827,7 @@ __metadata:
58165827
"@fastify/compress": "npm:^8.0.1"
58175828
"@fastify/cookie": "npm:^11.0.1"
58185829
"@fastify/cors": "npm:^11.0.1"
5830+
"@fastify/websocket": "npm:^11.0.2"
58195831
"@ffprobe-installer/ffprobe": "npm:^2.1.2"
58205832
"@gel/generate": "npm:^0.7.0-canary.20250422T080308"
58215833
"@golevelup/nestjs-discovery": "npm:^4.0.0"
@@ -5881,6 +5893,7 @@ __metadata:
58815893
graphql: "npm:^16.9.0"
58825894
graphql-parse-resolve-info: "npm:^4.14.0"
58835895
graphql-scalars: "npm:^1.22.4"
5896+
graphql-ws: "npm:^6.0.4"
58845897
graphql-yoga: "npm:^5.13.4"
58855898
human-format: "npm:^1.2.0"
58865899
husky: "npm:^4.3.8"
@@ -6481,7 +6494,7 @@ __metadata:
64816494
languageName: node
64826495
linkType: hard
64836496

6484-
"duplexify@npm:^4.1.1":
6497+
"duplexify@npm:^4.1.1, duplexify@npm:^4.1.3":
64856498
version: 4.1.3
64866499
resolution: "duplexify@npm:4.1.3"
64876500
dependencies:
@@ -8078,7 +8091,7 @@ __metadata:
80788091
languageName: node
80798092
linkType: hard
80808093

8081-
"graphql-ws@npm:6.0.4, graphql-ws@npm:^6.0.3":
8094+
"graphql-ws@npm:6.0.4, graphql-ws@npm:^6.0.3, graphql-ws@npm:^6.0.4":
80828095
version: 6.0.4
80838096
resolution: "graphql-ws@npm:6.0.4"
80848097
peerDependencies:
@@ -14276,7 +14289,7 @@ __metadata:
1427614289
languageName: node
1427714290
linkType: hard
1427814291

14279-
"ws@npm:8.18.1, ws@npm:^8.17.1":
14292+
"ws@npm:8.18.1, ws@npm:^8.16.0, ws@npm:^8.17.1":
1428014293
version: 8.18.1
1428114294
resolution: "ws@npm:8.18.1"
1428214295
peerDependencies:

0 commit comments

Comments
 (0)