Skip to content

Commit b0499c3

Browse files
committed
Align cancelling behavior to graphql-java request execution
This commit uses the new `ExecutionInput.cancel()` support from graphql-java to cancel data fetchers, if not already in flight. Closes gh-1171
1 parent 049eeed commit b0499c3

File tree

5 files changed

+24
-132
lines changed

5 files changed

+24
-132
lines changed

spring-graphql/src/main/java/org/springframework/graphql/execution/ContextDataFetcherDecorator.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import graphql.ExecutionInput;
2323
import graphql.GraphQLContext;
2424
import graphql.TrivialDataFetcher;
25-
import graphql.execution.AbortExecutionException;
2625
import graphql.execution.DataFetcherResult;
2726
import graphql.schema.DataFetcher;
2827
import graphql.schema.DataFetchingEnvironment;
@@ -53,7 +52,6 @@
5352
* <li>Re-establish Reactor Context passed via {@link ExecutionInput}.
5453
* <li>Re-establish ThreadLocal context passed via {@link ExecutionInput}.
5554
* <li>Resolve exceptions from a GraphQL subscription {@link Publisher}.
56-
* <li>Propagate the cancellation signal to {@code DataFetcher} from the transport layer.
5755
* </ul>
5856
*
5957
* @author Rossen Stoyanchev
@@ -109,11 +107,6 @@ private ContextDataFetcherDecorator(
109107
if (value == null) {
110108
return null;
111109
}
112-
if (ContextPropagationHelper.isCancelled(graphQlContext)) {
113-
return DataFetcherResult.newResult()
114-
.error(new AbortExecutionException("GraphQL request has been cancelled by the client."))
115-
.build();
116-
}
117110

118111
if (this.subscription) {
119112
Flux<?> subscriptionResult = ReactiveAdapterRegistryHelper.toSubscriptionFlux(value)
@@ -125,8 +118,7 @@ private ContextDataFetcherDecorator(
125118
return this.subscriptionExceptionResolver.resolveException(exception)
126119
.flatMap((errors) -> Mono.error(new SubscriptionPublisherException(errors, exception)));
127120
});
128-
return ContextPropagationHelper.bindCancelFrom(subscriptionResult, graphQlContext)
129-
.contextWrite(snapshot::updateContext);
121+
return subscriptionResult.contextWrite(snapshot::updateContext);
130122
}
131123

132124
value = ReactiveAdapterRegistryHelper.toMonoIfReactive(value);

spring-graphql/src/main/java/org/springframework/graphql/execution/ContextPropagationHelper.java

Lines changed: 0 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,10 @@
1616

1717
package org.springframework.graphql.execution;
1818

19-
import java.util.concurrent.atomic.AtomicBoolean;
20-
2119
import graphql.GraphQLContext;
2220
import io.micrometer.context.ContextSnapshot;
2321
import io.micrometer.context.ContextSnapshotFactory;
2422
import org.jspecify.annotations.Nullable;
25-
import reactor.core.publisher.Flux;
26-
import reactor.core.publisher.Mono;
27-
import reactor.core.publisher.Sinks;
2823
import reactor.util.context.Context;
2924
import reactor.util.context.ContextView;
3025

@@ -41,10 +36,6 @@ public abstract class ContextPropagationHelper {
4136

4237
private static final String CONTEXT_SNAPSHOT_FACTORY_KEY = ContextPropagationHelper.class.getName() + ".KEY";
4338

44-
private static final String CANCELED_KEY = ContextPropagationHelper.class.getName() + ".canceled";
45-
46-
private static final String CANCELED_PUBLISHER_KEY = ContextPropagationHelper.class.getName() + ".canceledPublisher";
47-
4839

4940
/**
5041
* Select a {@code ContextSnapshotFactory} instance to use, either the one
@@ -122,54 +113,4 @@ public static ContextSnapshot captureFrom(GraphQLContext context) {
122113
return selectInstance(factory).captureFrom(context);
123114
}
124115

125-
/**
126-
* Create an atomic boolean and store it into the given {@link GraphQLContext}.
127-
* This boolean value can then be checked by upstream publishers to know whether the request is canceled.
128-
* @param context the current GraphQL context
129-
* @since 1.3.6
130-
*/
131-
public static Runnable createCancelSignal(GraphQLContext context) {
132-
AtomicBoolean requestCancelled = new AtomicBoolean();
133-
Sinks.Empty<Void> cancelSignal = Sinks.empty();
134-
context.put(CANCELED_KEY, requestCancelled);
135-
context.put(CANCELED_PUBLISHER_KEY, cancelSignal.asMono());
136-
return () -> {
137-
requestCancelled.set(true);
138-
cancelSignal.tryEmitEmpty();
139-
};
140-
}
141-
142-
/**
143-
* Return {@code true} if the current request has been cancelled, {@code false} otherwise.
144-
* This checks whether a {@link #createCancelSignal(GraphQLContext) cancellation publisher is present}
145-
* in the given context and the cancel signal has fired already.
146-
* @param context the current GraphQL context
147-
* @since 1.4.0
148-
*/
149-
public static boolean isCancelled(GraphQLContext context) {
150-
AtomicBoolean requestCancelled = context.get(CANCELED_KEY);
151-
if (requestCancelled != null) {
152-
return requestCancelled.get();
153-
}
154-
return false;
155-
}
156-
157-
/**
158-
* Bind the source {@link Flux} to the publisher from the given {@link GraphQLContext}.
159-
* The returned {@code Flux} will be cancelled when this publisher completes.
160-
* Subscribers must use the returned {@code Mono} instance.
161-
* @param source the source {@code Mono}
162-
* @param context the current GraphQL context
163-
* @param <T> the type of published elements
164-
* @return the new {@code Mono} that will be cancelled when notified
165-
* @since 1.3.5
166-
*/
167-
public static <T> Flux<T> bindCancelFrom(Flux<T> source, GraphQLContext context) {
168-
Mono<Void> cancelSignal = context.get(CANCELED_PUBLISHER_KEY);
169-
if (cancelSignal != null) {
170-
return source.takeUntilOther(cancelSignal);
171-
}
172-
return source;
173-
}
174-
175116
}

