Skip to content

Commit 7fe1826

Browse files
bbakermanandimarek
andauthored
DataLoader on Mutations and Queries (graphql-java#3737)
* POC - DataLoader on Mutations and Queries * POC - DataLoader on Mutations and Queries - hanging test ignored for now * POC - DataLoader on Mutations and Queries - added extra calls * POC - DataLoader on Mutations and Queries - added new method * simplify test * make serial execution work with per level field tracking + some refactoring * DataLoader on Mutations and Queries - added more test methods * DataLoader on Mutations and Queries - tweaked AsyncSerialExecutionStrategy.java --------- Co-authored-by: Andreas Marek <[email protected]>
1 parent 8a7a525 commit 7fe1826

10 files changed

+504
-54
lines changed

src/main/java/graphql/execution/AsyncExecutionStrategy.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,14 +72,14 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
7272
for (FieldValueInfo completeValueInfo : completeValueInfos) {
7373
fieldValuesFutures.addObject(completeValueInfo.getFieldValueObject());
7474
}
75-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos, parameters);
75+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(completeValueInfos);
7676
executionStrategyCtx.onFieldValuesInfo(completeValueInfos);
7777
fieldValuesFutures.await().whenComplete(handleResultsConsumer);
7878
}).exceptionally((ex) -> {
7979
// if there are any issues with combining/handling the field results,
8080
// complete the future at all costs and bubble up any thrown exception so
8181
// the execution does not hang.
82-
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex, parameters);
82+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesException(ex);
8383
executionStrategyCtx.onFieldValuesException();
8484
overallResult.completeExceptionally(ex);
8585
return null;

src/main/java/graphql/execution/AsyncSerialExecutionStrategy.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public AsyncSerialExecutionStrategy(DataFetcherExceptionHandler exceptionHandler
3232
@Override
3333
@SuppressWarnings({"TypeParameterUnusedInFormals", "FutureReturnValueIgnored"})
3434
public CompletableFuture<ExecutionResult> execute(ExecutionContext executionContext, ExecutionStrategyParameters parameters) throws NonNullableFieldWasNullException {
35-
executionContext.getDataLoaderDispatcherStrategy().executionStrategy(executionContext, parameters);
35+
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy = executionContext.getDataLoaderDispatcherStrategy();
3636

3737
Instrumentation instrumentation = executionContext.getInstrumentation();
3838
InstrumentationExecutionStrategyParameters instrumentationParameters = new InstrumentationExecutionStrategyParameters(executionContext, parameters);
@@ -54,7 +54,8 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
5454
ResultPath fieldPath = parameters.getPath().segment(mkNameForPath(currentField));
5555
ExecutionStrategyParameters newParameters = parameters
5656
.transform(builder -> builder.field(currentField).path(fieldPath));
57-
return resolveField(executionContext, newParameters);
57+
58+
return resolveSerialField(executionContext, dataLoaderDispatcherStrategy, newParameters);
5859
});
5960

6061
CompletableFuture<ExecutionResult> overallResult = new CompletableFuture<>();
@@ -65,4 +66,22 @@ public CompletableFuture<ExecutionResult> execute(ExecutionContext executionCont
6566
return overallResult;
6667
}
6768

69+
private Object resolveSerialField(ExecutionContext executionContext,
70+
DataLoaderDispatchStrategy dataLoaderDispatcherStrategy,
71+
ExecutionStrategyParameters newParameters) {
72+
dataLoaderDispatcherStrategy.executionSerialStrategy(executionContext, newParameters);
73+
74+
Object fieldWithInfo = resolveFieldWithInfo(executionContext, newParameters);
75+
if (fieldWithInfo instanceof CompletableFuture) {
76+
//noinspection unchecked
77+
return ((CompletableFuture<FieldValueInfo>) fieldWithInfo).thenCompose(fvi -> {
78+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
79+
return fvi.getFieldValueFuture();
80+
});
81+
} else {
82+
FieldValueInfo fvi = (FieldValueInfo) fieldWithInfo;
83+
dataLoaderDispatcherStrategy.executionStrategyOnFieldValuesInfo(List.of(fvi));
84+
return fvi.getFieldValueObject();
85+
}
86+
}
6887
}

