Skip to content

Commit fc4c386

Browse files
Port graphql-java 15 subscription changes to Flow strategy (#742)
In graphql-java 15, changes were made to SubscriptionExecutionStrategy to allow for DataFetcherResults to be returned, and to instrument the results of a subscription properly. This updates the Flow strategy with the same changes and adds tests for both changes. See graphql-java/graphql-java#1804
1 parent cc87ba3 commit fc4c386

File tree

2 files changed

+145
-9
lines changed

2 files changed

+145
-9
lines changed

graphql-kotlin-schema-generator/src/main/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategy.kt

Lines changed: 94 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,30 @@ package com.expediagroup.graphql.execution
1818

1919
import graphql.ExecutionResult
2020
import graphql.ExecutionResultImpl
21+
import graphql.GraphQLError
22+
import graphql.execution.AbsoluteGraphQLError
2123
import graphql.execution.DataFetcherExceptionHandler
24+
import graphql.execution.DataFetcherResult
2225
import graphql.execution.ExecutionContext
26+
import graphql.execution.ExecutionStepInfo
2327
import graphql.execution.ExecutionStrategy
2428
import graphql.execution.ExecutionStrategyParameters
2529
import graphql.execution.FetchedValue
2630
import graphql.execution.SimpleDataFetcherExceptionHandler
2731
import graphql.execution.SubscriptionExecutionStrategy
32+
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
33+
import graphql.execution.instrumentation.parameters.InstrumentationExecutionStrategyParameters
34+
import graphql.execution.instrumentation.parameters.InstrumentationFieldParameters
2835
import graphql.execution.reactive.CompletionStageMappingPublisher
36+
import graphql.schema.GraphQLObjectType
2937
import kotlinx.coroutines.flow.Flow
3038
import kotlinx.coroutines.flow.map
3139
import kotlinx.coroutines.future.await
3240
import kotlinx.coroutines.reactive.asFlow
3341
import org.reactivestreams.Publisher
3442
import java.util.Collections
3543
import java.util.concurrent.CompletableFuture
44+
import java.util.function.Consumer
3645

3746
/**
3847
* [SubscriptionExecutionStrategy] replacement that returns an [ExecutionResult]
@@ -54,11 +63,15 @@ class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : Exec
5463
parameters: ExecutionStrategyParameters
5564
): CompletableFuture<ExecutionResult> {
5665

66+
val instrumentation = executionContext.instrumentation
67+
val instrumentationParameters = InstrumentationExecutionStrategyParameters(executionContext, parameters)
68+
val executionStrategyCtx = instrumentation.beginExecutionStrategy(instrumentationParameters)
69+
5770
val sourceEventStream = createSourceEventStream(executionContext, parameters)
5871

5972
//
6073
// when the upstream source event stream completes, subscribe to it and wire in our adapter
61-
return sourceEventStream.thenApply { sourceFlow ->
74+
val overallResult: CompletableFuture<ExecutionResult> = sourceEventStream.thenApply { sourceFlow ->
6275
if (sourceFlow == null) {
6376
ExecutionResultImpl(null, executionContext.errors)
6477
} else {
@@ -68,6 +81,12 @@ class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : Exec
6881
ExecutionResultImpl(returnFlow, executionContext.errors)
6982
}
7083
}
84+
85+
// dispatched the subscription query
86+
executionStrategyCtx.onDispatched(overallResult)
87+
overallResult.whenComplete(executionStrategyCtx::onCompleted)
88+
89+
return overallResult
7190
}
7291

7392
/*
@@ -118,15 +137,37 @@ class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : Exec
118137
parameters: ExecutionStrategyParameters,
119138
eventPayload: Any?
120139
): CompletableFuture<ExecutionResult> {
121-
val newExecutionContext = executionContext.transform { builder -> builder.root(eventPayload) }
140+
val instrumentation = executionContext.instrumentation
122141

142+
val newExecutionContext = executionContext.transform { builder ->
143+
builder
144+
.root(eventPayload)
145+
.resetErrors()
146+
}
123147
val newParameters = firstFieldOfSubscriptionSelection(parameters)
124-
val fetchedValue = FetchedValue.newFetchedValue().fetchedValue(eventPayload)
125-
.rawFetchedValue(eventPayload)
126-
.localContext(parameters.localContext)
127-
.build()
128-
return completeField(newExecutionContext, newParameters, fetchedValue).fieldValue
148+
val subscribedFieldStepInfo = createSubscribedFieldStepInfo(executionContext, newParameters)
149+
150+
val i13nFieldParameters = InstrumentationFieldParameters(executionContext, subscribedFieldStepInfo.fieldDefinition, subscribedFieldStepInfo)
151+
val subscribedFieldCtx = instrumentation.beginSubscribedFieldEvent(i13nFieldParameters)
152+
153+
val fetchedValue = unboxPossibleDataFetcherResult(newExecutionContext, parameters, eventPayload)
154+
155+
val fieldValueInfo = completeField(newExecutionContext, newParameters, fetchedValue)
156+
val overallResult = fieldValueInfo
157+
.fieldValue
129158
.thenApply { executionResult -> wrapWithRootFieldName(newParameters, executionResult) }
159+
160+
// dispatch instrumentation so they can know about each subscription event
161+
subscribedFieldCtx.onDispatched(overallResult)
162+
overallResult.whenComplete(subscribedFieldCtx::onCompleted)
163+
164+
// allow them to instrument each ER should they want to
165+
val i13ExecutionParameters = InstrumentationExecutionParameters(
166+
executionContext.executionInput, executionContext.graphQLSchema, executionContext.instrumentationState)
167+
168+
return overallResult.thenCompose { executionResult ->
169+
instrumentation.instrumentExecutionResult(executionResult, i13ExecutionParameters)
170+
}
130171
}
131172

132173
private fun wrapWithRootFieldName(
@@ -154,4 +195,50 @@ class FlowSubscriptionExecutionStrategy(dfe: DataFetcherExceptionHandler) : Exec
154195
val fieldPath = parameters.path.segment(ExecutionStrategy.mkNameForPath(firstField.singleField))
155196
return parameters.transform { builder -> builder.field(firstField).path(fieldPath) }
156197
}
198+
199+
private fun createSubscribedFieldStepInfo(
200+
executionContext: ExecutionContext,
201+
parameters: ExecutionStrategyParameters
202+
): ExecutionStepInfo {
203+
val field = parameters.field.singleField
204+
val parentType = parameters.executionStepInfo.unwrappedNonNullType as GraphQLObjectType
205+
val fieldDef = getFieldDef(executionContext.graphQLSchema, parentType, field)
206+
return createExecutionStepInfo(executionContext, parameters, fieldDef, parentType)
207+
}
208+
209+
/**
210+
* java->kotlin copy of [ExecutionStrategy.unboxPossibleDataFetcherResult] where it's package-private
211+
*/
212+
private fun unboxPossibleDataFetcherResult(
213+
executionContext: ExecutionContext,
214+
parameters: ExecutionStrategyParameters,
215+
result: Any?
216+
): FetchedValue {
217+
return if (result is DataFetcherResult<*>) {
218+
if (result.isMapRelativeErrors) {
219+
result.errors.stream()
220+
.map { relError: GraphQLError? -> AbsoluteGraphQLError(parameters, relError) }
221+
.forEach { error: AbsoluteGraphQLError? -> executionContext.addError(error) }
222+
} else {
223+
result.errors.forEach(Consumer { error: GraphQLError? -> executionContext.addError(error) })
224+
}
225+
var localContext = result.localContext
226+
if (localContext == null) {
227+
// if the field returns nothing then they get the context of their parent field
228+
localContext = parameters.localContext
229+
}
230+
FetchedValue.newFetchedValue()
231+
.fetchedValue(executionContext.valueUnboxer.unbox(result.data))
232+
.rawFetchedValue(result.data)
233+
.errors(result.errors)
234+
.localContext(localContext)
235+
.build()
236+
} else {
237+
FetchedValue.newFetchedValue()
238+
.fetchedValue(executionContext.valueUnboxer.unbox(result))
239+
.rawFetchedValue(result)
240+
.localContext(parameters.localContext)
241+
.build()
242+
}
243+
}
157244
}

