@@ -38,43 +38,24 @@ func subscribe(
3838 )
3939
4040 return sourceResult. map { sourceStream in
41- // We must create a new AsyncSequence because AsyncSequence.map requires a concrete type
42- // (which we cannot know),
43- // and we need the result to be a concrete type.
44- let subscriptionStream = AsyncThrowingStream < GraphQLResult , Error > { continuation in
45- let task = Task {
46- do {
47- for try await eventPayload in sourceStream {
48- // For each payload yielded from a subscription, map it over the normal
49- // GraphQL `execute` function, with `payload` as the rootValue.
50- // This implements the "MapSourceToResponseEvent" algorithm described in
51- // the GraphQL specification. The `execute` function provides the
52- // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
53- // "ExecuteQuery" algorithm, for which `execute` is also used.
54- let newEvent = try await execute (
55- queryStrategy: queryStrategy,
56- mutationStrategy: mutationStrategy,
57- subscriptionStrategy: subscriptionStrategy,
58- schema: schema,
59- documentAST: documentAST,
60- rootValue: eventPayload,
61- context: context,
62- variableValues: variableValues,
63- operationName: operationName
64- )
65- continuation. yield ( newEvent)
66- }
67- continuation. finish ( )
68- } catch {
69- continuation. finish ( throwing: error)
70- }
71- }
72-
73- continuation. onTermination = { @Sendable reason in
74- task. cancel ( )
41+ AsyncThrowingStream < GraphQLResult , Error > {
42+ // The type-cast below is required on Swift <6. Once we drop Swift 5 support it may be removed.
43+ var iterator = sourceStream. makeAsyncIterator ( ) as ( any AsyncIteratorProtocol )
44+ guard let eventPayload = try await iterator. next ( ) else {
45+ return nil
7546 }
47+ return try await execute (
48+ queryStrategy: queryStrategy,
49+ mutationStrategy: mutationStrategy,
50+ subscriptionStrategy: subscriptionStrategy,
51+ schema: schema,
52+ documentAST: documentAST,
53+ rootValue: eventPayload,
54+ context: context,
55+ variableValues: variableValues,
56+ operationName: operationName
57+ )
7658 }
77- return subscriptionStream
7859 }
7960}
8061
@@ -111,7 +92,7 @@ func createSourceEventStream(
11192 context: Any ,
11293 variableValues: [ String : Map ] = [ : ] ,
11394 operationName: String ? = nil
114- ) async throws -> Result < any AsyncSequence , GraphQLErrors > {
95+ ) async throws -> Result < any AsyncSequence & Sendable , GraphQLErrors > {
11596 // If a valid context cannot be created due to incorrect arguments,
11697 // this will throw an error.
11798 let exeContext = try buildExecutionContext (
@@ -141,7 +122,7 @@ func createSourceEventStream(
141122
142123func executeSubscription(
143124 context: ExecutionContext
144- ) async throws -> Result < any AsyncSequence , GraphQLErrors > {
125+ ) async throws -> Result < any AsyncSequence & Sendable , GraphQLErrors > {
145126 // Get the first node
146127 let type = try getOperationRootType ( schema: context. schema, operation: context. operation)
147128 var inputFields : OrderedDictionary < String , [ Field ] > = [ : ]
0 commit comments