Skip to content

Commit 8b36736

Browse files
committed
Add concurrency throttle and flexible task callback to SyncTaskExecutor
Closes gh-35460
1 parent 26c57ce commit 8b36736

File tree

5 files changed

+225
-14
lines changed

5 files changed

+225
-14
lines changed

spring-context/src/main/java/org/springframework/resilience/annotation/ConcurrencyLimit.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@
3838
*
3939
* <p>This is particularly useful with Virtual Threads where there is generally
4040
* no thread pool limit in place. For asynchronous tasks, this can be constrained
41-
* on {@link org.springframework.core.task.SimpleAsyncTaskExecutor}; for
41+
* on {@link org.springframework.core.task.SimpleAsyncTaskExecutor}. For
4242
* synchronous invocations, this annotation provides equivalent behavior through
43-
* {@link org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor}.
43+
* {@link org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor}
44+
* Alternatively, consider {@link org.springframework.core.task.SyncTaskExecutor}
45+
* and its inherited concurrency throttle (new as of 7.0) for programmatic use.
4446
*
4547
* @author Juergen Hoeller
4648
* @author Hyunsang Han
@@ -49,6 +51,7 @@
4951
* @see EnableResilientMethods
5052
* @see ConcurrencyLimitBeanPostProcessor
5153
* @see org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor
54+
* @see org.springframework.core.task.SyncTaskExecutor#setConcurrencyLimit
5255
* @see org.springframework.core.task.SimpleAsyncTaskExecutor#setConcurrencyLimit
5356
*/
5457
@Target({ElementType.TYPE, ElementType.METHOD})

spring-context/src/test/java/org/springframework/resilience/ConcurrencyLimitTests.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ void withSimpleInterceptor() {
5959
futures.add(CompletableFuture.runAsync(proxy::concurrentOperation));
6060
}
6161
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
62-
assertThat(target.counter).hasValue(0);
62+
assertThat(target.current).hasValue(0);
63+
assertThat(target.counter).hasValue(10);
6364
}
6465

