Skip to content

Commit 91c1b57

Browse files
authored
[Improvement-17715][Registry] The registry center's lock mechanism supports reentrant acquisition. (#17744)
1 parent 4505d4b commit 91c1b57

File tree

4 files changed

+107
-38
lines changed
  • dolphinscheduler-registry/dolphinscheduler-registry-plugins
    • dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd
    • dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry
    • dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server
    • dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper

4 files changed

+107
-38
lines changed

dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-etcd/src/main/java/org/apache/dolphinscheduler/plugin/registry/etcd/EtcdRegistry.java

Lines changed: 57 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import java.util.concurrent.ExecutionException;
3838
import java.util.concurrent.TimeUnit;
3939
import java.util.concurrent.TimeoutException;
40+
import java.util.concurrent.atomic.AtomicInteger;
4041
import java.util.stream.Collectors;
4142

4243
import javax.net.ssl.SSLException;
@@ -81,7 +82,7 @@ public class EtcdRegistry implements Registry {
8182
public static final String FOLDER_SEPARATOR = "/";
8283
// save the lock info for thread
8384
// key:lockKey Value:leaseId
84-
private static final ThreadLocal<Map<String, Long>> threadLocalLockMap = new ThreadLocal<>();
85+
private static final ThreadLocal<Map<String, LockEntry>> threadLocalLockMap = new ThreadLocal<>();
8586

8687
private final Map<String, Watch.Watcher> watcherMap = new ConcurrentHashMap<>();
8788

@@ -297,13 +298,9 @@ public boolean exists(String key) {
297298
* get the lock with a lease
298299
*/
299300
@Override
300-
public boolean acquireLock(String key) {
301-
Map<String, Long> leaseIdMap = threadLocalLockMap.get();
302-
if (null == leaseIdMap) {
303-
leaseIdMap = new HashMap<>();
304-
threadLocalLockMap.set(leaseIdMap);
305-
}
306-
if (leaseIdMap.containsKey(key)) {
301+
public boolean acquireLock(String lockKey) {
302+
Map<String, LockEntry> threadHeldLocks = getThreadHeldLocks();
303+
if (acquireBasedOnThreadHeldLocks(lockKey, threadHeldLocks)) {
307304
return true;
308305
}
309306

@@ -315,27 +312,41 @@ public boolean acquireLock(String key) {
315312
// keep the lease
316313
client.getLeaseClient().keepAlive(leaseId, Observers.observer(response -> {
317314
}));
318-
lockClient.lock(byteSequence(key), leaseId).get();
315+
lockClient.lock(byteSequence(lockKey), leaseId).get();
319316

320317
// save the leaseId for release Lock
321-
leaseIdMap.put(key, leaseId);
318+
threadHeldLocks.put(lockKey, new LockEntry(leaseId));
322319
return true;
323320
} catch (InterruptedException e) {
324321
Thread.currentThread().interrupt();
325322
throw new RegistryException("etcd get lock error", e);
326323
} catch (Exception e) {
327-
throw new RegistryException("etcd get lock error, lockKey: " + key, e);
324+
throw new RegistryException("etcd get lock error, lockKey: " + lockKey, e);
325+
}
326+
}
327+
328+
private static boolean acquireBasedOnThreadHeldLocks(String lockKey, Map<String, LockEntry> threadHeldLocks) {
329+
LockEntry lockEntry = threadHeldLocks.get(lockKey);
330+
if (lockEntry != null) {
331+
lockEntry.lockCount.incrementAndGet();
332+
return true;
328333
}
334+
return false;
335+
}
336+
337+
private static Map<String, LockEntry> getThreadHeldLocks() {
338+
Map<String, LockEntry> lockEntryMap = threadLocalLockMap.get();
339+
if (null == lockEntryMap) {
340+
lockEntryMap = new HashMap<>();
341+
threadLocalLockMap.set(lockEntryMap);
342+
}
343+
return lockEntryMap;
329344
}
330345

331346
@Override
332347
public boolean acquireLock(String key, long timeout) {
333-
Map<String, Long> leaseIdMap = threadLocalLockMap.get();
334-
if (null == leaseIdMap) {
335-
leaseIdMap = new HashMap<>();
336-
threadLocalLockMap.set(leaseIdMap);
337-
}
338-
if (leaseIdMap.containsKey(key)) {
348+
Map<String, LockEntry> threadHeldLocks = getThreadHeldLocks();
349+
if (acquireBasedOnThreadHeldLocks(key, threadHeldLocks)) {
339350
return true;
340351
}
341352

@@ -350,7 +361,7 @@ public boolean acquireLock(String key, long timeout) {
350361
}));
351362

352363
// save the leaseId for release Lock
353-
leaseIdMap.put(key, leaseId);
364+
threadHeldLocks.put(key, new LockEntry(leaseId));
354365
return true;
355366
} catch (TimeoutException timeoutException) {
356367
log.debug("Acquire lock: {} in {}/ms timeout", key, timeout);
@@ -369,10 +380,24 @@ public boolean acquireLock(String key, long timeout) {
369380
@Override
370381
public boolean releaseLock(String key) {
371382
try {
372-
Long leaseId = threadLocalLockMap.get().get(key);
373-
client.getLeaseClient().revoke(leaseId);
374-
threadLocalLockMap.get().remove(key);
375-
if (threadLocalLockMap.get().isEmpty()) {
383+
Map<String, LockEntry> lockEntryMap = threadLocalLockMap.get();
384+
if (lockEntryMap == null) {
385+
return true;
386+
}
387+
LockEntry lockEntry = lockEntryMap.get(key);
388+
if (lockEntry == null) {
389+
return true;
390+
}
391+
int newLockCount = lockEntry.lockCount.decrementAndGet();
392+
if (newLockCount > 0) {
393+
return true;
394+
}
395+
if (newLockCount < 0) {
396+
throw new IllegalMonitorStateException("Etcd lock count has gone negative for lock: " + key);
397+
}
398+
client.getLeaseClient().revoke(lockEntry.leaseId);
399+
lockEntryMap.remove(key);
400+
if (lockEntryMap.isEmpty()) {
376401
threadLocalLockMap.remove();
377402
}
378403
} catch (Exception e) {
@@ -418,4 +443,14 @@ private Event toEvent(final WatchEvent watchEvent, final String watchedPath) {
418443
.build();
419444
}
420445

446+
private static class LockEntry {
447+
448+
final Long leaseId;
449+
final AtomicInteger lockCount = new AtomicInteger(1);
450+
451+
private LockEntry(Long leaseId) {
452+
this.leaseId = leaseId;
453+
}
454+
}
455+
421456
}

dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-it/src/test/java/org/apache/dolphinscheduler/plugin/registry/RegistryTestCase.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,29 @@ public void testReleaseLock() {
258258
assertThat(acquireResult.get()).isTrue();
259259
}
260260

261+
@SneakyThrows
262+
@Test
263+
public void testReentrantLock() {
264+
registry.start();
265+
String lockKey = "/lock" + System.nanoTime();
266+
assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
267+
assertThat(registry.acquireLock(lockKey, 3000)).isTrue();
268+
269+
CompletableFuture<Boolean> acquireResult =
270+
CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
271+
assertThat(acquireResult.get()).isFalse();
272+
273+
assertThat(registry.releaseLock(lockKey)).isTrue();
274+
275+
acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
276+
assertThat(acquireResult.get()).isFalse();
277+
278+
assertThat(registry.releaseLock(lockKey)).isTrue();
279+
280+
acquireResult = CompletableFuture.supplyAsync(() -> registry.acquireLock(lockKey, 3000));
281+
assertThat(acquireResult.get()).isTrue();
282+
}
283+
261284
public abstract R createRegistry();
262285

263286
}

dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-jdbc/src/main/java/org/apache/dolphinscheduler/plugin/registry/jdbc/server/JdbcRegistryLockManager.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,9 @@
2424
import org.apache.dolphinscheduler.plugin.registry.jdbc.repository.JdbcRegistryLockRepository;
2525

2626
import java.util.Date;
27-
import java.util.HashMap;
2827
import java.util.Map;
28+
import java.util.concurrent.ConcurrentHashMap;
29+
import java.util.concurrent.atomic.AtomicInteger;
2930

3031
import lombok.AllArgsConstructor;
3132
import lombok.Builder;
@@ -42,7 +43,7 @@ public class JdbcRegistryLockManager implements IJdbcRegistryLockManager {
4243
private final JdbcRegistryLockRepository jdbcRegistryLockRepository;
4344

4445
// lockKey -> LockEntry
45-
private final Map<String, LockEntry> jdbcRegistryLockHolderMap = new HashMap<>();
46+
private final Map<String, LockEntry> jdbcRegistryLockHolderMap = new ConcurrentHashMap<>();
4647

4748
public JdbcRegistryLockManager(JdbcRegistryProperties jdbcRegistryProperties,
4849
JdbcRegistryLockRepository jdbcRegistryLockRepository) {
@@ -54,8 +55,7 @@ public JdbcRegistryLockManager(JdbcRegistryProperties jdbcRegistryProperties,
5455
public void acquireJdbcRegistryLock(Long clientId, String lockKey) {
5556
String lockOwner = LockUtils.getLockOwner();
5657
while (true) {
57-
LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
58-
if (lockEntry != null && lockOwner.equals(lockEntry.getLockOwner())) {
58+
if (tryReenterLock(lockKey, lockOwner)) {
5959
return;
6060
}
6161
JdbcRegistryLockDTO jdbcRegistryLock = JdbcRegistryLockDTO.builder()
@@ -85,13 +85,21 @@ public void acquireJdbcRegistryLock(Long clientId, String lockKey) {
8585
}
8686
}
8787

88+
private boolean tryReenterLock(String lockKey, String lockAcquirer) {
89+
LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
90+
if (lockEntry != null && lockAcquirer.equals(lockEntry.getLockOwner())) {
91+
lockEntry.lockCount.incrementAndGet();
92+
return true;
93+
}
94+
return false;
95+
}
96+
8897
@Override
8998
public boolean acquireJdbcRegistryLock(Long clientId, String lockKey, long timeout) {
9099
String lockOwner = LockUtils.getLockOwner();
91100
long start = System.currentTimeMillis();
92101
while (System.currentTimeMillis() - start <= timeout) {
93-
LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
94-
if (lockEntry != null && lockOwner.equals(lockEntry.getLockOwner())) {
102+
if (tryReenterLock(lockKey, lockOwner)) {
95103
return true;
96104
}
97105
JdbcRegistryLockDTO jdbcRegistryLock = JdbcRegistryLockDTO.builder()
@@ -124,14 +132,22 @@ public boolean acquireJdbcRegistryLock(Long clientId, String lockKey, long timeo
124132

125133
@Override
126134
public void releaseJdbcRegistryLock(Long clientId, String lockKey) {
135+
String lockOwner = LockUtils.getLockOwner();
127136
LockEntry lockEntry = jdbcRegistryLockHolderMap.get(lockKey);
128-
if (lockEntry == null) {
137+
if (lockEntry == null || !lockOwner.equals(lockEntry.getLockOwner())) {
129138
return;
130139
}
131140
if (!clientId.equals(lockEntry.getJdbcRegistryLock().getClientId())) {
132141
throw new UnsupportedOperationException(
133142
"The client " + clientId + " is not the lock owner of the lock: " + lockKey);
134143
}
144+
int newLockCount = lockEntry.lockCount.decrementAndGet();
145+
if (newLockCount > 0) {
146+
return;
147+
}
148+
if (newLockCount < 0) {
149+
throw new IllegalMonitorStateException("Jdbc lock count has gone negative for lock: " + lockKey);
150+
}
135151
jdbcRegistryLockRepository.deleteById(lockEntry.getJdbcRegistryLock().getId());
136152
jdbcRegistryLockHolderMap.remove(lockKey);
137153
}
@@ -144,6 +160,7 @@ public static class LockEntry {
144160

145161
private String lockKey;
146162
private String lockOwner;
163+
final AtomicInteger lockCount = new AtomicInteger(1);
147164
private JdbcRegistryLockDTO jdbcRegistryLock;
148165
}
149166
}

dolphinscheduler-registry/dolphinscheduler-registry-plugins/dolphinscheduler-registry-zookeeper/src/main/java/org/apache/dolphinscheduler/plugin/registry/zookeeper/ZookeeperRegistry.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -218,12 +218,6 @@ public boolean acquireLock(String key) {
218218
try {
219219
interProcessMutex =
220220
Optional.ofNullable(processMutexMap.get(key)).orElse(new InterProcessMutex(client, key));
221-
if (interProcessMutex.isAcquiredInThisProcess()) {
222-
// Since etcd/jdbc cannot implement a reentrant lock, we need to check if the lock is already acquired
223-
// If it is already acquired, return true directly
224-
// This means you only need to release once when you acquire multiple times
225-
return true;
226-
}
227221
interProcessMutex.acquire();
228222
processMutexMap.put(key, interProcessMutex);
229223
return true;
@@ -250,9 +244,6 @@ public boolean acquireLock(String key, long timeout) {
250244
try {
251245
interProcessMutex =
252246
Optional.ofNullable(processMutexMap.get(key)).orElse(new InterProcessMutex(client, key));
253-
if (interProcessMutex.isAcquiredInThisProcess()) {
254-
return true;
255-
}
256247
if (interProcessMutex.acquire(timeout, MILLISECONDS)) {
257248
processMutexMap.put(key, interProcessMutex);
258249
return true;
@@ -282,6 +273,9 @@ public boolean releaseLock(String key) {
282273
}
283274
try {
284275
interProcessMutex.release();
276+
if (interProcessMutex.isOwnedByCurrentThread()) {
277+
return true;
278+
}
285279
processMutexMap.remove(key);
286280
if (processMutexMap.isEmpty()) {
287281
threadLocalLockMap.remove();

0 commit comments

Comments
 (0)