Skip to content

Commit cc7dc95

Browse files
committed
Provide programmatic alternative to @ConcurrencyLimit
Closes gh-35460 Signed-off-by: Geonhu Park <[email protected]>
1 parent 60673a0 commit cc7dc95

File tree

5 files changed

+228
-0
lines changed

5 files changed

+228
-0
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Copyright 2002-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.core.concurrency;
18+
19+
import org.jspecify.annotations.Nullable;
20+
21+
import org.springframework.util.ConcurrencyThrottleSupport;
22+
23+
/**
24+
* Template-style API that throttles concurrent executions of user-provided callbacks
25+
* according to a configurable concurrency limit.
26+
*
27+
* <p>Blocking semantics are identical to {@link ConcurrencyThrottleSupport}:
28+
* when the configured limit is reached, additional callers will block until a
29+
* permit becomes available.
30+
*
31+
* <p>The default concurrency limit of this template is 1.
32+
*
33+
* @author Geonhu Park
34+
* @since 7.1
35+
* @see ConcurrencyThrottleSupport
36+
*/
37+
@SuppressWarnings("serial")
38+
public class ConcurrencyLimitTemplate extends ConcurrencyThrottleSupport {
39+
40+
/**
41+
* Create a default {@code ConcurrencyLimitTemplate}
42+
* with concurrency limit 1.
43+
*/
44+
public ConcurrencyLimitTemplate() {
45+
this(1);
46+
}
47+
48+
/**
49+
* Create a {@code ConcurrencyThrottleInterceptor}
50+
* with the given concurrency limit.
51+
*/
52+
public ConcurrencyLimitTemplate(int concurrencyLimit) {
53+
setConcurrencyLimit(concurrencyLimit);
54+
}
55+
56+
/**
57+
* Execute the supplied callback under the configured concurrency limit.
58+
* @param concurrencyLimited the unit of work to run
59+
* @param <R> the result type (nullable)
60+
* @return the callback's result (possibly {@code null})
61+
* @throws Throwable any exception thrown by the callback
62+
*/
63+
public <R extends @Nullable Object> @Nullable R execute(ConcurrencyLimited<R> concurrencyLimited) throws Throwable {
64+
beforeAccess();
65+
try {
66+
return concurrencyLimited.execute();
67+
}
68+
finally {
69+
afterAccess();
70+
}
71+
}
72+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Copyright 2002-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.core.concurrency;
18+
19+
import org.jspecify.annotations.Nullable;
20+
21+
/**
22+
* Functional callback representing a single unit of work to be executed under
23+
* the concurrency throttling of a {@link ConcurrencyLimitTemplate}.
24+
*
25+
* @author Geonhu Park
26+
* @since 7.1
27+
* @param <R> the result type (nullable)
28+
* @see ConcurrencyLimitTemplate
29+
*/
30+
@FunctionalInterface
31+
public interface ConcurrencyLimited<R extends @Nullable Object> {
32+
33+
/**
34+
* Execute the concurrency-limited operation.
35+
* @return the result (may be {@code null})
36+
* @throws Throwable any error from the underlying operation
37+
*/
38+
R execute() throws Throwable;
39+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
/**
2+
* Concurrency limiting (throttling) support via {@link org.springframework.core.concurrency.ConcurrencyLimitTemplate}
3+
* and {@link org.springframework.core.concurrency.ConcurrencyLimited}.
4+
*/
5+
package org.springframework.core.concurrency;

spring-core/src/main/java/org/springframework/util/ConcurrencyThrottleSupport.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
* @see #beforeAccess()
4545
* @see #afterAccess()
4646
* @see org.springframework.aop.interceptor.ConcurrencyThrottleInterceptor
47+
* @see org.springframework.core.concurrency.ConcurrencyLimitTemplate
4748
* @see java.io.Serializable
4849
*/
4950
@SuppressWarnings("serial")
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Copyright 2002-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.core.concurrency;
18+
19+
import org.apache.commons.logging.Log;
20+
import org.apache.commons.logging.LogFactory;
21+
import org.junit.jupiter.params.ParameterizedTest;
22+
import org.junit.jupiter.params.provider.ValueSource;
23+
24+
/**
25+
* Tests for {@link ConcurrencyLimitTemplate}.
26+
*
27+
* @author Geonhu Park
28+
*/
29+
class ConcurrencyLimitTemplateTests {
30+
31+
private static final Log logger = LogFactory.getLog(ConcurrencyLimitTemplateTests.class);
32+
33+
private static final int NR_OF_THREADS = 100;
34+
35+
private static final int NR_OF_ITERATIONS = 1000;
36+
37+
@ParameterizedTest
38+
@ValueSource(ints = {1, 10})
39+
void multipleThreadsWithLimit(int concurrencyLimit) {
40+
ConcurrencyLimitTemplate template = new ConcurrencyLimitTemplate(concurrencyLimit);
41+
42+
Thread[] threads = new Thread[NR_OF_THREADS];
43+
for (int i = 0; i < NR_OF_THREADS; i++) {
44+
threads[i] = new ConcurrencyThread(template, null);
45+
threads[i].start();
46+
}
47+
for (int i = 0; i < NR_OF_THREADS / 10; i++) {
48+
try {
49+
Thread.sleep(5);
50+
}
51+
catch (InterruptedException ex) {
52+
ex.printStackTrace();
53+
}
54+
threads[i] = new ConcurrencyThread(template,
55+
(i % 2 == 0 ? new OutOfMemoryError() : new IllegalStateException()));
56+
threads[i].start();
57+
}
58+
for (Thread t : threads) {
59+
try {
60+
t.join();
61+
}
62+
catch (InterruptedException ex) {
63+
ex.printStackTrace();
64+
}
65+
}
66+
}
67+
68+
private static class ConcurrencyThread extends Thread {
69+
70+
private final ConcurrencyLimitTemplate template;
71+
private final Throwable ex;
72+
73+
ConcurrencyThread(ConcurrencyLimitTemplate template, Throwable ex) {
74+
this.template = template;
75+
this.ex = ex;
76+
}
77+
78+
@Override
79+
public void run() {
80+
if (this.ex != null) {
81+
try {
82+
this.template.execute(() -> {
83+
throw this.ex;
84+
});
85+
}
86+
catch (RuntimeException | Error err) {
87+
if (err == this.ex) {
88+
logger.info("Expected exception thrown", err);
89+
}
90+
else {
91+
ex.printStackTrace();
92+
}
93+
}
94+
catch (Throwable th) {
95+
th.printStackTrace();
96+
}
97+
}
98+
else {
99+
for (int i = 0; i < NR_OF_ITERATIONS; i++) {
100+
try {
101+
this.template.execute(() -> null);
102+
}
103+
catch (Throwable th) {
104+
th.printStackTrace();
105+
break;
106+
}
107+
}
108+
}
109+
}
110+
}
111+
}

0 commit comments

Comments
 (0)