6566
@Test
@@ -166,10 +167,12 @@ private static <T> T createProxy(Class<T> beanClass) {
166167

167168
static class NonAnnotatedBean {
168169

170+
final AtomicInteger current = new AtomicInteger();
171+
169172
final AtomicInteger counter = new AtomicInteger();
170173

171174
public void concurrentOperation() {
172-
if (counter.incrementAndGet() > 2) {
175+
if (current.incrementAndGet() > 2) {
173176
throw new IllegalStateException();
174177
}
175178
try {
@@ -178,7 +181,8 @@ public void concurrentOperation() {
178181
catch (InterruptedException ex) {
179182
throw new IllegalStateException(ex);
180183
}
181-
counter.decrementAndGet();
184+
current.decrementAndGet();
185+
counter.incrementAndGet();
182186
}
183187
}
184188

spring-core/src/main/java/org/springframework/core/task/SyncTaskExecutor.java

Lines changed: 45 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,13 @@
1919
import java.io.Serializable;
2020

2121
import org.springframework.util.Assert;
22+
import org.springframework.util.ConcurrencyThrottleSupport;
2223

2324
/**
2425
* {@link TaskExecutor} implementation that executes each task <i>synchronously</i>
25-
* in the calling thread.
26-
*
27-
* <p>Mainly intended for testing scenarios.
26+
* in the calling thread. This can be used for testing purposes but also for
27+
* bounded execution in a Virtual Threads setup, relying on concurrency throttling
28+
* as inherited from the base class: see {@link #setConcurrencyLimit} (as of 7.0).
2829
*
2930
* <p>Execution in the calling thread does have the advantage of participating
3031
* in its thread context, for example the thread context class loader or the
@@ -37,17 +38,52 @@
3738
* @see SimpleAsyncTaskExecutor
3839
*/
3940
@SuppressWarnings("serial")
40-
public class SyncTaskExecutor implements TaskExecutor, Serializable {
41+
public class SyncTaskExecutor extends ConcurrencyThrottleSupport implements TaskExecutor, Serializable {
4142

4243
/**
43-
* Executes the given {@code task} synchronously, through direct
44-
* invocation of it's {@link Runnable#run() run()} method.
45-
* @throws IllegalArgumentException if the given {@code task} is {@code null}
44+
* Execute the given {@code task} synchronously, through direct
45+
* invocation of its {@link Runnable#run() run()} method.
46+
* @throws RuntimeException if propagated from the given {@code Runnable}
4647
*/
4748
@Override
4849
public void execute(Runnable task) {
49-
Assert.notNull(task, "Runnable must not be null");
50-
task.run();
50+
Assert.notNull(task, "Task must not be null");
51+
if (isThrottleActive()) {
52+
beforeAccess();
53+
try {
54+
task.run();
55+
}
56+
finally {
57+
afterAccess();
58+
}
59+
}
60+
else {
61+
task.run();
62+
}
63+
}
64+
65+
/**
66+
* Execute the given {@code task} synchronously, through direct
67+
* invocation of its {@link TaskCallback#call() call()} method.
68+
* @param <V> the returned value type, if any
69+
* @param <E> the exception propagated, if any
70+
* @throws E if propagated from the given {@code TaskCallback}
71+
* @since 7.0
72+
*/
73+
public <V, E extends Exception> V execute(TaskCallback<V, E> task) throws E {
74+
Assert.notNull(task, "Task must not be null");
75+
if (isThrottleActive()) {
76+
beforeAccess();
77+
try {
78+
return task.call();
79+
}
80+
finally {
81+
afterAccess();
82+
}
83+
}
84+
else {
85+
return task.call();
86+
}
5187
}
5288

5389
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright 2002-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.core.task;
18+
19+
import java.util.concurrent.Callable;
20+
21+
/**
22+
* Variant of {@link Callable} with a flexible exception signature
23+
* that can be adapted in the {@link SyncTaskExecutor#execute(TaskCallback)}
24+
* method signature for propagating specific exception types only.
25+
*
26+
* <p>An implementation of this interface can also be passed into any
27+
* {@code Callback}-based method such as {@link AsyncTaskExecutor#submit(Callable)}
28+
* or {@link AsyncTaskExecutor#submitCompletable(Callable)}. It is just capable
29+
* of adapting to flexible exception propagation in caller signatures as well.
30+
*
31+
* @author Juergen Hoeller
32+
* @since 7.0
33+
* @param <V> the returned value type, if any
34+
* @param <E> the exception propagated, if any
35+
* @see SyncTaskExecutor#execute(TaskCallback)
36+
*/
37+
public interface TaskCallback<V, E extends Exception> extends Callable<V> {
38+
39+
@Override
40+
V call() throws E;
41+
42+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2002-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.core.task;
18+
19+
import java.io.IOException;
20+
import java.util.ArrayList;
21+
import java.util.List;
22+
import java.util.concurrent.CompletableFuture;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
25+
import org.junit.jupiter.api.Test;
26+
27+
import static org.assertj.core.api.Assertions.assertThat;
28+
import static org.assertj.core.api.Assertions.assertThatIOException;
29+
import static org.assertj.core.api.Assertions.assertThatNoException;
30+
31+
/**
32+
* @author Juergen Hoeller
33+
* @since 7.0
34+
*/
35+
class SyncTaskExecutorTests {
36+
37+
@Test
38+
void plainExecution() {
39+
SyncTaskExecutor taskExecutor = new SyncTaskExecutor();
40+
41+
ConcurrentClass target = new ConcurrentClass();
42+
assertThatNoException().isThrownBy(() -> taskExecutor.execute(target::concurrentOperation));
43+
assertThat(taskExecutor.execute(target::concurrentOperationWithResult)).isEqualTo("result");
44+
assertThatIOException().isThrownBy(() -> taskExecutor.execute(target::concurrentOperationWithException));
45+
}
46+
47+
@Test
48+
void withConcurrencyLimit() {
49+
SyncTaskExecutor taskExecutor = new SyncTaskExecutor();
50+
taskExecutor.setConcurrencyLimit(2);
51+
52+
ConcurrentClass target = new ConcurrentClass();
53+
List<CompletableFuture<?>> futures = new ArrayList<>(10);
54+
for (int i = 0; i < 10; i++) {
55+
futures.add(CompletableFuture.runAsync(() -> taskExecutor.execute(target::concurrentOperation)));
56+
}
57+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
58+
assertThat(target.current).hasValue(0);
59+
assertThat(target.counter).hasValue(10);
60+
}
61+
62+
@Test
63+
void withConcurrencyLimitAndResult() {
64+
SyncTaskExecutor taskExecutor = new SyncTaskExecutor();
65+
taskExecutor.setConcurrencyLimit(2);
66+
67+
ConcurrentClass target = new ConcurrentClass();
68+
List<CompletableFuture<?>> futures = new ArrayList<>(10);
69+
for (int i = 0; i < 10; i++) {
70+
futures.add(CompletableFuture.runAsync(() ->
71+
assertThat(taskExecutor.execute(target::concurrentOperationWithResult)).isEqualTo("result")));
72+
}
73+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
74+
assertThat(target.current).hasValue(0);
75+
assertThat(target.counter).hasValue(10);
76+
}
77+
78+
@Test
79+
void withConcurrencyLimitAndException() {
80+
SyncTaskExecutor taskExecutor = new SyncTaskExecutor();
81+
taskExecutor.setConcurrencyLimit(2);
82+
83+
ConcurrentClass target = new ConcurrentClass();
84+
List<CompletableFuture<?>> futures = new ArrayList<>(10);
85+
for (int i = 0; i < 10; i++) {
86+
futures.add(CompletableFuture.runAsync(() ->
87+
assertThatIOException().isThrownBy(() -> taskExecutor.execute(target::concurrentOperationWithException))));
88+
}
89+
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
90+
assertThat(target.current).hasValue(0);
91+
assertThat(target.counter).hasValue(10);
92+
}
93+
94+
95+
static class ConcurrentClass {
96+
97+
final AtomicInteger current = new AtomicInteger();
98+
99+
final AtomicInteger counter = new AtomicInteger();
100+
101+
public void concurrentOperation() {
102+
if (current.incrementAndGet() > 2) {
103+
throw new IllegalStateException();
104+
}
105+
try {
106+
Thread.sleep(10);
107+
}
108+
catch (InterruptedException ex) {
109+
throw new IllegalStateException(ex);
110+
}
111+
current.decrementAndGet();
112+
counter.incrementAndGet();
113+
}
114+
115+
public String concurrentOperationWithResult() {
116+
concurrentOperation();
117+
return "result";
118+
}
119+
120+
public String concurrentOperationWithException() throws IOException {
121+
concurrentOperation();
122+
throw new IOException();
123+
}
124+
}
125+
126+
}

0 commit comments

Comments
 (0)