@@ -46,25 +46,42 @@ func subscribe(
4646 )
4747
4848 if let sourceStream = sourceResult. stream {
49- let subscriptionStream = sourceStream. map { eventPayload -> GraphQLResult in
50- // For each payload yielded from a subscription, map it over the normal
51- // GraphQL `execute` function, with `payload` as the rootValue.
52- // This implements the "MapSourceToResponseEvent" algorithm described in
53- // the GraphQL specification. The `execute` function provides the
54- // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
55- // "ExecuteQuery" algorithm, for which `execute` is also used.
56- try await execute (
57- queryStrategy: queryStrategy,
58- mutationStrategy: mutationStrategy,
59- subscriptionStrategy: subscriptionStrategy,
60- instrumentation: instrumentation,
61- schema: schema,
62- documentAST: documentAST,
63- rootValue: eventPayload,
64- context: context,
65- variableValues: variableValues,
66- operationName: operationName
67- )
49+ // We must create a new AsyncSequence because AsyncSequence.map requires a concrete type
50+ // (which we cannot know),
51+ // and we need the result to be a concrete type.
52+ let subscriptionStream = AsyncThrowingStream < GraphQLResult , Error > { continuation in
53+ let task = Task {
54+ do {
55+ for try await eventPayload in sourceStream {
56+ // For each payload yielded from a subscription, map it over the normal
57+ // GraphQL `execute` function, with `payload` as the rootValue.
58+ // This implements the "MapSourceToResponseEvent" algorithm described in
59+ // the GraphQL specification. The `execute` function provides the
60+ // "ExecuteSubscriptionEvent" algorithm, as it is nearly identical to the
61+ // "ExecuteQuery" algorithm, for which `execute` is also used.
62+ let newEvent = try await execute (
63+ queryStrategy: queryStrategy,
64+ mutationStrategy: mutationStrategy,
65+ subscriptionStrategy: subscriptionStrategy,
66+ instrumentation: instrumentation,
67+ schema: schema,
68+ documentAST: documentAST,
69+ rootValue: eventPayload,
70+ context: context,
71+ variableValues: variableValues,
72+ operationName: operationName
73+ )
74+ continuation. yield ( newEvent)
75+ }
76+ continuation. finish ( )
77+ } catch {
78+ continuation. finish ( throwing: error)
79+ }
80+ }
81+
82+ continuation. onTermination = { @Sendable reason in
83+ task. cancel ( )
84+ }
6885 }
6986 return SubscriptionResult ( stream: subscriptionStream, errors: sourceResult. errors)
7087 } else {
@@ -151,7 +168,7 @@ func createSourceEventStream(
151168}
152169
153170func executeSubscription(
154- context: ExecutionContext ,
171+ context: ExecutionContext
155172) async throws -> SourceEventStreamResult {
156173 // Get the first node
157174 let type = try getOperationRootType ( schema: context. schema, operation: context. operation)
@@ -245,7 +262,7 @@ func executeSubscription(
245262 return SourceEventStreamResult ( errors: context. errors)
246263 } else if let error = resolved as? GraphQLError {
247264 return SourceEventStreamResult ( errors: [ error] )
248- } else if let stream = resolved as? EventStream < Any > {
265+ } else if let stream = resolved as? any AsyncSequence {
249266 return SourceEventStreamResult ( stream: stream)
250267 } else if resolved == nil {
251268 return SourceEventStreamResult ( errors: [
@@ -255,7 +272,7 @@ func executeSubscription(
255272 let resolvedObj = resolved as AnyObject
256273 return SourceEventStreamResult ( errors: [
257274 GraphQLError (
258- message: " Subscription field resolver must return EventStream<Any> . Received: ' \( resolvedObj) ' "
275+ message: " Subscription field resolver must return an AsyncSequence . Received: ' \( resolvedObj) ' "
259276 ) ,
260277 ] )
261278 }
@@ -266,10 +283,10 @@ func executeSubscription(
266283// checking. Normal resolvers for subscription fields should handle type casting, same as resolvers
267284// for query fields.
268285struct SourceEventStreamResult {
269- public let stream : EventStream < Any > ?
286+ public let stream : ( any AsyncSequence ) ?
270287 public let errors : [ GraphQLError ]
271288
272- public init ( stream: EventStream < Any > ? = nil , errors: [ GraphQLError ] = [ ] ) {
289+ public init ( stream: ( any AsyncSequence ) ? = nil , errors: [ GraphQLError ] = [ ] ) {
273290 self . stream = stream
274291 self . errors = errors
275292 }
0 commit comments