Skip to content

Commit 5253af1

Browse files
committed
Merge branch '6.1.x'
2 parents e3281a5 + d955549 commit 5253af1

File tree

10 files changed

+219
-31
lines changed

10 files changed

+219
-31
lines changed

spring-webflux/src/main/java/org/springframework/web/reactive/result/method/InvocableHandlerMethod.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import kotlin.reflect.jvm.KCallablesJvm;
3737
import kotlin.reflect.jvm.ReflectJvmMapping;
3838
import reactor.core.publisher.Mono;
39+
import reactor.core.scheduler.Scheduler;
3940

4041
import org.springframework.core.CoroutinesUtils;
4142
import org.springframework.core.DefaultParameterNameDiscoverer;
@@ -60,6 +61,12 @@
6061
* Extension of {@link HandlerMethod} that invokes the underlying method with
6162
* argument values resolved from the current HTTP request through a list of
6263
* {@link HandlerMethodArgumentResolver}.
64+
* <p>By default, the method invocation happens on the thread from which the
65+
* {@code Mono} was subscribed to, or in some cases the thread that emitted one
66+
* of the resolved arguments (e.g. when the request body needs to be decoded).
67+
* To ensure a predictable thread for the underlying method's invocation,
68+
* a {@link Scheduler} can optionally be provided via
69+
* {@link #setInvocationScheduler(Scheduler)}.
6370
*
6471
* @author Rossen Stoyanchev
6572
* @author Juergen Hoeller
@@ -86,6 +93,9 @@ public class InvocableHandlerMethod extends HandlerMethod {
8693

8794
private Class<?>[] validationGroups = EMPTY_GROUPS;
8895

96+
@Nullable
97+
private Scheduler invocationScheduler;
98+
8999

90100
/**
91101
* Create an instance from a {@code HandlerMethod}.
@@ -154,6 +164,13 @@ public void setMethodValidator(@Nullable MethodValidator methodValidator) {
154164
methodValidator.determineValidationGroups(getBean(), getBridgedMethod()) : EMPTY_GROUPS);
155165
}
156166

167+
/**
168+
* Set the {@link Scheduler} on which to perform the method invocation.
169+
* @since 6.1.6
170+
*/
171+
public void setInvocationScheduler(@Nullable Scheduler invocationScheduler) {
172+
this.invocationScheduler = invocationScheduler;
173+
}
157174

158175
/**
159176
* Invoke the method for the given exchange.
@@ -166,7 +183,7 @@ public void setMethodValidator(@Nullable MethodValidator methodValidator) {
166183
public Mono<HandlerResult> invoke(
167184
ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) {
168185

169-
return getMethodArgumentValues(exchange, bindingContext, providedArgs).flatMap(args -> {
186+
return getMethodArgumentValuesOnScheduler(exchange, bindingContext, providedArgs).flatMap(args -> {
170187
if (shouldValidateArguments() && this.methodValidator != null) {
171188
this.methodValidator.applyArgumentValidation(
172189
getBean(), getBridgedMethod(), getMethodParameters(), args, this.validationGroups);
@@ -218,6 +235,12 @@ public Mono<HandlerResult> invoke(
218235
});
219236
}
220237

238+
private Mono<Object[]> getMethodArgumentValuesOnScheduler(
239+
ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) {
240+
Mono<Object[]> argumentValuesMono = getMethodArgumentValues(exchange, bindingContext, providedArgs);
241+
return this.invocationScheduler != null ? argumentValuesMono.publishOn(this.invocationScheduler) : argumentValuesMono;
242+
}
243+
221244
private Mono<Object[]> getMethodArgumentValues(
222245
ServerWebExchange exchange, BindingContext bindingContext, Object... providedArgs) {
223246

spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/ControllerMethodResolver.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2023 the original author or authors.
2+
* Copyright 2002-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -28,6 +28,7 @@
2828

2929
import org.apache.commons.logging.Log;
3030
import org.apache.commons.logging.LogFactory;
31+
import reactor.core.scheduler.Scheduler;
3132

3233
import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
3334
import org.springframework.context.ApplicationContext;
@@ -103,6 +104,12 @@ class ControllerMethodResolver {
103104

104105
private final ReactiveAdapterRegistry reactiveAdapterRegistry;
105106

107+
@Nullable
108+
private final Scheduler invocationScheduler;
109+
110+
@Nullable
111+
private final Predicate<? super HandlerMethod> blockingMethodPredicate;
112+
106113
@Nullable
107114
private final MethodValidator methodValidator;
108115

@@ -125,7 +132,9 @@ class ControllerMethodResolver {
125132
ControllerMethodResolver(
126133
ArgumentResolverConfigurer customResolvers, ReactiveAdapterRegistry adapterRegistry,
127134
ConfigurableApplicationContext context, List<HttpMessageReader<?>> readers,
128-
@Nullable WebBindingInitializer webBindingInitializer) {
135+
@Nullable WebBindingInitializer webBindingInitializer,
136+
@Nullable Scheduler invocationScheduler,
137+
@Nullable Predicate<? super HandlerMethod> blockingMethodPredicate) {
129138

130139
Assert.notNull(customResolvers, "ArgumentResolverConfigurer is required");
131140
Assert.notNull(adapterRegistry, "ReactiveAdapterRegistry is required");
@@ -137,6 +146,8 @@ class ControllerMethodResolver {
137146
this.requestMappingResolvers = requestMappingResolvers(customResolvers, adapterRegistry, context, readers);
138147
this.exceptionHandlerResolvers = exceptionHandlerResolvers(customResolvers, adapterRegistry, context);
139148
this.reactiveAdapterRegistry = adapterRegistry;
149+
this.invocationScheduler = invocationScheduler;
150+
this.blockingMethodPredicate = blockingMethodPredicate;
140151

141152
if (BEAN_VALIDATION_PRESENT) {
142153
this.methodValidator = HandlerMethodValidator.from(webBindingInitializer, null,
@@ -287,6 +298,21 @@ private static Predicate<MethodParameter> methodParamPredicate(
287298
};
288299
}
289300

301+
/**
302+
* Return a {@link Scheduler} for the given method if it is considered
303+
* blocking by the underlying blocking method predicate, or null if no
304+
* particular scheduler should be used for this method invocation.
305+
*/
306+
@Nullable
307+
public Scheduler getSchedulerFor(HandlerMethod handlerMethod) {
308+
if (this.invocationScheduler != null) {
309+
Assert.state(this.blockingMethodPredicate != null, "Expected HandlerMethod Predicate");
310+
if (this.blockingMethodPredicate.test(handlerMethod)) {
311+
return this.invocationScheduler;
312+
}
313+
}
314+
return null;
315+
}
290316

291317
/**
292318
* Return an {@link InvocableHandlerMethod} for the given
@@ -297,6 +323,7 @@ public InvocableHandlerMethod getRequestMappingMethod(HandlerMethod handlerMetho
297323
invocable.setArgumentResolvers(this.requestMappingResolvers);
298324
invocable.setReactiveAdapterRegistry(this.reactiveAdapterRegistry);
299325
invocable.setMethodValidator(this.methodValidator);
326+
invocable.setInvocationScheduler(getSchedulerFor(handlerMethod));
300327
return invocable;
301328
}
302329

spring-webflux/src/main/java/org/springframework/web/reactive/result/method/annotation/RequestMappingHandlerAdapter.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,8 @@ public void afterPropertiesSet() throws Exception {
225225

226226
this.methodResolver = new ControllerMethodResolver(
227227
this.argumentResolverConfigurer, this.reactiveAdapterRegistry, this.applicationContext,
228-
this.messageReaders, this.webBindingInitializer);
228+
this.messageReaders, this.webBindingInitializer,
229+
this.scheduler, this.blockingMethodPredicate);
229230

230231
this.modelInitializer = new ModelInitializer(this.methodResolver, this.reactiveAdapterRegistry);
231232
}
@@ -260,11 +261,9 @@ public Mono<HandlerResult> handle(ServerWebExchange exchange, Object handler) {
260261
.doOnNext(result -> result.setExceptionHandler(exceptionHandler))
261262
.onErrorResume(ex -> exceptionHandler.handleError(exchange, ex));
262263

263-
if (this.scheduler != null) {
264-
Assert.state(this.blockingMethodPredicate != null, "Expected HandlerMethod Predicate");
265-
if (this.blockingMethodPredicate.test(handlerMethod)) {
266-
resultMono = resultMono.subscribeOn(this.scheduler);
267-
}
264+
Scheduler optionalScheduler = this.methodResolver.getSchedulerFor(handlerMethod);
265+
if (optionalScheduler != null) {
266+
return resultMono.subscribeOn(optionalScheduler);
268267
}
269268

270269
return resultMono;

spring-webflux/src/test/java/org/springframework/web/reactive/result/method/InvocableHandlerMethodTests.java

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.junit.jupiter.api.Test;
2727
import reactor.core.publisher.Flux;
2828
import reactor.core.publisher.Mono;
29+
import reactor.core.scheduler.Scheduler;
30+
import reactor.core.scheduler.Schedulers;
2931
import reactor.test.StepVerifier;
3032

3133
import org.springframework.core.io.buffer.DataBuffer;
@@ -75,6 +77,15 @@ void resolveArg() {
7577
assertHandlerResultValue(mono, "success:value1");
7678
}
7779

80+
@Test
81+
void resolveArgOnSchedulerThread() {
82+
this.resolvers.add(stubResolver(Mono.<Object>just("success").publishOn(Schedulers.newSingle("wrong"))));
83+
Method method = ResolvableMethod.on(TestController.class).mockCall(o -> o.singleArgThread(null)).method();
84+
Mono<HandlerResult> mono = invokeOnScheduler(Schedulers.newSingle("good"), new TestController(), method);
85+
86+
assertHandlerResultValue(mono, "success on thread: good-", false);
87+
}
88+
7889
@Test
7990
void resolveNoArgValue() {
8091
this.resolvers.add(stubResolver(Mono.empty()));
@@ -92,6 +103,14 @@ void resolveNoArgs() {
92103
assertHandlerResultValue(mono, "success");
93104
}
94105

106+
@Test
107+
void resolveNoArgsOnSchedulerThread() {
108+
Method method = ResolvableMethod.on(TestController.class).mockCall(TestController::noArgsThread).method();
109+
Mono<HandlerResult> mono = invokeOnScheduler(Schedulers.newSingle("good"), new TestController(), method);
110+
111+
assertHandlerResultValue(mono, "on thread: good-", false);
112+
}
113+
95114
@Test
96115
void cannotResolveArg() {
97116
Method method = ResolvableMethod.on(TestController.class).mockCall(o -> o.singleArg(null)).method();
@@ -229,6 +248,13 @@ private Mono<HandlerResult> invoke(Object handler, Method method, Object... prov
229248
return invocable.invoke(this.exchange, new BindingContext(), providedArgs);
230249
}
231250

251+
private Mono<HandlerResult> invokeOnScheduler(Scheduler scheduler, Object handler, Method method, Object... providedArgs) {
252+
InvocableHandlerMethod invocable = new InvocableHandlerMethod(handler, method);
253+
invocable.setArgumentResolvers(this.resolvers);
254+
invocable.setInvocationScheduler(scheduler);
255+
return invocable.invoke(this.exchange, new BindingContext(), providedArgs);
256+
}
257+
232258
private HandlerMethodArgumentResolver stubResolver(Object stubValue) {
233259
return stubResolver(Mono.just(stubValue));
234260
}
@@ -241,8 +267,19 @@ private HandlerMethodArgumentResolver stubResolver(Mono<Object> stubValue) {
241267
}
242268

243269
private void assertHandlerResultValue(Mono<HandlerResult> mono, String expected) {
270+
this.assertHandlerResultValue(mono, expected, true);
271+
}
272+
273+
private void assertHandlerResultValue(Mono<HandlerResult> mono, String expected, boolean strict) {
244274
StepVerifier.create(mono)
245-
.consumeNextWith(result -> assertThat(result.getReturnValue()).isEqualTo(expected))
275+
.assertNext(result -> {
276+
if (strict) {
277+
assertThat(result.getReturnValue()).isEqualTo(expected);
278+
}
279+
else {
280+
assertThat(String.valueOf(result.getReturnValue())).startsWith(expected);
281+
}
282+
})
246283
.expectComplete()
247284
.verify();
248285
}
@@ -259,6 +296,14 @@ String noArgs() {
259296
return "success";
260297
}
261298

299+
String singleArgThread(String q) {
300+
return q + " on thread: " + Thread.currentThread().getName();
301+
}
302+
303+
String noArgsThread() {
304+
return "on thread: " + Thread.currentThread().getName();
305+
}
306+
262307
void exceptionMethod() {
263308
throw new IllegalStateException("boo");
264309
}

0 commit comments

Comments
 (0)