spring-graphql/src/main/java/org/springframework/graphql/execution/DefaultExecutionGraphQlService.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,12 @@ public final Mono<ExecutionGraphQlResponse> execute(ExecutionGraphQlRequest requ
9191
factory.captureFrom(contextView).updateContext(graphQLContext);
9292

9393
ExecutionInput executionInputToUse = registerDataLoaders(executionInput);
94-
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(graphQLContext);
9594

9695
return Mono.fromFuture(this.graphQlSource.graphQl().executeAsync(executionInputToUse))
9796
.onErrorResume((ex) -> ex instanceof GraphQLError, (ex) ->
9897
Mono.just(ExecutionResult.newExecutionResult().addError((GraphQLError) ex).build()))
9998
.map((result) -> new DefaultExecutionGraphQlResponse(executionInputToUse, result))
100-
.doOnCancel(cancelSignal::run);
99+
.doOnCancel(executionInputToUse::cancel);
101100
});
102101
}
103102

spring-graphql/src/test/java/org/springframework/graphql/execution/ContextDataFetcherDecoratorTests.java

Lines changed: 5 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import io.micrometer.context.ContextRegistry;
4343
import io.micrometer.context.ContextSnapshot;
4444
import io.micrometer.context.ContextSnapshotFactory;
45-
import org.junit.jupiter.api.Disabled;
4645
import org.junit.jupiter.api.Test;
4746
import reactor.core.publisher.Flux;
4847
import reactor.core.publisher.Mono;
@@ -291,60 +290,19 @@ void trivialDataFetcherIsNotDecorated() {
291290
assertThat(dataFetcher).isInstanceOf(TrivialDataFetcher.class);
292291
}
293292

294-
@Test
295-
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
296-
void cancelMonoDataFetcherWhenRequestCancelled() {
297-
AtomicBoolean dataFetcherCancelled = new AtomicBoolean();
298-
GraphQL graphQl = GraphQlSetup.schemaContent(SCHEMA_CONTENT)
299-
.queryFetcher("greeting", (env) ->
300-
Mono.just("Hello")
301-
.delayElement(Duration.ofSeconds(1))
302-
.doOnCancel(() -> dataFetcherCancelled.set(true))
303-
)
304-
.toGraphQl();
305-
306-
ExecutionInput input = ExecutionInput.newExecutionInput().query("{ greeting }").build();
307-
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
308-
309-
CompletableFuture<ExecutionResult> asyncResult = graphQl.executeAsync(input);
310-
cancelSignal.run();
311-
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
312-
}
313-
314-
@Test
315-
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
316-
void cancelFluxDataFetcherWhenRequestCancelled() {
317-
AtomicBoolean dataFetcherCancelled = new AtomicBoolean();
318-
GraphQL graphQl = GraphQlSetup.schemaContent(SCHEMA_CONTENT)
319-
.queryFetcher("greeting", (env) ->
320-
Flux.just("Hello")
321-
.delayElements(Duration.ofSeconds(1))
322-
.doOnCancel(() -> dataFetcherCancelled.set(true))
323-
)
324-
.toGraphQl();
325-
326-
ExecutionInput input = ExecutionInput.newExecutionInput().query("{ greeting }").build();
327-
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
328-
329-
CompletableFuture<ExecutionResult> asyncResult = graphQl.executeAsync(input);
330-
cancelSignal.run();
331-
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
332-
}
333-
334293
@Test
335294
void returnAbortExecutionForBlockingDataFetcherWhenRequestCancelled() throws Exception {
336295
GraphQL graphQl = GraphQlSetup.schemaContent(SCHEMA_CONTENT)
337296
.queryFetcher("greeting", (env) -> "Hello")
338297
.toGraphQl();
339298

340299
ExecutionInput input = ExecutionInput.newExecutionInput().query("{ greeting }").build();
341-
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
342-
cancelSignal.run();
300+
input.cancel();
343301
ExecutionResult result = graphQl.executeAsync(input).get();
344302

345303
assertThat(result.getErrors()).hasSize(1);
346304
assertThat(result.getErrors().get(0)).isInstanceOf(AbortExecutionException.class)
347-
.extracting("message").asString().isEqualTo("GraphQL request has been cancelled by the client.");
305+
.extracting("message").asString().isEqualTo("Execution has been asked to be cancelled");
348306
}
349307

