@@ -13,6 +13,7 @@ import {
13
13
type YogaServerInstance ,
14
14
type YogaServerOptions ,
15
15
} from 'graphql-yoga' ;
16
+ import { AsyncResource } from 'node:async_hooks' ;
16
17
import type { WebSocket } from 'ws' ;
17
18
import { type GqlContextType } from '~/common' ;
18
19
import { HttpAdapter , type IRequest } from '../http' ;
@@ -104,7 +105,9 @@ export class Driver extends AbstractDriver<DriverConfig> {
104
105
* So this allows our "yoga" plugins to be executed.
105
106
*/
106
107
private makeWsHandler ( options : DriverConfig ) {
108
+ const asyncContextBySocket = new WeakMap < WebSocket , AsyncResource > ( ) ;
107
109
interface WsExecutionArgs extends ExecutionArgs {
110
+ socket : WebSocket ;
108
111
envelop : ReturnType < ReturnType < typeof envelop > > ;
109
112
}
110
113
@@ -113,20 +116,27 @@ export class Driver extends AbstractDriver<DriverConfig> {
113
116
// This forwards to yoga/envelop.
114
117
// This was adapted from yoga's graphql-ws example.
115
118
// https://github.com/dotansimha/graphql-yoga/tree/main/examples/graphql-ws
116
- const wsHandler = makeGqlWSHandler <
119
+ const fastifyWsHandler = makeGqlWSHandler <
117
120
Record < string , unknown > ,
118
121
{ socket : WebSocket ; request : IRequest }
119
122
> ( {
120
123
schema : options . schema ! ,
121
124
// Custom execute/subscribe functions that really just defer to a
122
125
// unique envelop (yoga) instance per request.
123
126
execute : ( wsArgs ) => {
124
- const { envelop, ...args } = wsArgs as WsExecutionArgs ;
125
- return envelop . execute ( args ) ;
127
+ const { envelop, socket, ...args } = wsArgs as WsExecutionArgs ;
128
+ return asyncContextBySocket . get ( socket ) ! . runInAsyncScope ( ( ) => {
129
+ return envelop . execute ( args ) ;
130
+ } ) ;
126
131
} ,
127
132
subscribe : ( wsArgs ) => {
128
- const { envelop, ...args } = wsArgs as WsExecutionArgs ;
129
- return envelop . subscribe ( args ) ;
133
+ const { envelop, socket, ...args } = wsArgs as WsExecutionArgs ;
134
+ // Because this is called via socket.onmessage, we don't have
135
+ // the same async context we started with.
136
+ // Grab and resume it.
137
+ return asyncContextBySocket . get ( socket ) ! . runInAsyncScope ( ( ) => {
138
+ return envelop . subscribe ( args ) ;
139
+ } ) ;
130
140
} ,
131
141
// Create a unique envelop/yoga instance for each subscription.
132
142
// This allows "yoga" plugins that are really just envelop hooks
@@ -151,6 +161,7 @@ export class Driver extends AbstractDriver<DriverConfig> {
151
161
// Public examples put these functions in the context, but I don't
152
162
// like exposing that implementation detail to the rest of the app.
153
163
envelop,
164
+ socket,
154
165
} ;
155
166
156
167
const errors = envelop . validate ( args . schema , args . document ) ;
@@ -161,6 +172,11 @@ export class Driver extends AbstractDriver<DriverConfig> {
161
172
} ,
162
173
} ) ;
163
174
175
+ const wsHandler : FastifyRoute [ 'wsHandler' ] = function ( socket , req ) {
176
+ // Save a reference to the current async context, so we can resume it.
177
+ asyncContextBySocket . set ( socket , new AsyncResource ( 'graphql-ws' ) ) ;
178
+ return fastifyWsHandler . call ( this , socket , req ) ;
179
+ } ;
164
180
return wsHandler ;
165
181
}
166
182
0 commit comments