src/main/java/graphql/execution/DataLoaderDispatchStrategy.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,15 @@ default void executionStrategy(ExecutionContext executionContext, ExecutionStrat
1616

1717
}
1818

19-
default void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList, ExecutionStrategyParameters parameters) {
19+
default void executionSerialStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
2020

2121
}
2222

23-
default void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters parameters) {
23+
default void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList) {
24+
25+
}
26+
27+
default void executionStrategyOnFieldValuesException(Throwable t) {
2428

2529
}
2630

src/main/java/graphql/execution/Execution.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ private DataLoaderDispatchStrategy createDataLoaderDispatchStrategy(ExecutionCon
228228
if (executionContext.getDataLoaderRegistry() == EMPTY_DATALOADER_REGISTRY || doNotAutomaticallyDispatchDataLoader) {
229229
return DataLoaderDispatchStrategy.NO_OP;
230230
}
231-
if (executionStrategy instanceof AsyncExecutionStrategy) {
231+
if (! executionContext.isSubscriptionOperation()) {
232232
boolean deferEnabled = Optional.ofNullable(executionContext.getGraphQLContext())
233233
.map(graphqlContext -> graphqlContext.getBoolean(ExperimentalApi.ENABLE_INCREMENTAL_SUPPORT))
234234
.orElse(false);

src/main/java/graphql/execution/ExecutionContext.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,34 @@ public ValueUnboxer getValueUnboxer() {
170170
return valueUnboxer;
171171
}
172172

173+
/**
174+
* @return true if the current operation is a Query
175+
*/
176+
public boolean isQueryOperation() {
177+
return isOpType(OperationDefinition.Operation.QUERY);
178+
}
179+
180+
/**
181+
* @return true if the current operation is a Mutation
182+
*/
183+
public boolean isMutationOperation() {
184+
return isOpType(OperationDefinition.Operation.MUTATION);
185+
}
186+
187+
/**
188+
* @return true if the current operation is a Subscription
189+
*/
190+
public boolean isSubscriptionOperation() {
191+
return isOpType(OperationDefinition.Operation.SUBSCRIPTION);
192+
}
193+
194+
private boolean isOpType(OperationDefinition.Operation operation) {
195+
if (operationDefinition != null) {
196+
return operation.equals(operationDefinition.getOperation());
197+
}
198+
return false;
199+
}
200+
173201
/**
174202
* This method will only put one error per field path.
175203
*

src/main/java/graphql/execution/instrumentation/dataloader/FallbackDataLoaderDispatchStrategy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88

99
/**
10-
* Used when the execution strategy is not an AsyncExecutionStrategy: simply dispatch always after each DF.
10+
* Used when we cant guarantee the fields will be counted right: simply dispatch always after each DF.
1111
*/
1212
@Internal
1313
public class FallbackDataLoaderDispatchStrategy implements DataLoaderDispatchStrategy {

src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategy.java

Lines changed: 77 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import graphql.execution.ExecutionContext;
77
import graphql.execution.ExecutionStrategyParameters;
88
import graphql.execution.FieldValueInfo;
9-
import graphql.execution.MergedField;
109
import graphql.schema.DataFetcher;
1110
import graphql.util.LockKit;
1211
import org.dataloader.DataLoaderRegistry;
@@ -27,55 +26,81 @@ private static class CallStack {
2726
private final LockKit.ReentrantLock lock = new LockKit.ReentrantLock();
2827
private final LevelMap expectedFetchCountPerLevel = new LevelMap();
2928
private final LevelMap fetchCountPerLevel = new LevelMap();
30-
private final LevelMap expectedStrategyCallsPerLevel = new LevelMap();
31-
private final LevelMap happenedStrategyCallsPerLevel = new LevelMap();
29+
30+
private final LevelMap expectedExecuteObjectCallsPerLevel = new LevelMap();
31+
private final LevelMap happenedExecuteObjectCallsPerLevel = new LevelMap();
32+
3233
private final LevelMap happenedOnFieldValueCallsPerLevel = new LevelMap();
3334

3435
private final Set<Integer> dispatchedLevels = new LinkedHashSet<>();
3536

3637
public CallStack() {
37-
expectedStrategyCallsPerLevel.set(1, 1);
38+
expectedExecuteObjectCallsPerLevel.set(1, 1);
3839
}
3940

4041
void increaseExpectedFetchCount(int level, int count) {
4142
expectedFetchCountPerLevel.increment(level, count);
4243
}
4344

45+
void clearExpectedFetchCount() {
46+
expectedFetchCountPerLevel.clear();
47+
}
48+
4449
void increaseFetchCount(int level) {
4550
fetchCountPerLevel.increment(level, 1);
4651
}
4752

48-
void increaseExpectedStrategyCalls(int level, int count) {
49-
expectedStrategyCallsPerLevel.increment(level, count);
53+
void clearFetchCount() {
54+
fetchCountPerLevel.clear();
55+
}
56+
57+
void increaseExpectedExecuteObjectCalls(int level, int count) {
58+
expectedExecuteObjectCallsPerLevel.increment(level, count);
5059
}
5160

52-
void increaseHappenedStrategyCalls(int level) {
53-
happenedStrategyCallsPerLevel.increment(level, 1);
61+
void clearExpectedObjectCalls() {
62+
expectedExecuteObjectCallsPerLevel.clear();
63+
}
64+
65+
void increaseHappenedExecuteObjectCalls(int level) {
66+
happenedExecuteObjectCallsPerLevel.increment(level, 1);
67+
}
68+
69+
void clearHappenedExecuteObjectCalls() {
70+
happenedExecuteObjectCallsPerLevel.clear();
5471
}
5572

5673
void increaseHappenedOnFieldValueCalls(int level) {
5774
happenedOnFieldValueCallsPerLevel.increment(level, 1);
5875
}
5976

60-
boolean allStrategyCallsHappened(int level) {
61-
return happenedStrategyCallsPerLevel.get(level) == expectedStrategyCallsPerLevel.get(level);
77+
void clearHappenedOnFieldValueCalls() {
78+
happenedOnFieldValueCallsPerLevel.clear();
79+
}
80+
81+
boolean allExecuteObjectCallsHappened(int level) {
82+
return happenedExecuteObjectCallsPerLevel.get(level) == expectedExecuteObjectCallsPerLevel.get(level);
6283
}
6384

6485
boolean allOnFieldCallsHappened(int level) {
65-
return happenedOnFieldValueCallsPerLevel.get(level) == expectedStrategyCallsPerLevel.get(level);
86+
return happenedOnFieldValueCallsPerLevel.get(level) == expectedExecuteObjectCallsPerLevel.get(level);
6687
}
6788

6889
boolean allFetchesHappened(int level) {
6990
return fetchCountPerLevel.get(level) == expectedFetchCountPerLevel.get(level);
7091
}
7192

93+
void clearDispatchLevels() {
94+
dispatchedLevels.clear();
95+
}
96+
7297
@Override
7398
public String toString() {
7499
return "CallStack{" +
75100
"expectedFetchCountPerLevel=" + expectedFetchCountPerLevel +
76101
", fetchCountPerLevel=" + fetchCountPerLevel +
77-
", expectedStrategyCallsPerLevel=" + expectedStrategyCallsPerLevel +
78-
", happenedStrategyCallsPerLevel=" + happenedStrategyCallsPerLevel +
102+
", expectedExecuteObjectCallsPerLevel=" + expectedExecuteObjectCallsPerLevel +
103+
", happenedExecuteObjectCallsPerLevel=" + happenedExecuteObjectCallsPerLevel +
79104
", happenedOnFieldValueCallsPerLevel=" + happenedOnFieldValueCallsPerLevel +
80105
", dispatchedLevels" + dispatchedLevels +
81106
'}';
@@ -105,33 +130,37 @@ public void executeDeferredOnFieldValueInfo(FieldValueInfo fieldValueInfo, Execu
105130
@Override
106131
public void executionStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
107132
int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1;
108-
increaseCallCounts(curLevel, parameters);
133+
increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, parameters);
109134
}
110135

111136
@Override
112-
public void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList, ExecutionStrategyParameters parameters) {
113-
int curLevel = parameters.getPath().getLevel() + 1;
114-
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, parameters);
137+
public void executionSerialStrategy(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
138+
resetCallStack();
139+
increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(1, 1);
140+
}
141+
142+
@Override
143+
public void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList) {
144+
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1);
115145
}
116146

117-
public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters executionStrategyParameters) {
118-
int curLevel = executionStrategyParameters.getPath().getLevel() + 1;
147+
public void executionStrategyOnFieldValuesException(Throwable t) {
119148
callStack.lock.runLocked(() ->
120-
callStack.increaseHappenedOnFieldValueCalls(curLevel)
149+
callStack.increaseHappenedOnFieldValueCalls(1)
121150
);
122151
}
123152

124153

125154
@Override
126155
public void executeObject(ExecutionContext executionContext, ExecutionStrategyParameters parameters) {
127156
int curLevel = parameters.getExecutionStepInfo().getPath().getLevel() + 1;
128-
increaseCallCounts(curLevel, parameters);
157+
increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, parameters);
129158
}
130159

131160
@Override
132161
public void executeObjectOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList, ExecutionStrategyParameters parameters) {
133162
int curLevel = parameters.getPath().getLevel() + 1;
134-
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, parameters);
163+
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel);
135164
}
136165

137166

@@ -143,16 +172,30 @@ public void executeObjectOnFieldValuesException(Throwable t, ExecutionStrategyPa
143172
);
144173
}
145174

