@@ -10,7 +10,6 @@ public class Server<
1010> : @unchecked Sendable where
1111 SubscriptionSequenceType. Element == GraphQLResult
1212{
13-
1413 // We keep this weak because we strongly inject this object into the messenger callback
1514 weak var messenger : Messenger ?
1615
@@ -89,13 +88,17 @@ public class Server<
8988 try await self . error ( . invalidRequestFormat( messageType: . complete) )
9089 return
9190 }
92- try await self . onOperationComplete ( completeRequest. id )
91+ try await self . onOperationComplete ( completeRequest)
9392 case . unknown:
9493 try await self . error ( . invalidType( ) )
9594 }
9695 }
9796 }
9897
98+ deinit {
99+ subscriptionTasks. values. forEach { $0. cancel ( ) }
100+ }
101+
99102 /// Define a custom callback run during `connection_init` resolution that allows authorization using the `payload`.
100103 /// Throw from this closure to indicate that authorization has failed.
101104 /// - Parameter callback: The callback to assign
@@ -171,18 +174,15 @@ public class Server<
171174 let stream = try await onSubscribe ( graphQLRequest)
172175 for try await event in stream {
173176 try Task . checkCancellation ( )
174- do {
175- try await self . sendNext ( event, id: id)
176- } catch {
177- try await self . sendError ( error, id: id)
178- throw error
179- }
177+ try await self . sendNext ( event, id: id)
180178 }
181179 } catch {
182180 try await sendError ( error, id: id)
181+ subscriptionTasks. removeValue ( forKey: id)
183182 throw error
184183 }
185184 try await self . sendComplete ( id: id)
185+ subscriptionTasks. removeValue ( forKey: id)
186186 }
187187 } else {
188188 do {
@@ -196,6 +196,20 @@ public class Server<
196196 }
197197 }
198198
199+ private func onOperationComplete( _ completeRequest: CompleteRequest ) async throws {
200+ guard initialized else {
201+ try await error ( . notInitialized( ) )
202+ return
203+ }
204+
205+ let id = completeRequest. id
206+ if let task = subscriptionTasks [ id] {
207+ task. cancel ( )
208+ subscriptionTasks. removeValue ( forKey: id)
209+ }
210+ try await onOperationComplete ( id)
211+ }
212+
199213 /// Send a `connection_ack` response through the messenger
200214 private func sendConnectionAck( _ payload: [ String : Map ] ? = nil ) async throws {
201215 guard let messenger = messenger else { return }
0 commit comments