graphql-kotlin-schema-generator/src/test/kotlin/com/expediagroup/graphql/execution/FlowSubscriptionExecutionStrategyTest.kt

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,13 @@ import com.expediagroup.graphql.hooks.FlowSubscriptionSchemaGeneratorHooks
2323
import com.expediagroup.graphql.toSchema
2424
import graphql.ExecutionInput
2525
import graphql.ExecutionResult
26+
import graphql.ExecutionResultImpl
2627
import graphql.GraphQL
2728
import graphql.GraphQLError
2829
import graphql.GraphqlErrorBuilder
30+
import graphql.execution.DataFetcherResult
31+
import graphql.execution.instrumentation.SimpleInstrumentation
32+
import graphql.execution.instrumentation.parameters.InstrumentationExecutionParameters
2933
import graphql.schema.GraphQLSchema
3034
import kotlinx.coroutines.InternalCoroutinesApi
3135
import kotlinx.coroutines.delay
@@ -38,6 +42,7 @@ import kotlinx.coroutines.reactive.asPublisher
3842
import kotlinx.coroutines.runBlocking
3943
import org.junit.jupiter.api.Test
4044
import org.reactivestreams.Publisher
45+
import java.util.concurrent.CompletableFuture
4146
import kotlin.test.assertEquals
4247
import kotlin.test.assertNull
4348
import kotlin.test.assertTrue
@@ -47,14 +52,17 @@ class FlowSubscriptionExecutionStrategyTest {
4752

4853
private val testSchema: GraphQLSchema = toSchema(
4954
config = SchemaGeneratorConfig(
50-
supportedPackages = listOf("com.expediagroup.graphql.spring.execution"),
55+
supportedPackages = listOf("com.expediagroup.graphql.execution"),
5156
hooks = FlowSubscriptionSchemaGeneratorHooks()
5257
),
5358
queries = listOf(TopLevelObject(BasicQuery())),
5459
mutations = listOf(TopLevelObject(BasicQuery())),
5560
subscriptions = listOf(TopLevelObject(FlowSubscription()))
5661
)
57-
private val testGraphQL: GraphQL = GraphQL.newGraphQL(testSchema).subscriptionExecutionStrategy(FlowSubscriptionExecutionStrategy()).build()
62+
private val testGraphQL: GraphQL = GraphQL.newGraphQL(testSchema)
63+
.subscriptionExecutionStrategy(FlowSubscriptionExecutionStrategy())
64+
.instrumentation(TestInstrumentation())
65+
.build()
5866

5967
@Test
6068
fun `verify subscription to flow`() = runBlocking {
@@ -64,6 +72,24 @@ class FlowSubscriptionExecutionStrategyTest {
6472
val list = mutableListOf<Int>()
6573
flow.collect {
6674
list.add(it.getData<Map<String, Int>>().getValue("ticker"))
75+
assertEquals(it.extensions["testKey"], "testValue")
76+
}
77+
assertEquals(5, list.size)
78+
for (i in list.indices) {
79+
assertEquals(i + 1, list[i])
80+
}
81+
}
82+
83+
@Test
84+
fun `verify subscription to datafetcher flow`() = runBlocking {
85+
val request = ExecutionInput.newExecutionInput().query("subscription { datafetcher }").build()
86+
val response = testGraphQL.execute(request)
87+
val flow = response.getData<Flow<ExecutionResult>>()
88+
val list = mutableListOf<Int>()
89+
flow.collect {
90+
val intVal = it.getData<Map<String, Int>>().getValue("datafetcher")
91+
list.add(intVal)
92+
assertEquals(it.extensions["testKey"], "testValue")
6793
}
6894
assertEquals(5, list.size)
6995
for (i in list.indices) {
@@ -163,6 +189,20 @@ class FlowSubscriptionExecutionStrategyTest {
163189
fun query(): String = "hello"
164190
}
165191

192+
class TestInstrumentation : SimpleInstrumentation() {
193+
override fun instrumentExecutionResult(
194+
executionResult: ExecutionResult,
195+
parameters: InstrumentationExecutionParameters?
196+
): CompletableFuture<ExecutionResult> {
197+
return CompletableFuture.completedFuture(
198+
ExecutionResultImpl.newExecutionResult()
199+
.from(executionResult)
200+
.addExtension("testKey", "testValue")
201+
.build()
202+
)
203+
}
204+
}
205+
166206
class FlowSubscription {
167207
fun ticker(): Flow<Int> {
168208
return flow {
@@ -173,6 +213,15 @@ class FlowSubscriptionExecutionStrategyTest {
173213
}
174214
}
175215

216+
fun datafetcher(): Flow<DataFetcherResult<Int>> {
217+
return flow {
218+
for (i in 1..5) {
219+
delay(100)
220+
emit(DataFetcherResult(i, listOf()))
221+
}
222+
}
223+
}
224+
176225
fun publisherTicker(): Publisher<Int> {
177226
return flow {
178227
for (i in 1..5) {

0 commit comments

Comments
 (0)