Skip to content

Commit db61102

Browse files
authored
GH-3672: Clean up Jdbc & ZK LockRegistry caches
Fixes #3672 * Clean up `JdbcLockRegistry`, `ZookeeperLockRegistry` cache automatically * setCapacity(int capacity) to cacheCapacity(int capacity) * field rename `capacity`to `cacheCapacity`, add static
1 parent 5452a6f commit db61102

File tree

9 files changed

+447
-42
lines changed

9 files changed

+447
-42
lines changed

spring-integration-jdbc/src/main/java/org/springframework/integration/jdbc/lock/JdbcLockRegistry.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,9 @@
1717
package org.springframework.integration.jdbc.lock;
1818

1919
import java.time.Duration;
20-
import java.util.Iterator;
20+
import java.util.LinkedHashMap;
2121
import java.util.Map;
2222
import java.util.Map.Entry;
23-
import java.util.concurrent.ConcurrentHashMap;
2423
import java.util.concurrent.TimeUnit;
2524
import java.util.concurrent.locks.Condition;
2625
import java.util.concurrent.locks.Lock;
@@ -54,19 +53,32 @@
5453
* @author Stefan Vassilev
5554
* @author Olivier Hubaut
5655
* @author Fran Aranda
56+
* @author Unseok Kim
5757
*
5858
* @since 4.3
5959
*/
6060
public class JdbcLockRegistry implements ExpirableLockRegistry, RenewableLockRegistry {
6161

6262
private static final int DEFAULT_IDLE = 100;
6363

64-
private final Map<String, JdbcLock> locks = new ConcurrentHashMap<>();
64+
private static final int DEFAULT_CAPACITY = 100_000;
65+
66+
private final Map<String, JdbcLock> locks =
67+
new LinkedHashMap<String, JdbcLock>(16, 0.75F, true) {
68+
69+
@Override
70+
protected boolean removeEldestEntry(Entry<String, JdbcLock> eldest) {
71+
return size() > JdbcLockRegistry.this.cacheCapacity;
72+
}
73+
74+
};
6575

6676
private final LockRepository client;
6777

6878
private Duration idleBetweenTries = Duration.ofMillis(DEFAULT_IDLE);
6979

80+
private int cacheCapacity = DEFAULT_CAPACITY;
81+
7082
public JdbcLockRegistry(LockRepository client) {
7183
this.client = client;
7284
}
@@ -82,11 +94,22 @@ public void setIdleBetweenTries(Duration idleBetweenTries) {
8294
this.idleBetweenTries = idleBetweenTries;
8395
}
8496

97+
/**
98+
* Set the capacity of cached locks.
99+
* @param cacheCapacity The capacity of cached lock, (default 100_000).
100+
* @since 5.5.6
101+
*/
102+
public void setCacheCapacity(int cacheCapacity) {
103+
this.cacheCapacity = cacheCapacity;
104+
}
105+
85106
@Override
86107
public Lock obtain(Object lockKey) {
87108
Assert.isInstanceOf(String.class, lockKey);
88109
String path = pathFor((String) lockKey);
89-
return this.locks.computeIfAbsent(path, (key) -> new JdbcLock(this.client, this.idleBetweenTries, key));
110+
synchronized (this.locks) {
111+
return this.locks.computeIfAbsent(path, key -> new JdbcLock(this.client, this.idleBetweenTries, key));
112+
}
90113
}
91114

92115
private String pathFor(String input) {
@@ -95,22 +118,24 @@ private String pathFor(String input) {
95118

96119
@Override
97120
public void expireUnusedOlderThan(long age) {
98-
Iterator<Entry<String, JdbcLock>> iterator = this.locks.entrySet().iterator();
99121
long now = System.currentTimeMillis();
100-
while (iterator.hasNext()) {
101-
Entry<String, JdbcLock> entry = iterator.next();
102-
JdbcLock lock = entry.getValue();
103-
if (now - lock.getLastUsed() > age && !lock.isAcquiredInThisProcess()) {
104-
iterator.remove();
105-
}
122+
synchronized (this.locks) {
123+
this.locks.entrySet()
124+
.removeIf(entry -> {
125+
JdbcLock lock = entry.getValue();
126+
return now - lock.getLastUsed() > age && !lock.isAcquiredInThisProcess();
127+
});
106128
}
107129
}
108130

109131
@Override
110132
public void renewLock(Object lockKey) {
111133
Assert.isInstanceOf(String.class, lockKey);
112134
String path = pathFor((String) lockKey);
113-
JdbcLock jdbcLock = this.locks.get(path);
135+
JdbcLock jdbcLock;
136+
synchronized (this.locks) {
137+
jdbcLock = this.locks.get(path);
138+
}
114139
if (jdbcLock == null) {
115140
throw new IllegalStateException("Could not found mutex at " + path);
116141
}

spring-integration-jdbc/src/test/java/org/springframework/integration/jdbc/lock/JdbcLockRegistryTests.java

Lines changed: 174 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2020 the original author or authors.
2+
* Copyright 2016-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,8 +20,12 @@
2020
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
2121

2222
import java.util.Map;
23+
import java.util.Queue;
2324
import java.util.concurrent.CountDownLatch;
25+
import java.util.concurrent.ExecutorService;
26+
import java.util.concurrent.Executors;
2427
import java.util.concurrent.Future;
28+
import java.util.concurrent.LinkedBlockingQueue;
2529
import java.util.concurrent.TimeUnit;
2630
import java.util.concurrent.atomic.AtomicBoolean;
2731
import java.util.concurrent.locks.Lock;
@@ -35,6 +39,7 @@
3539
import org.springframework.core.task.AsyncTaskExecutor;
3640
import org.springframework.core.task.SimpleAsyncTaskExecutor;
3741
import org.springframework.integration.test.util.TestUtils;
42+
import org.springframework.integration.util.UUIDConverter;
3843
import org.springframework.test.annotation.DirtiesContext;
3944
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
4045

@@ -43,6 +48,7 @@
4348
* @author Artem Bilan
4449
* @author Stefan Vassilev
4550
* @author Alexandre Strubel
51+
* @author Unseok Kim
4652
*
4753
* @since 4.3
4854
*/
@@ -312,4 +318,171 @@ public void testLockRenewLockNotOwned() {
312318
.isThrownBy(() -> registry.renewLock("foo"));
313319
}
314320

321+
@Test
322+
public void concurrentObtainCapacityTest() throws InterruptedException {
323+
final int KEY_CNT = 500;
324+
final int CAPACITY_CNT = 179;
325+
final int THREAD_CNT = 4;
326+
327+
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_CNT);
328+
registry.setCacheCapacity(CAPACITY_CNT);
329+
final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_CNT);
330+
331+
for (int i = 0; i < KEY_CNT; i++) {
332+
int finalI = i;
333+
executorService.submit(() -> {
334+
countDownLatch.countDown();
335+
try {
336+
countDownLatch.await();
337+
}
338+
catch (InterruptedException e) {
339+
Thread.currentThread().interrupt();
340+
}
341+
String keyId = "foo:" + finalI;
342+
Lock obtain = registry.obtain(keyId);
343+
obtain.lock();
344+
obtain.unlock();
345+
});
346+
}
347+
executorService.shutdown();
348+
executorService.awaitTermination(5, TimeUnit.SECONDS);
349+
350+
//capacity limit test
351+
assertThat(getRegistryLocks(registry)).hasSize(CAPACITY_CNT);
352+
353+
354+
registry.expireUnusedOlderThan(-1000);
355+
assertThat(getRegistryLocks(registry)).isEmpty();
356+
}
357+
358+
@Test
359+
public void concurrentObtainRemoveOrderTest() throws InterruptedException {
360+
final int THREAD_CNT = 2;
361+
final int DUMMY_LOCK_CNT = 3;
362+
363+
final int CAPACITY_CNT = THREAD_CNT;
364+
365+
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_CNT);
366+
registry.setCacheCapacity(CAPACITY_CNT);
367+
final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_CNT);
368+
final Queue<String> remainLockCheckQueue = new LinkedBlockingQueue<>();
369+
370+
//Removed due to capcity limit
371+
for (int i = 0; i < DUMMY_LOCK_CNT; i++) {
372+
Lock obtainLock0 = registry.obtain("foo:" + i);
373+
obtainLock0.lock();
374+
obtainLock0.unlock();
375+
}
376+
377+
for (int i = DUMMY_LOCK_CNT; i < THREAD_CNT + DUMMY_LOCK_CNT; i++) {
378+
int finalI = i;
379+
executorService.submit(() -> {
380+
countDownLatch.countDown();
381+
try {
382+
countDownLatch.await();
383+
}
384+
catch (InterruptedException e) {
385+
Thread.currentThread().interrupt();
386+
}
387+
String keyId = "foo:" + finalI;
388+
remainLockCheckQueue.offer(toUUID(keyId));
389+
Lock obtain = registry.obtain(keyId);
390+
obtain.lock();
391+
obtain.unlock();
392+
});
393+
}
394+
395+
executorService.shutdown();
396+
executorService.awaitTermination(5, TimeUnit.SECONDS);
397+
398+
assertThat(getRegistryLocks(registry)).containsKeys(
399+
remainLockCheckQueue.toArray(new String[remainLockCheckQueue.size()]));
400+
}
401+
402+
@Test
403+
public void concurrentObtainAccessRemoveOrderTest() throws InterruptedException {
404+
final int THREAD_CNT = 2;
405+
final int DUMMY_LOCK_CNT = 3;
406+
407+
final int CAPACITY_CNT = THREAD_CNT + 1;
408+
final String REMAIN_DUMMY_LOCK_KEY = "foo:1";
409+
410+
final CountDownLatch countDownLatch = new CountDownLatch(THREAD_CNT);
411+
registry.setCacheCapacity(CAPACITY_CNT);
412+
final ExecutorService executorService = Executors.newFixedThreadPool(THREAD_CNT);
413+
final Queue<String> remainLockCheckQueue = new LinkedBlockingQueue<>();
414+
415+
//Removed due to capcity limit
416+
for (int i = 0; i < DUMMY_LOCK_CNT; i++) {
417+
Lock obtainLock0 = registry.obtain("foo:" + i);
418+
obtainLock0.lock();
419+
obtainLock0.unlock();
420+
}
421+
422+
Lock obtainLock0 = registry.obtain(REMAIN_DUMMY_LOCK_KEY);
423+
obtainLock0.lock();
424+
obtainLock0.unlock();
425+
remainLockCheckQueue.offer(toUUID(REMAIN_DUMMY_LOCK_KEY));
426+
427+
for (int i = DUMMY_LOCK_CNT; i < THREAD_CNT + DUMMY_LOCK_CNT; i++) {
428+
int finalI = i;
429+
executorService.submit(() -> {
430+
countDownLatch.countDown();
431+
try {
432+
countDownLatch.await();
433+
}
434+
catch (InterruptedException e) {
435+
Thread.currentThread().interrupt();
436+
}
437+
String keyId = "foo:" + finalI;
438+
remainLockCheckQueue.offer(toUUID(keyId));
439+
Lock obtain = registry.obtain(keyId);
440+
obtain.lock();
441+
obtain.unlock();
442+
});
443+
}
444+
445+
executorService.shutdown();
446+
executorService.awaitTermination(5, TimeUnit.SECONDS);
447+
448+
assertThat(getRegistryLocks(registry)).containsKeys(
449+
remainLockCheckQueue.toArray(new String[remainLockCheckQueue.size()]));
450+
}
451+
452+
@Test
453+
public void setCapacityTest() {
454+
final int CAPACITY_CNT = 4;
455+
registry.setCacheCapacity(CAPACITY_CNT);
456+
457+
registry.obtain("foo:1");
458+
registry.obtain("foo:2");
459+
registry.obtain("foo:3");
460+
461+
//capacity 4->3
462+
registry.setCacheCapacity(CAPACITY_CNT - 1);
463+
464+
registry.obtain("foo:4");
465+
466+
assertThat(getRegistryLocks(registry)).hasSize(3);
467+
assertThat(getRegistryLocks(registry)).containsKeys(toUUID("foo:2"),
468+
toUUID("foo:3"),
469+
toUUID("foo:4"));
470+
471+
//capacity 3->4
472+
registry.setCacheCapacity(CAPACITY_CNT);
473+
registry.obtain("foo:5");
474+
assertThat(getRegistryLocks(registry)).hasSize(4);
475+
assertThat(getRegistryLocks(registry)).containsKeys(toUUID("foo:3"),
476+
toUUID("foo:4"),
477+
toUUID("foo:5"));
478+
}
479+
480+
@SuppressWarnings("unchecked")
481+
private static Map<String, Lock> getRegistryLocks(JdbcLockRegistry registry) {
482+
return TestUtils.getPropertyValue(registry, "locks", Map.class);
483+
}
484+
485+
private static String toUUID(String key) {
486+
return UUIDConverter.getUUID(key).toString();
487+
}
315488
}

