Skip to content

Commit d854425

Browse files
authored
Added support for reactive Publishers to be returned from data fetchers (graphql-java#3731)
* Added support for reactive Publishers to be returned from data fetchers * Added support for reactive Publishers to be returned from data fetchers - review tweaks * Added support for reactive Publishers to be returned from data fetchers - never on subscriptions * Added support for reactive Publishers to be returned from data fetchers - never on subscriptions with tests working
1 parent 7fe1826 commit d854425

File tree

7 files changed

+509
-21
lines changed

7 files changed

+509
-21
lines changed

src/main/java/graphql/DuckTyped.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
package graphql;
2+
3+
import java.lang.annotation.Retention;
4+
import java.lang.annotation.RetentionPolicy;
5+
import java.lang.annotation.Target;
6+
7+
import static java.lang.annotation.ElementType.METHOD;
8+
import static java.lang.annotation.ElementType.PARAMETER;
9+
10+
/**
11+
* An annotation that marks a method return value or method parameter as returning a duck type value.
12+
* <p>
13+
* For efficiency reasons, the graphql engine methods can return {@link Object} values
14+
* which maybe two well known types of values. Often a {@link java.util.concurrent.CompletableFuture}
15+
* or a plain old {@link Object}, to represent an async value or a materialised value.
16+
*/
17+
@Internal
18+
@Retention(RetentionPolicy.RUNTIME)
19+
@Target(value = {METHOD, PARAMETER})
20+
public @interface DuckTyped {
21+
String shape();
22+
}

src/main/java/graphql/Internal.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.lang.annotation.RetentionPolicy;
55
import java.lang.annotation.Target;
66

7+
import static java.lang.annotation.ElementType.ANNOTATION_TYPE;
78
import static java.lang.annotation.ElementType.CONSTRUCTOR;
89
import static java.lang.annotation.ElementType.FIELD;
910
import static java.lang.annotation.ElementType.METHOD;
@@ -17,6 +18,6 @@
1718
* In general unnecessary changes will be avoided but you should not depend on internal classes being stable
1819
*/
1920
@Retention(RetentionPolicy.RUNTIME)
20-
@Target(value = {CONSTRUCTOR, METHOD, TYPE, FIELD, PACKAGE})
21+
@Target(value = {CONSTRUCTOR, METHOD, TYPE, FIELD, PACKAGE, ANNOTATION_TYPE})
2122
public @interface Internal {
2223
}

src/main/java/graphql/execution/ExecutionStrategy.java

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.google.common.collect.ImmutableList;
44
import com.google.common.collect.Maps;
5+
import graphql.DuckTyped;
56
import graphql.ExecutionResult;
67
import graphql.ExecutionResultImpl;
78
import graphql.ExperimentalApi;
@@ -24,6 +25,7 @@
2425
import graphql.execution.instrumentation.parameters.InstrumentationFieldCompleteParameters;
2526
import graphql.execution.instrumentation.parameters.InstrumentationFieldFetchParameters;
2627
import graphql.execution.instrumentation.parameters.InstrumentationFieldParameters;
28+
import graphql.execution.reactive.ReactiveSupport;
2729
import graphql.extensions.ExtensionsBuilder;
2830
import graphql.introspection.Introspection;
2931
import graphql.language.Argument;
@@ -197,8 +199,8 @@ public static String mkNameForPath(List<Field> currentField) {
197199
* @throws NonNullableFieldWasNullException in the {@link CompletableFuture} if a non-null field resolved to a null value
198200
*/
199201
@SuppressWarnings("unchecked")
200-
protected Object /* CompletableFuture<Map<String, Object>> | Map<String, Object> */
201-
executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
202+
@DuckTyped(shape = "CompletableFuture<Map<String, Object>> | Map<String, Object>")
203+
protected Object executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
202204
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
203205
dataLoaderDispatcherStrategy.executeObject(executionContext, parameters);
204206
Instrumentation instrumentation = executionContext.getInstrumentation();
@@ -356,8 +358,8 @@ Async.CombinedBuilder<FieldValueInfo> getAsyncFieldValueInfo(
356358
* @throws NonNullableFieldWasNullException in the future if a non-null field resolved to a null value
357359
*/
358360
@SuppressWarnings("unchecked")
359-
protected Object /* CompletableFuture<Object> | Object */
360-
resolveField(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
361+
@DuckTyped(shape = " CompletableFuture<Object> | Object")
362+
protected Object resolveField(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
361363
Object fieldWithInfo = resolveFieldWithInfo(executionContext, parameters);
362364
if (fieldWithInfo instanceof CompletableFuture) {
363365
return ((CompletableFuture<FieldValueInfo>) fieldWithInfo).thenCompose(FieldValueInfo::getFieldValueFuture);
@@ -384,8 +386,8 @@ Async.CombinedBuilder<FieldValueInfo> getAsyncFieldValueInfo(
384386
* if a nonnull field resolves to a null value
385387
*/
386388
@SuppressWarnings("unchecked")
387-
protected Object /* CompletableFuture<FieldValueInfo> | FieldValueInfo */
388-
resolveFieldWithInfo(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
389+
@DuckTyped(shape = "CompletableFuture<FieldValueInfo> | FieldValueInfo")
390+
protected Object resolveFieldWithInfo(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
389391
GraphQLFieldDefinition fieldDef = getFieldDef(executionContext, parameters, parameters.getField().getSingleField());
390392
Supplier<ExecutionStepInfo> executionStepInfo = FpKit.intraThreadMemoize(() -> createExecutionStepInfo(executionContext, parameters, fieldDef, null));
391393

@@ -430,16 +432,16 @@ Async.CombinedBuilder<FieldValueInfo> getAsyncFieldValueInfo(
430432
*
431433
* @throws NonNullableFieldWasNullException in the future if a non null field resolves to a null value
432434
*/
433-
protected Object /*CompletableFuture<FetchedValue> | FetchedValue>*/
434-
fetchField(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
435+
@DuckTyped(shape = "CompletableFuture<FetchedValue> | FetchedValue")
436+
protected Object fetchField(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
435437
MergedField field = parameters.getField();
436438
GraphQLObjectType parentType = (GraphQLObjectType) parameters.getExecutionStepInfo().getUnwrappedNonNullType();
437439
GraphQLFieldDefinition fieldDef = getFieldDef(executionContext.getGraphQLSchema(), parentType, field.getSingleField());
438440
return fetchField(fieldDef, executionContext, parameters);
439441
}
440442

441-
private Object /*CompletableFuture<FetchedValue> | FetchedValue>*/
442-
fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
443+
@DuckTyped(shape = "CompletableFuture<FetchedValue> | FetchedValue")
444+
private Object fetchField(GraphQLFieldDefinition fieldDef, ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
443445

444446
if (incrementAndCheckMaxNodesExceeded(executionContext)) {
445447
return new FetchedValue(null, Collections.emptyList(), null);
@@ -498,6 +500,11 @@ Async.CombinedBuilder<FieldValueInfo> getAsyncFieldValueInfo(
498500
executionContext.getDataLoaderDispatcherStrategy().fieldFetched(executionContext, parameters, dataFetcher, fetchedObject);
499501
fetchCtx.onDispatched();
500502
fetchCtx.onFetchedValue(fetchedObject);
503+
// if it's a subscription, leave any reactive objects alone
504+
if (!executionContext.isSubscriptionOperation()) {
505+
// possible convert reactive objects into CompletableFutures
506+
fetchedObject = ReactiveSupport.fetchedObject(fetchedObject);
507+
}
501508
if (fetchedObject instanceof CompletableFuture) {
502509
@SuppressWarnings("unchecked")
503510
CompletableFuture<Object> fetchedValue = (CompletableFuture<Object>) fetchedObject;
@@ -737,8 +744,8 @@ private FieldValueInfo getFieldValueInfoForNull(ExecutionStrategyParameters para
737744
*
738745
* @throws NonNullableFieldWasNullException inside the {@link CompletableFuture} if a non-null field resolves to a null value
739746
*/
740-
protected Object /* CompletableFuture<Object> | Object */
741-
completeValueForNull(ExecutionStrategyParameters parameters) {
747+
@DuckTyped(shape = "CompletableFuture<Object> | Object")
748+
protected Object completeValueForNull(ExecutionStrategyParameters parameters) {
742749
try {
743750
return parameters.getNonNullFieldValidator().validate(parameters, null);
744751
} catch (Exception e) {
@@ -876,8 +883,8 @@ protected <T> void handleValueException(CompletableFuture<T> overallResult, Thro
876883
*
877884
* @return a materialized scalar value or exceptionally completed {@link CompletableFuture} if there is a problem
878885
*/
879-
protected Object /* CompletableFuture<Object> | Object */
880-
completeValueForScalar(ExecutionContext executionContext, ExecutionStrategyParameters parameters, GraphQLScalarType scalarType, Object result) {
886+
@DuckTyped(shape = "CompletableFuture<Object> | Object")
887+
protected Object completeValueForScalar(ExecutionContext executionContext, ExecutionStrategyParameters parameters, GraphQLScalarType scalarType, Object result) {
881888
Object serialized;
882889
try {
883890
serialized = scalarType.getCoercing().serialize(result, executionContext.getGraphQLContext(), executionContext.getLocale());
@@ -903,8 +910,8 @@ protected <T> void handleValueException(CompletableFuture<T> overallResult, Thro
903910
*
904911
* @return a materialized enum value or exceptionally completed {@link CompletableFuture} if there is a problem
905912
*/
906-
protected Object /* CompletableFuture<Object> | Object */
907-
completeValueForEnum(ExecutionContext executionContext, ExecutionStrategyParameters parameters, GraphQLEnumType enumType, Object result) {
913+
@DuckTyped(shape = "CompletableFuture<Object> | Object")
914+
protected Object completeValueForEnum(ExecutionContext executionContext, ExecutionStrategyParameters parameters, GraphQLEnumType enumType, Object result) {
908915
Object serialized;
909916
try {
910917
serialized = enumType.serialize(result, executionContext.getGraphQLContext(), executionContext.getLocale());
@@ -929,8 +936,8 @@ protected <T> void handleValueException(CompletableFuture<T> overallResult, Thro
929936
*
930937
* @return a {@link CompletableFuture} promise to a map of object field values or a materialized map of object field values
931938
*/
932-
protected Object /* CompletableFuture<Map<String, Object>> | Map<String, Object> */
933-
completeValueForObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters, GraphQLObjectType resolvedObjectType, Object result) {
939+
@DuckTyped(shape = "CompletableFuture<Map<String, Object>> | Map<String, Object>")
940+
protected Object completeValueForObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters, GraphQLObjectType resolvedObjectType, Object result) {
934941
ExecutionStepInfo executionStepInfo = parameters.getExecutionStepInfo();
935942

936943
FieldCollectorParameters collectorParameters = newParameters()
@@ -1000,7 +1007,6 @@ private void handleTypeMismatchProblem(ExecutionContext context, ExecutionStrate
10001007
* if max nodes were exceeded for this request.
10011008
*
10021009
* @param executionContext the execution context in play
1003-
*
10041010
* @return true if max nodes were exceeded
10051011
*/
10061012
private boolean incrementAndCheckMaxNodesExceeded(ExecutionContext executionContext) {
@@ -1053,7 +1059,6 @@ protected GraphQLFieldDefinition getFieldDef(GraphQLSchema schema, GraphQLObject
10531059
*
10541060
* @param e this indicates that a null value was returned for a non null field, which needs to cause the parent field
10551061
* to become null OR continue on as an exception
1056-
*
10571062
* @throws NonNullableFieldWasNullException if a non null field resolves to a null value
10581063
*/
10591064
protected void assertNonNullFieldPrecondition(NonNullableFieldWasNullException e) throws NonNullableFieldWasNullException {
@@ -1167,4 +1172,6 @@ private static void addErrorsToRightContext(List<GraphQLError> errors, Execution
11671172
executionContext.addErrors(errors);
11681173
}
11691174
}
1175+
1176+
11701177
}
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
package graphql.execution.reactive;
2+
3+
import graphql.DuckTyped;
4+
import graphql.Internal;
5+
import org.reactivestreams.Publisher;
6+
import org.reactivestreams.Subscriber;
7+
import org.reactivestreams.Subscription;
8+
9+
import java.util.Objects;
10+
import java.util.concurrent.CompletableFuture;
11+
import java.util.concurrent.Flow;
12+
import java.util.concurrent.atomic.AtomicReference;
13+
14+
/**
15+
* This provides support for a DataFetcher to be able to
16+
* return a reactive streams {@link Publisher} or Java JDK {@link Flow.Publisher}
17+
* as a value, and it can be turned into a {@link CompletableFuture}
18+
* that we can get an async value from.
19+
*/
20+
@Internal
21+
public class ReactiveSupport {
22+
23+
@DuckTyped(shape = "CompletableFuture | Object")
24+
public static Object fetchedObject(Object fetchedObject) {
25+
if (fetchedObject instanceof Flow.Publisher) {
26+
return flowPublisherToCF((Flow.Publisher<?>) fetchedObject);
27+
}
28+
if (fetchedObject instanceof Publisher) {
29+
return reactivePublisherToCF((Publisher<?>) fetchedObject);
30+
}
31+
return fetchedObject;
32+
}
33+
34+
private static CompletableFuture<Object> reactivePublisherToCF(Publisher<?> publisher) {
35+
ReactivePublisherToCompletableFuture<Object> cf = new ReactivePublisherToCompletableFuture<>();
36+
publisher.subscribe(cf);
37+
return cf;
38+
}
39+
40+
private static CompletableFuture<Object> flowPublisherToCF(Flow.Publisher<?> publisher) {
41+
FlowPublisherToCompletableFuture<Object> cf = new FlowPublisherToCompletableFuture<>();
42+
publisher.subscribe(cf);
43+
return cf;
44+
}
45+
46+
/**
47+
* The implementations between reactive Publishers and Flow.Publishers are almost exactly the same except the
48+
* subscription class is different. So this is a common class that contains most of the common logic
49+
*
50+
* @param <T> for two
51+
* @param <S> for subscription
52+
*/
53+
private static abstract class PublisherToCompletableFuture<T, S> extends CompletableFuture<T> {
54+
55+
private final AtomicReference<S> subscriptionRef = new AtomicReference<>();
56+
57+
abstract void doSubscriptionCancel(S s);
58+
59+
@SuppressWarnings("SameParameterValue")
60+
abstract void doSubscriptionRequest(S s, long n);
61+
62+
private boolean validateSubscription(S current, S next) {
63+
Objects.requireNonNull(next, "Subscription cannot be null");
64+
if (current != null) {
65+
doSubscriptionCancel(next);
66+
return false;
67+
}
68+
return true;
69+
}
70+
71+
/**
72+
* This overrides the {@link CompletableFuture#cancel(boolean)} method
73+
* such that subscription is also cancelled.
74+
*
75+
* @param mayInterruptIfRunning this value has no effect in this
76+
* implementation because interrupts are not used to control
77+
* processing.
78+
* @return a boolean if it was cancelled
79+
*/
80+
@Override
81+
public boolean cancel(boolean mayInterruptIfRunning) {
82+
boolean cancelled = super.cancel(mayInterruptIfRunning);
83+
if (cancelled) {
84+
S s = subscriptionRef.getAndSet(null);
85+
if (s != null) {
86+
doSubscriptionCancel(s);
87+
}
88+
}
89+
return cancelled;
90+
}
91+
92+
void onSubscribeImpl(S s) {
93+
if (validateSubscription(subscriptionRef.getAndSet(s), s)) {
94+
doSubscriptionRequest(s, Long.MAX_VALUE);
95+
}
96+
}
97+
98+
void onNextImpl(T t) {
99+
S s = subscriptionRef.getAndSet(null);
100+
if (s != null) {
101+
complete(t);
102+
doSubscriptionCancel(s);
103+
}
104+
}
105+
106+
void onErrorImpl(Throwable t) {
107+
if (subscriptionRef.getAndSet(null) != null) {
108+
completeExceptionally(t);
109+
}
110+
}
111+
112+
void onCompleteImpl() {
113+
if (subscriptionRef.getAndSet(null) != null) {
114+
complete(null);
115+
}
116+
}
117+
}
118+
119+
private static class ReactivePublisherToCompletableFuture<T> extends PublisherToCompletableFuture<T, Subscription> implements Subscriber<T> {
120+
121+
@Override
122+
void doSubscriptionCancel(Subscription subscription) {
123+
subscription.cancel();
124+
}
125+
126+
@Override
127+
void doSubscriptionRequest(Subscription subscription, long n) {
128+
subscription.request(n);
129+
}
130+
131+
@Override
132+
public void onSubscribe(Subscription s) {
133+
onSubscribeImpl(s);
134+
}
135+
136+
@Override
137+
public void onNext(T t) {
138+
onNextImpl(t);
139+
}
140+
141+
@Override
142+
public void onError(Throwable t) {
143+
onErrorImpl(t);
144+
}
145+
146+
@Override
147+
public void onComplete() {
148+
onCompleteImpl();
149+
}
150+
}
151+
152+
private static class FlowPublisherToCompletableFuture<T> extends PublisherToCompletableFuture<T, Flow.Subscription> implements Flow.Subscriber<T> {
153+
154+
@Override
155+
void doSubscriptionCancel(Flow.Subscription subscription) {
156+
subscription.cancel();
157+
}
158+
159+
@Override
160+
void doSubscriptionRequest(Flow.Subscription subscription, long n) {
161+
subscription.request(n);
162+
}
163+
164+
@Override
165+
public void onSubscribe(Flow.Subscription s) {
166+
onSubscribeImpl(s);
167+
}
168+
169+
@Override
170+
public void onNext(T t) {
171+
onNextImpl(t);
172+
}
173+
174+
@Override
175+
public void onError(Throwable t) {
176+
onErrorImpl(t);
177+
}
178+
179+
@Override
180+
public void onComplete() {
181+
onCompleteImpl();
182+
}
183+
}
184+
}

0 commit comments

Comments
 (0)