175+
private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, ExecutionStrategyParameters executionStrategyParameters) {
176+
increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(curLevel, executionStrategyParameters.getFields().size());
177+
}
146178

147-
private void increaseCallCounts(int curLevel, ExecutionStrategyParameters executionStrategyParameters) {
148-
int fieldCount = executionStrategyParameters.getFields().size();
179+
private void increaseHappenedExecuteObjectAndIncreaseExpectedFetchCount(int curLevel, int fieldCount) {
149180
callStack.lock.runLocked(() -> {
181+
callStack.increaseHappenedExecuteObjectCalls(curLevel);
150182
callStack.increaseExpectedFetchCount(curLevel, fieldCount);
151-
callStack.increaseHappenedStrategyCalls(curLevel);
152183
});
153184
}
154185

155-
private void onFieldValuesInfoDispatchIfNeeded(List<FieldValueInfo> fieldValueInfoList, int curLevel, ExecutionStrategyParameters parameters) {
186+
private void resetCallStack() {
187+
callStack.lock.runLocked(() -> {
188+
callStack.clearDispatchLevels();
189+
callStack.clearExpectedObjectCalls();
190+
callStack.clearExpectedFetchCount();
191+
callStack.clearFetchCount();
192+
callStack.clearHappenedExecuteObjectCalls();
193+
callStack.clearHappenedOnFieldValueCalls();
194+
callStack.expectedExecuteObjectCallsPerLevel.set(1, 1);
195+
});
196+
}
197+
198+
private void onFieldValuesInfoDispatchIfNeeded(List<FieldValueInfo> fieldValueInfoList, int curLevel) {
156199
boolean dispatchNeeded = callStack.lock.callLocked(() ->
157200
handleOnFieldValuesInfo(fieldValueInfoList, curLevel)
158201
);
@@ -166,18 +209,21 @@ private void onFieldValuesInfoDispatchIfNeeded(List<FieldValueInfo> fieldValueIn
166209
//
167210
private boolean handleOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfos, int curLevel) {
168211
callStack.increaseHappenedOnFieldValueCalls(curLevel);
169-
int expectedStrategyCalls = getCountForList(fieldValueInfos);
170-
callStack.increaseExpectedStrategyCalls(curLevel + 1, expectedStrategyCalls);
212+
int expectedOnObjectCalls = getObjectCountForList(fieldValueInfos);
213+
callStack.increaseExpectedExecuteObjectCalls(curLevel + 1, expectedOnObjectCalls);
171214
return dispatchIfNeeded(curLevel + 1);
172215
}
173216

174-
private int getCountForList(List<FieldValueInfo> fieldValueInfos) {
217+
/**
218+
* the amount of (non nullable) objects that will require an execute object call
219+
*/
220+
private int getObjectCountForList(List<FieldValueInfo> fieldValueInfos) {
175221
int result = 0;
176222
for (FieldValueInfo fieldValueInfo : fieldValueInfos) {
177223
if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.OBJECT) {
178224
result += 1;
179225
} else if (fieldValueInfo.getCompleteValueType() == FieldValueInfo.CompleteValueType.LIST) {
180-
result += getCountForList(fieldValueInfo.getFieldValueInfos());
226+
result += getObjectCountForList(fieldValueInfo.getFieldValueInfos());
181227
}
182228
}
183229
return result;
@@ -221,7 +267,7 @@ private boolean levelReady(int level) {
221267
return callStack.allFetchesHappened(1);
222268
}
223269
if (levelReady(level - 1) && callStack.allOnFieldCallsHappened(level - 1)
224-
&& callStack.allStrategyCallsHappened(level) && callStack.allFetchesHappened(level)) {
270+
&& callStack.allExecuteObjectCallsHappened(level) && callStack.allFetchesHappened(level)) {
225271

226272
return true;
227273
}

src/main/java/graphql/execution/instrumentation/dataloader/PerLevelDataLoaderDispatchStrategyWithDeferAlwaysDispatch.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -137,19 +137,17 @@ public void executeObject(ExecutionContext executionContext, ExecutionStrategyPa
137137
}
138138

139139
@Override
140-
public void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList, ExecutionStrategyParameters parameters) {
140+
public void executionStrategyOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoList) {
141141
if (this.startedDeferredExecution.get()) {
142142
this.dispatch();
143143
}
144-
int curLevel = parameters.getPath().getLevel() + 1;
145-
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, parameters);
144+
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, 1);
146145
}
147146

148147
@Override
149-
public void executionStrategyOnFieldValuesException(Throwable t, ExecutionStrategyParameters executionStrategyParameters) {
150-
int curLevel = executionStrategyParameters.getPath().getLevel() + 1;
148+
public void executionStrategyOnFieldValuesException(Throwable t) {
151149
callStack.lock.runLocked(() ->
152-
callStack.increaseHappenedOnFieldValueCalls(curLevel)
150+
callStack.increaseHappenedOnFieldValueCalls(1)
153151
);
154152
}
155153

@@ -159,7 +157,7 @@ public void executeObjectOnFieldValuesInfo(List<FieldValueInfo> fieldValueInfoLi
159157
this.dispatch();
160158
}
161159
int curLevel = parameters.getPath().getLevel() + 1;
162-
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel, parameters);
160+
onFieldValuesInfoDispatchIfNeeded(fieldValueInfoList, curLevel);
163161
}
164162

165163

@@ -207,7 +205,7 @@ private void increaseCallCounts(int curLevel, ExecutionStrategyParameters parame
207205
});
208206
}
209207

210-
private void onFieldValuesInfoDispatchIfNeeded(List<FieldValueInfo> fieldValueInfoList, int curLevel, ExecutionStrategyParameters parameters) {
208+
private void onFieldValuesInfoDispatchIfNeeded(List<FieldValueInfo> fieldValueInfoList, int curLevel) {
211209
boolean dispatchNeeded = callStack.lock.callLocked(() ->
212210
handleOnFieldValuesInfo(fieldValueInfoList, curLevel)
213211
);

0 commit comments

Comments
 (0)