spring-integration-redis/src/main/java/org/springframework/integration/redis/util/RedisLockRegistry.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public final class RedisLockRegistry implements ExpirableLockRegistry, Disposabl
9999

100100
@Override
101101
protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
102-
return size() > RedisLockRegistry.this.capacity;
102+
return size() > RedisLockRegistry.this.cacheCapacity;
103103
}
104104

105105
};
@@ -114,7 +114,7 @@ protected boolean removeEldestEntry(Entry<String, RedisLock> eldest) {
114114

115115
private final long expireAfter;
116116

117-
private int capacity = DEFAULT_CAPACITY;
117+
private int cacheCapacity = DEFAULT_CAPACITY;
118118

119119
/**
120120
* An {@link ExecutorService} to call {@link StringRedisTemplate#delete} in
@@ -168,11 +168,11 @@ public void setExecutor(Executor executor) {
168168

169169
/**
170170
* Set the capacity of cached locks.
171-
* @param capacity The capacity of cached lock, (default 100_000).
171+
* @param cacheCapacity The capacity of cached lock, (default 100_000).
172172
* @since 5.5.6
173173
*/
174-
public void setCapacity(int capacity) {
175-
this.capacity = capacity;
174+
public void setCacheCapacity(int cacheCapacity) {
175+
this.cacheCapacity = cacheCapacity;
176176
}
177177

178178
@Override

0 commit comments

Comments
 (0)