Skip to content

Commit 6b8d37b

Browse files
GH-2971: Add LockRegistry.executeLocked() API (#8729)
* GH-2971: Add `LockRegistry.executeLocked()` API Fixes #2971 * Following best practice and well-known patterns with `Jdbc`, `Rest` or `Jms` templates, introduce `default` methods into `LockRegistry` interface to make it easier to perform tasks when within a lock. * Since all the required logic is now covered by those `LockRegistry.executeLocked()` methods, there is no need in the dedicated abstract `WhileLockedProcessor` class. Deprecated it for removal in the next version * Use a new `LockRegistry.executeLocked()` API in the `FileWritingMessageHandler` instead of just deprecated `WhileLockedProcessor` * To satisfy Java limitations for checked lambdas, introduce `CheckedCallable` and `CheckedRunnable` utilities similar to interfaces in the `io.micrometer.observation.Observation` * Change existing `CheckedFunction` to expose extra generic argument for `Throwable` * Add dedicated chapter for distributed lock into docs * Fix some links and typos in the docs * * Fix Javadoc for `CheckedFunction` * Fix language in docs Co-authored-by: Gary Russell <[email protected]> --------- Co-authored-by: Gary Russell <[email protected]>
1 parent 863c525 commit 6b8d37b

File tree

16 files changed

+420
-108
lines changed

16 files changed

+420
-108
lines changed

spring-integration-core/src/main/java/org/springframework/integration/support/locks/LockRegistry.java

Lines changed: 97 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,24 +16,119 @@
1616

1717
package org.springframework.integration.support.locks;
1818

19+
import java.time.Duration;
20+
import java.util.concurrent.TimeUnit;
21+
import java.util.concurrent.TimeoutException;
1922
import java.util.concurrent.locks.Lock;
2023

24+
import org.springframework.integration.util.CheckedCallable;
25+
import org.springframework.integration.util.CheckedRunnable;
26+
2127
/**
2228
* Strategy for maintaining a registry of shared locks.
2329
*
2430
* @author Oleg Zhurakousky
2531
* @author Gary Russell
32+
* @author Artem Bilan
2633
*
2734
* @since 2.1.1
2835
*/
2936
@FunctionalInterface
3037
public interface LockRegistry {
3138

3239
/**
33-
* Obtains the lock associated with the parameter object.
40+
* Obtain the lock associated with the parameter object.
3441
* @param lockKey The object with which the lock is associated.
3542
* @return The associated lock.
3643
*/
3744
Lock obtain(Object lockKey);
3845

46+
/**
47+
* Perform the provided task when the lock for the key is locked.
48+
* @param lockKey the lock key to use
49+
* @param runnable the {@link CheckedRunnable} to execute within a lock
50+
* @param <E> type of exception runnable throws
51+
* @throws InterruptedException from a lock operation
52+
* @since 6.2
53+
*/
54+
default <E extends Throwable> void executeLocked(Object lockKey, CheckedRunnable<E> runnable)
55+
throws E, InterruptedException {
56+
57+
executeLocked(lockKey,
58+
() -> {
59+
runnable.run();
60+
return null;
61+
});
62+
}
63+
64+
/**
65+
* Perform the provided task when the lock for the key is locked.
66+
* @param lockKey the lock key to use
67+
* @param callable the {@link CheckedCallable} to execute within a lock
68+
* @param <T> type of callable result
69+
* @param <E> type of exception callable throws
70+
* @return the result of callable
71+
* @throws InterruptedException from a lock operation
72+
* @since 6.2
73+
*/
74+
default <T, E extends Throwable> T executeLocked(Object lockKey, CheckedCallable<T, E> callable)
75+
throws E, InterruptedException {
76+
77+
Lock lock = obtain(lockKey);
78+
lock.lockInterruptibly();
79+
try {
80+
return callable.call();
81+
}
82+
finally {
83+
lock.unlock();
84+
}
85+
}
86+
87+
/**
88+
* Perform the provided task when the lock for the key is locked.
89+
* @param lockKey the lock key to use
90+
* @param waitLockDuration the {@link Duration} for {@link Lock#tryLock(long, TimeUnit)}
91+
* @param runnable the {@link CheckedRunnable} to execute within a lock
92+
* @param <E> type of exception runnable throws
93+
* @throws InterruptedException from a lock operation
94+
* @throws TimeoutException when {@link Lock#tryLock(long, TimeUnit)} has elapsed
95+
* @since 6.2
96+
*/
97+
default <E extends Throwable> void executeLocked(Object lockKey, Duration waitLockDuration,
98+
CheckedRunnable<E> runnable) throws E, InterruptedException, TimeoutException {
99+
100+
executeLocked(lockKey, waitLockDuration,
101+
() -> {
102+
runnable.run();
103+
return null;
104+
});
105+
}
106+
107+
/**
108+
* Perform the provided task when the lock for the key is locked.
109+
* @param lockKey the lock key to use
110+
* @param waitLockDuration the {@link Duration} for {@link Lock#tryLock(long, TimeUnit)}
111+
* @param callable the {@link CheckedCallable} to execute within a lock
112+
* @param <E> type of exception callable throws
113+
* @throws InterruptedException from a lock operation
114+
* @throws TimeoutException when {@link Lock#tryLock(long, TimeUnit)} has elapsed
115+
* @since 6.2
116+
*/
117+
default <T, E extends Throwable> T executeLocked(Object lockKey, Duration waitLockDuration,
118+
CheckedCallable<T, E> callable) throws E, InterruptedException, TimeoutException {
119+
120+
Lock lock = obtain(lockKey);
121+
if (!lock.tryLock(waitLockDuration.toMillis(), TimeUnit.MILLISECONDS)) {
122+
throw new TimeoutException(
123+
"The lock [%s] was not acquired in time: %s".formatted(lockKey, waitLockDuration));
124+
}
125+
126+
try {
127+
return callable.call();
128+
}
129+
finally {
130+
lock.unlock();
131+
}
132+
}
133+
39134
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.util;
18+
19+
/**
20+
* A Callable-like interface which allows throwing any Throwable.
21+
* Checked exceptions are wrapped in an IllegalStateException.
22+
*
23+
* @param <T> the output type.
24+
* @param <E> the throwable type.
25+
*
26+
* @author Artem Bilan
27+
*
28+
* @since 6.2
29+
*/
30+
@FunctionalInterface
31+
public interface CheckedCallable<T, E extends Throwable> {
32+
33+
T call() throws E;
34+
35+
default Runnable unchecked() {
36+
return () -> {
37+
try {
38+
call();
39+
}
40+
catch (Throwable t) { // NOSONAR
41+
if (t instanceof RuntimeException runtimeException) { // NOSONAR
42+
throw runtimeException;
43+
}
44+
else if (t instanceof Error error) { // NOSONAR
45+
throw error;
46+
}
47+
else {
48+
throw new IllegalStateException(t);
49+
}
50+
}
51+
};
52+
}
53+
54+
}

spring-integration-core/src/main/java/org/springframework/integration/util/CheckedFunction.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@
2323
*
2424
* @param <T> the input type.
2525
* @param <R> the output type.
26+
* @param <E> the throwable type.
2627
*
2728
* @author Artem Bilan
2829
*
2930
* @since 6.1
3031
*/
3132
@FunctionalInterface
32-
public interface CheckedFunction<T, R> {
33+
public interface CheckedFunction<T, R, E extends Throwable> {
3334

34-
R apply(T t) throws Throwable; // NOSONAR
35+
R apply(T t) throws E;
3536

3637
default Function<T, R> unchecked() {
3738
return t1 -> {
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2023 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.util;
18+
19+
/**
20+
* A Runnable-like interface which allows throwing any Throwable.
21+
* Checked exceptions are wrapped in an IllegalStateException.
22+
*
23+
* @param <E> the throwable type.
24+
*
25+
* @author Artem Bilan
26+
*
27+
* @since 6.2
28+
*/
29+
@FunctionalInterface
30+
public interface CheckedRunnable<E extends Throwable> {
31+
32+
void run() throws E;
33+
34+
default Runnable unchecked() {
35+
return () -> {
36+
try {
37+
run();
38+
}
39+
catch (Throwable t) { // NOSONAR
40+
if (t instanceof RuntimeException runtimeException) { // NOSONAR
41+
throw runtimeException;
42+
}
43+
else if (t instanceof Error error) { // NOSONAR
44+
throw error;
45+
}
46+
else {
47+
throw new IllegalStateException(t);
48+
}
49+
}
50+
};
51+
}
52+
53+
}

spring-integration-core/src/main/java/org/springframework/integration/util/WhileLockedProcessor.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2022 the original author or authors.
2+
* Copyright 2002-2023 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -30,10 +30,13 @@
3030
* then call {@link #doWhileLocked()}.
3131
*
3232
* @author Oleg Zhurakousky
33+
* @author Artem Bilan
3334
*
3435
* @since 2.2
3536
*
37+
* @deprecated since 6.2 in favor of {@link LockRegistry#executeLocked}.
3638
*/
39+
@Deprecated(since = "6.2", forRemoval = true)
3740
public abstract class WhileLockedProcessor {
3841

3942
private final Object key;

0 commit comments

Comments
 (0)