350308
@Test
@@ -359,12 +317,10 @@ void cancelFluxDataFetcherSubscriptionWhenRequestCancelled() throws Exception {
359317
.toGraphQl();
360318

361319
ExecutionInput input = ExecutionInput.newExecutionInput().query("subscription { greetings }").build();
362-
Runnable cancelSignal = ContextPropagationHelper.createCancelSignal(input.getGraphQLContext());
363-
364320
ExecutionResult executionResult = graphQl.executeAsync(input).get();
365-
ResponseHelper.forSubscription(executionResult).subscribe();
366-
cancelSignal.run();
367-
321+
input.cancel();
322+
StepVerifier.create(ResponseHelper.forSubscription(executionResult))
323+
.verifyError(AbortExecutionException.class);
368324
await().atMost(Duration.ofSeconds(2)).until(dataFetcherCancelled::get);
369325
assertThat(dataFetcherCancelled).isTrue();
370326
}

spring-graphql/src/test/java/org/springframework/graphql/execution/DefaultExecutionGraphQlServiceTests.java

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,14 @@
2222

2323
import graphql.ErrorType;
2424
import org.dataloader.DataLoaderRegistry;
25-
import org.junit.jupiter.api.Disabled;
2625
import org.junit.jupiter.api.Test;
2726
import reactor.core.publisher.Flux;
2827
import reactor.core.publisher.Mono;
2928
import reactor.test.StepVerifier;
3029

3130
import org.springframework.graphql.Author;
3231
import org.springframework.graphql.Book;
32+
import org.springframework.graphql.BookSource;
3333
import org.springframework.graphql.ExecutionGraphQlRequest;
3434
import org.springframework.graphql.ExecutionGraphQlResponse;
3535
import org.springframework.graphql.GraphQlSetup;
@@ -83,20 +83,24 @@ void shouldHandleGraphQlErrors() {
8383
}
8484

8585
@Test
86-
@Disabled("until https://github.com/spring-projects/spring-graphql/issues/1171")
87-
void cancellationSupport() {
88-
AtomicBoolean cancelled = new AtomicBoolean();
89-
Mono<String> greetingMono = Mono.just("hi")
90-
.delayElement(Duration.ofSeconds(3))
91-
.doOnCancel(() -> cancelled.set(true));
92-
93-
Mono<ExecutionGraphQlResponse> execution = GraphQlSetup.schemaContent("type Query { greeting: String }")
94-
.queryFetcher("greeting", (env) -> greetingMono)
86+
void cancellationSupport() throws Exception {
87+
AtomicBoolean called = new AtomicBoolean();
88+
Mono<Book> bookMono = Mono.just(BookSource.getBookWithoutAuthor(1L))
89+
.delayElement(Duration.ofSeconds(1));
90+
91+
Mono<ExecutionGraphQlResponse> execution = GraphQlSetup.schemaResource(BookSource.schema)
92+
.queryFetcher("bookById", (env) -> bookMono)
93+
.dataFetcher("Book", "author", (env) -> {
94+
called.set(true);
95+
return BookSource.getAuthor(1L);
96+
})
9597
.toGraphQlService()
96-
.execute(TestExecutionRequest.forDocument("{ greeting }"));
9798

98-
StepVerifier.create(execution).thenCancel().verify();
99-
assertThat(cancelled).isTrue();
99+
.execute(TestExecutionRequest.forDocument("{ bookById(id: 1) { author { firstName } } }"));
100+
101+
StepVerifier.create(execution).thenAwait(Duration.ofMillis(500)).thenCancel().verify();
102+
Thread.sleep(1000);
103+
assertThat(called).isFalse();
100104
}
101105

102106
}

0 commit comments

Comments
 (0)