From 5fc17b9c9791c5b6084662097dc3f3cee27830b3 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 1 Sep 2023 12:30:48 +0800 Subject: [PATCH 01/11] fix ConcurrentLongLongPairHashMap. --- .../ConcurrentLongLongPairHashMap.java | 39 +++++++++++-------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java index 536eeb74488..04d5d3f1d16 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java @@ -309,6 +309,9 @@ public Map asMap() { // A section is a portion of the hash map that is covered by a single @SuppressWarnings("serial") private static final class Section extends StampedLock { + // Each item take up 4 continuous array space. + private static final int ITEM_SIZE = 4; + // Keys and values are stored interleaved in the table array private volatile long[] table; @@ -328,7 +331,7 @@ private static final class Section extends StampedLock { float expandFactor, float shrinkFactor) { this.capacity = alignToPowerOfTwo(capacity); this.initCapacity = this.capacity; - this.table = new long[4 * this.capacity]; + this.table = new long[ITEM_SIZE * this.capacity]; this.size = 0; this.usedBuckets = 0; this.autoShrink = autoShrink; @@ -344,7 +347,10 @@ private static final class Section extends StampedLock { LongPair get(long key1, long key2, int keyHash) { long stamp = tryOptimisticRead(); boolean acquiredLock = false; - int bucket = signSafeMod(keyHash, capacity); + // add local variable here, so OutOfBound won't happen + long[] table = this.table; + // calculate table.length / 4 as capacity to avoid rehash changing capacity + int bucket = signSafeMod(keyHash, table.length / ITEM_SIZE); try { while (true) { @@ -367,8 +373,9 @@ LongPair get(long key1, long key2, int keyHash) { if (!acquiredLock) { stamp = readLock(); acquiredLock = true; - - bucket = signSafeMod(keyHash, capacity); + // update local variable + table = this.table; + bucket = signSafeMod(keyHash, table.length / ITEM_SIZE); storedKey1 = table[bucket]; storedKey2 = table[bucket + 1]; storedValue1 = table[bucket + 2]; @@ -383,7 +390,7 @@ LongPair get(long key1, long key2, int keyHash) { } } - bucket = (bucket + 4) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } finally { if (acquiredLock) { @@ -435,7 +442,7 @@ boolean put(long key1, long key2, long value1, long value2, int keyHash, boolean } } - bucket = (bucket + 4) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } finally { if (usedBuckets > resizeThresholdUp) { @@ -476,7 +483,7 @@ private boolean remove(long key1, long key2, long value1, long value2, int keyHa return false; } - bucket = (bucket + 4) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } finally { @@ -502,7 +509,7 @@ private boolean remove(long key1, long key2, long value1, long value2, int keyHa } private void cleanBucket(int bucket) { - int nextInArray = (bucket + 4) & (table.length - 1); + int nextInArray = (bucket + ITEM_SIZE) & (table.length - 1); if (table[nextInArray] == EmptyKey) { table[bucket] = EmptyKey; table[bucket + 1] = EmptyKey; @@ -512,7 +519,7 @@ private void cleanBucket(int bucket) { // Cleanup all the buckets that were in `DeletedKey` state, // so that we can reduce unnecessary expansions - bucket = (bucket - 4) & (table.length - 1); + bucket = (bucket - ITEM_SIZE) & (table.length - 1); while (table[bucket] == DeletedKey) { table[bucket] = EmptyKey; table[bucket + 1] = EmptyKey; @@ -520,7 +527,7 @@ private void cleanBucket(int bucket) { table[bucket + 3] = ValueNotFound; --usedBuckets; - bucket = (bucket - 4) & (table.length - 1); + bucket = (bucket - ITEM_SIZE) & (table.length - 1); } } else { table[bucket] = DeletedKey; @@ -563,7 +570,7 @@ public void forEach(BiConsumerLongPair processor) { } // Go through all the buckets for this section - for (int bucket = 0; bucket < table.length; bucket += 4) { + for (int bucket = 0; bucket < table.length; bucket += ITEM_SIZE) { long storedKey1 = table[bucket]; long storedKey2 = table[bucket + 1]; long storedValue1 = table[bucket + 2]; @@ -592,11 +599,11 @@ public void forEach(BiConsumerLongPair processor) { } private void rehash(int newCapacity) { - long[] newTable = new long[4 * newCapacity]; + long[] newTable = new long[ITEM_SIZE * newCapacity]; Arrays.fill(newTable, EmptyKey); // Re-hash table - for (int i = 0; i < table.length; i += 4) { + for (int i = 0; i < table.length; i += ITEM_SIZE) { long storedKey1 = table[i]; long storedKey2 = table[i + 1]; long storedValue1 = table[i + 2]; @@ -616,7 +623,7 @@ private void rehash(int newCapacity) { } private void shrinkToInitCapacity() { - long[] newTable = new long[4 * initCapacity]; + long[] newTable = new long[ITEM_SIZE * initCapacity]; Arrays.fill(newTable, EmptyKey); table = newTable; @@ -630,7 +637,7 @@ private void shrinkToInitCapacity() { } private static void insertKeyValueNoLock(long[] table, int capacity, long key1, long key2, long value1, - long value2) { + long value2) { int bucket = signSafeMod(hash(key1, key2), capacity); while (true) { @@ -645,7 +652,7 @@ private static void insertKeyValueNoLock(long[] table, int capacity, long key1, return; } - bucket = (bucket + 4) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } } From 77bb555b7fb968f4c68c9066933b35683fb63883 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 1 Sep 2023 14:18:23 +0800 Subject: [PATCH 02/11] add test code. --- .../ConcurrentLongLongPairHashMapTest.java | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java index b13625fcc9b..3d2e1d9edc3 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java @@ -32,9 +32,12 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair; import org.junit.Test; @@ -175,6 +178,68 @@ public void testExpandAndShrink() { assertTrue(map.capacity() == 8); } + @Test + public void testConcurrentExpandAndShrinkAndGet() throws Throwable { + ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertEquals(map.capacity(), 4); + + ExecutorService executor = Executors.newCachedThreadPool(); + final int readThreads = 16; + final int writeThreads = 1; + final int n = 1_000; + CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads); + Future future = null; + AtomicReference ex = new AtomicReference<>(); + + for (int i = 0; i < readThreads; i++) { + executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + while (true) { + try { + map.get(1, 1); + } catch (Exception e) { + ex.set(e); + } + } + }); + } + + assertTrue(map.put(1, 1, 11, 11)); + future = executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int i = 0; i < n; i++) { + // expand hashmap + assertTrue(map.put(2, 2, 22, 22)); + assertTrue(map.put(3, 3, 33, 33)); + assertEquals(map.capacity(), 8); + + // shrink hashmap + assertTrue(map.remove(2, 2, 22, 22)); + assertTrue(map.remove(3, 3, 33, 33)); + assertEquals(map.capacity(), 4); + } + }); + + future.get(); + assertTrue(ex.get() == null); + // shut down pool + executor.shutdown(); + } + @Test public void testExpandShrinkAndClear() { ConcurrentLongLongPairHashMap map = ConcurrentLongLongPairHashMap.newBuilder() From bef3ee7dfc1a4352039facdfa0530828ba9b3b7c Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 1 Sep 2023 14:32:34 +0800 Subject: [PATCH 03/11] add test code. --- .../ConcurrentLongHashMapTest.java | 61 +++++++++++++++++ .../ConcurrentOpenHashMapTest.java | 64 ++++++++++++++++++ .../ConcurrentOpenHashSetTest.java | 65 +++++++++++++++++++ 3 files changed, 190 insertions(+) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java index f1372b28944..4f1db0e8644 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java @@ -41,6 +41,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongFunction; import org.junit.Test; @@ -187,6 +188,66 @@ public void testExpandAndShrink() { assertTrue(map.capacity() == 8); } + @Test + public void testConcurrentExpandAndShrinkAndGet() throws Throwable { + ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertEquals(map.capacity(), 4); + + ExecutorService executor = Executors.newCachedThreadPool(); + final int readThreads = 16; + final int writeThreads = 1; + final int n = 1_000; + CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads); + Future future = null; + AtomicReference ex = new AtomicReference<>(); + + for (int i = 0; i < readThreads; i++) { + executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + try { + map.get(1); + } catch (Exception e) { + ex.set(e); + } + }); + } + + assertNull(map.put(1,"v1")); + future = executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int i = 0; i < n; i++) { + // expand hashmap + assertNull(map.put(2, "v2")); + assertNull(map.put(3, "v3")); + assertEquals(map.capacity(), 8); + + // shrink hashmap + assertTrue(map.remove(2, "v2")); + assertTrue(map.remove(3, "v3")); + assertEquals(map.capacity(), 4); + } + }); + + future.get(); + assertTrue(ex.get() == null); + // shut down pool + executor.shutdown(); + } + @Test public void testExpandShrinkAndClear() { ConcurrentLongHashMap map = ConcurrentLongHashMap.newBuilder() diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java index 3ed17edb57a..c0c86bdf71b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java @@ -33,11 +33,13 @@ import java.util.List; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.BiPredicate; import java.util.function.Function; import org.junit.Test; @@ -180,6 +182,68 @@ public void testExpandAndShrink() { assertTrue(map.capacity() == 8); } + @Test + public void testConcurrentExpandAndShrinkAndGet() throws Throwable { + ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertEquals(map.capacity(), 4); + + ExecutorService executor = Executors.newCachedThreadPool(); + final int readThreads = 16; + final int writeThreads = 1; + final int n = 1_000; + CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads); + Future future = null; + AtomicReference ex = new AtomicReference<>(); + + for (int i = 0; i < readThreads; i++) { + executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + while (true) { + try { + map.get("k3"); + } catch (Exception e) { + ex.set(e); + } + } + }); + } + + assertNull(map.put("k1","v1")); + future = executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int i = 0; i < n; i++) { + // expand hashmap + assertNull(map.put("k2", "v2")); + assertNull(map.put("k3", "v3")); + assertEquals(map.capacity(), 8); + + // shrink hashmap + assertTrue(map.remove("k2", "v2")); + assertTrue(map.remove("k3", "v3")); + assertEquals(map.capacity(), 4); + } + }); + + future.get(); + assertTrue(ex.get() == null); + // shut down pool + executor.shutdown(); + } + @Test public void testExpandShrinkAndClear() { ConcurrentOpenHashMap map = ConcurrentOpenHashMap.newBuilder() diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java index 8875be2f579..dcbacd7ae09 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java @@ -30,9 +30,12 @@ import java.util.Collections; import java.util.List; import java.util.Random; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.Test; /** @@ -190,6 +193,68 @@ public void testExpandShrinkAndClear() { assertTrue(map.capacity() == initCapacity); } + @Test + public void testConcurrentExpandAndShrinkAndGet() throws Throwable { + ConcurrentOpenHashSet set = ConcurrentOpenHashSet.newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertEquals(set.capacity(), 4); + + ExecutorService executor = Executors.newCachedThreadPool(); + final int readThreads = 16; + final int writeThreads = 1; + final int n = 1_000; + CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads); + Future future = null; + AtomicReference ex = new AtomicReference<>(); + + for (int i = 0; i < readThreads; i++) { + executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + while (true) { + try { + set.contains("k2"); + } catch (Exception e) { + ex.set(e); + } + } + }); + } + + assertTrue(set.add("k1")); + future = executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int i = 0; i < n; i++) { + // expand hashmap + assertTrue(set.add("k2")); + assertTrue(set.add("k3")); + assertEquals(set.capacity(), 8); + + // shrink hashmap + assertTrue(set.remove("k2")); + assertTrue(set.remove("k3")); + assertEquals(set.capacity(), 4); + } + }); + + future.get(); + assertTrue(ex.get() == null); + // shut down pool + executor.shutdown(); + } + @Test public void testReduceUnnecessaryExpansions(){ ConcurrentOpenHashSet set = From 73ae1042bedd7239bf81bc22d1927615d86d6225 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 1 Sep 2023 15:14:39 +0800 Subject: [PATCH 04/11] add test code. --- .../ConcurrentLongHashSetTest.java | 65 +++++++++++++++++++ .../ConcurrentLongLongHashMapTest.java | 64 ++++++++++++++++++ 2 files changed, 129 insertions(+) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java index 2c28e882930..dac41ec22e2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java @@ -31,9 +31,12 @@ import java.util.Collections; import java.util.List; import java.util.Random; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicReference; + import org.junit.Test; /** @@ -295,6 +298,68 @@ public void testExpandAndShrink() { assertTrue(map.capacity() == 8); } + @Test + public void testConcurrentExpandAndShrinkAndGet() throws Throwable { + ConcurrentLongHashSet set = ConcurrentLongHashSet.newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertEquals(set.capacity(), 4); + + ExecutorService executor = Executors.newCachedThreadPool(); + final int readThreads = 16; + final int writeThreads = 1; + final int n = 1_000; + CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads); + Future future = null; + AtomicReference ex = new AtomicReference<>(); + + for (int i = 0; i < readThreads; i++) { + executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + while (true) { + try { + set.contains(1); + } catch (Exception e) { + ex.set(e); + } + } + }); + } + + assertTrue(set.add(1)); + future = executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int i = 0; i < n; i++) { + // expand hashmap + assertTrue(set.add(2)); + assertTrue(set.add(3)); + assertEquals(set.capacity(), 8); + + // shrink hashmap + assertTrue(set.remove(2)); + assertTrue(set.remove(3)); + assertEquals(set.capacity(), 4); + } + }); + + future.get(); + assertTrue(ex.get() == null); + // shut down pool + executor.shutdown(); + } + @Test public void testExpandShrinkAndClear() { ConcurrentLongHashSet map = ConcurrentLongHashSet.newBuilder() diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java index aca46946746..2c23de1704d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java @@ -32,10 +32,13 @@ import java.util.List; import java.util.Map; import java.util.Random; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.LongLongFunction; import org.junit.Test; @@ -159,6 +162,67 @@ public void testExpandAndShrink() { assertTrue(map.capacity() == 8); } + @Test + public void testConcurrentExpandAndShrinkAndGet() throws Throwable { + ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder() + .expectedItems(2) + .concurrencyLevel(1) + .autoShrink(true) + .mapIdleFactor(0.25f) + .build(); + assertEquals(map.capacity(), 4); + + ExecutorService executor = Executors.newCachedThreadPool(); + final int readThreads = 16; + final int writeThreads = 1; + final int n = 1_000; + CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads); + Future future = null; + AtomicReference ex = new AtomicReference<>(); + + for (int i = 0; i < readThreads; i++) { + executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + while (true) { + try { + map.get(1); + } catch (Exception e) { + ex.set(e); + } + } + }); + } + map.put(1, 11); + future = executor.submit(() -> { + try { + barrier.await(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + for (int i = 0; i < n; i++) { + // expand hashmap + map.put(2, 22); + map.put(3, 33); + assertEquals(map.capacity(), 8); + + // shrink hashmap + map.remove(2, 22); + map.remove(3, 33); + assertEquals(map.capacity(), 4); + } + }); + + future.get(); + assertTrue(ex.get() == null); + // shut down pool + executor.shutdown(); + } + @Test public void testExpandShrinkAndClear() { ConcurrentLongLongHashMap map = ConcurrentLongLongHashMap.newBuilder() From 85f5d4f8c730ce280509ac3e2e94b13ce3eea06e Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 1 Sep 2023 15:20:29 +0800 Subject: [PATCH 05/11] fix ConcurrentLongHashMap. --- .../collections/ConcurrentLongHashMap.java | 22 ++++++++++--------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java index 7ff465f0798..ec8b700c994 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java @@ -327,15 +327,18 @@ private static final class Section extends StampedLock { } V get(long key, int keyHash) { - int bucket = keyHash; long stamp = tryOptimisticRead(); boolean acquiredLock = false; + // add local variable here, so OutOfBound won't happen + long[] keys = this.keys; + V[] values = this.values; + // calculate table.length as capacity to avoid rehash changing capacity + int bucket = signSafeMod(keyHash, values.length); + try { while (true) { - int capacity = this.capacity; - bucket = signSafeMod(bucket, capacity); // First try optimistic locking long storedKey = keys[bucket]; @@ -354,16 +357,15 @@ V get(long key, int keyHash) { if (!acquiredLock) { stamp = readLock(); acquiredLock = true; + + // update local variable + keys = this.keys; + values = this.values; + bucket = signSafeMod(keyHash, values.length); storedKey = keys[bucket]; storedValue = values[bucket]; } - if (capacity != this.capacity) { - // There has been a rehashing. We need to restart the search - bucket = keyHash; - continue; - } - if (storedKey == key) { return storedValue != DeletedValue ? storedValue : null; } else if (storedValue == EmptyValue) { @@ -372,7 +374,7 @@ V get(long key, int keyHash) { } } - ++bucket; + bucket = (bucket + 1) & (values.length - 1); } } finally { if (acquiredLock) { From afd621bf0bfe3b11fc8bfa2188201b3b39f6ccbc Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 1 Sep 2023 15:47:26 +0800 Subject: [PATCH 06/11] fix ConcurrentLongHashSet. --- .../util/collections/ConcurrentLongHashSet.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java index a66de9ed8b4..d98b3062a7b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java @@ -290,7 +290,11 @@ private static final class Section extends StampedLock { boolean contains(long item, int hash) { long stamp = tryOptimisticRead(); boolean acquiredLock = false; - int bucket = signSafeMod(hash, capacity); + + // add local variable here, so OutOfBound won't happen + long[] table = this.table; + // calculate table.length as capacity to avoid rehash changing capacity + int bucket = signSafeMod(hash, table.length); try { while (true) { @@ -311,7 +315,9 @@ boolean contains(long item, int hash) { stamp = readLock(); acquiredLock = true; - bucket = signSafeMod(hash, capacity); + // update local variable + table = this.table; + bucket = signSafeMod(hash, table.length); storedItem = table[bucket]; } From b0f7c7c6fdf9d35ed4b9f4f3c739517ada993e32 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 1 Sep 2023 15:56:51 +0800 Subject: [PATCH 07/11] fix ConcurrentLongLongHashMap. --- .../ConcurrentLongLongHashMap.java | 46 +++++++++++-------- 1 file changed, 28 insertions(+), 18 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java index 25bcb4061a8..3cf5be37d58 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java @@ -370,6 +370,9 @@ public Map asMap() { // A section is a portion of the hash map that is covered by a single @SuppressWarnings("serial") private static final class Section extends StampedLock { + // Each item take up 2 continuous array space. + private static final int ITEM_SIZE = 2; + // Keys and values are stored interleaved in the table array private volatile long[] table; @@ -389,7 +392,7 @@ private static final class Section extends StampedLock { float expandFactor, float shrinkFactor) { this.capacity = alignToPowerOfTwo(capacity); this.initCapacity = this.capacity; - this.table = new long[2 * this.capacity]; + this.table = new long[ITEM_SIZE * this.capacity]; this.size = 0; this.usedBuckets = 0; this.autoShrink = autoShrink; @@ -405,7 +408,10 @@ private static final class Section extends StampedLock { long get(long key, int keyHash) { long stamp = tryOptimisticRead(); boolean acquiredLock = false; - int bucket = signSafeMod(keyHash, capacity); + // add local variable here, so OutOfBound won't happen + long[] table = this.table; + // calculate table.length/2 as capacity to avoid rehash changing capacity + int bucket = signSafeMod(keyHash, table.length / ITEM_SIZE); try { while (true) { @@ -427,7 +433,9 @@ long get(long key, int keyHash) { stamp = readLock(); acquiredLock = true; - bucket = signSafeMod(keyHash, capacity); + // update local variable + table = this.table; + bucket = signSafeMod(keyHash, table.length / ITEM_SIZE); storedKey = table[bucket]; storedValue = table[bucket + 1]; } @@ -440,7 +448,7 @@ long get(long key, int keyHash) { } } - bucket = (bucket + 2) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } finally { if (acquiredLock) { @@ -493,7 +501,7 @@ long put(long key, long value, int keyHash, boolean onlyIfAbsent, LongLongFuncti } } - bucket = (bucket + 2) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } finally { if (usedBuckets > resizeThresholdUp) { @@ -551,7 +559,7 @@ long addAndGet(long key, long delta, int keyHash) { } } - bucket = (bucket + 2) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } finally { if (usedBuckets > resizeThresholdUp) { @@ -611,7 +619,7 @@ boolean compareAndSet(long key, long currentValue, long newValue, int keyHash) { } } - bucket = (bucket + 2) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } finally { if (usedBuckets > resizeThresholdUp) { @@ -650,7 +658,7 @@ private long remove(long key, long value, int keyHash) { return ValueNotFound; } - bucket = (bucket + 2) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } finally { @@ -681,7 +689,7 @@ int removeIf(LongPredicate filter) { int removedCount = 0; try { // Go through all the buckets for this section - for (int bucket = 0; size > 0 && bucket < table.length; bucket += 2) { + for (int bucket = 0; size > 0 && bucket < table.length; bucket += ITEM_SIZE) { long storedKey = table[bucket]; if (storedKey != DeletedKey && storedKey != EmptyKey) { @@ -719,7 +727,7 @@ int removeIf(LongLongPredicate filter) { int removedCount = 0; try { // Go through all the buckets for this section - for (int bucket = 0; size > 0 && bucket < table.length; bucket += 2) { + for (int bucket = 0; size > 0 && bucket < table.length; bucket += ITEM_SIZE) { long storedKey = table[bucket]; long storedValue = table[bucket + 1]; @@ -753,20 +761,20 @@ int removeIf(LongLongPredicate filter) { } private void cleanBucket(int bucket) { - int nextInArray = (bucket + 2) & (table.length - 1); + int nextInArray = (bucket + ITEM_SIZE) & (table.length - 1); if (table[nextInArray] == EmptyKey) { table[bucket] = EmptyKey; table[bucket + 1] = ValueNotFound; --usedBuckets; // Cleanup all the buckets that were in `DeletedKey` state, so that we can reduce unnecessary expansions - bucket = (bucket - 2) & (table.length - 1); + bucket = (bucket - ITEM_SIZE) & (table.length - 1); while (table[bucket] == DeletedKey) { table[bucket] = EmptyKey; table[bucket + 1] = ValueNotFound; --usedBuckets; - bucket = (bucket - 2) & (table.length - 1); + bucket = (bucket - ITEM_SIZE) & (table.length - 1); } } else { table[bucket] = DeletedKey; @@ -807,7 +815,7 @@ public void forEach(BiConsumerLong processor) { } // Go through all the buckets for this section - for (int bucket = 0; bucket < table.length; bucket += 2) { + for (int bucket = 0; bucket < table.length; bucket += ITEM_SIZE) { long storedKey = table[bucket]; long storedValue = table[bucket + 1]; @@ -833,11 +841,11 @@ public void forEach(BiConsumerLong processor) { private void rehash(int newCapacity) { // Expand the hashmap - long[] newTable = new long[2 * newCapacity]; + long[] newTable = new long[ITEM_SIZE * newCapacity]; Arrays.fill(newTable, EmptyKey); // Re-hash table - for (int i = 0; i < table.length; i += 2) { + for (int i = 0; i < table.length; i += ITEM_SIZE) { long storedKey = table[i]; long storedValue = table[i + 1]; if (storedKey != EmptyKey && storedKey != DeletedKey) { @@ -855,7 +863,7 @@ private void rehash(int newCapacity) { } private void shrinkToInitCapacity() { - long[] newTable = new long[2 * initCapacity]; + long[] newTable = new long[ITEM_SIZE * initCapacity]; Arrays.fill(newTable, EmptyKey); table = newTable; @@ -881,7 +889,7 @@ private static void insertKeyValueNoLock(long[] table, int capacity, long key, l return; } - bucket = (bucket + 2) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } } @@ -897,6 +905,8 @@ static final long hash(long key) { } static final int signSafeMod(long n, int max) { + // as the ITEM_SIZE of Section is 2, so the index is the multiple of 2 + // that is to left shift 1 bit return (int) (n & (max - 1)) << 1; } From 26eb3a18a1a8953ae08ee5d3730545d2fe9bb135 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 1 Sep 2023 15:58:26 +0800 Subject: [PATCH 08/11] add comment in ConcurrentLongLongPairHashMap. --- .../util/collections/ConcurrentLongLongPairHashMap.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java index 04d5d3f1d16..5b23acbe014 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMap.java @@ -671,6 +671,8 @@ static final long hash(long key1, long key2) { } static final int signSafeMod(long n, int max) { + // as the ITEM_SIZE of Section is 4, so the index is the multiple of 4 + // that is to left shift 2 bits return (int) (n & (max - 1)) << 2; } From 7984d34482b5af5069918f9e099ff4c3aae91e96 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 1 Sep 2023 16:04:21 +0800 Subject: [PATCH 09/11] fix ConcurrentOpenHashMap. --- .../collections/ConcurrentOpenHashMap.java | 41 ++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java index cab3ce8ea3b..163918adbcd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMap.java @@ -278,6 +278,9 @@ public List values() { // A section is a portion of the hash map that is covered by a single @SuppressWarnings("serial") private static final class Section extends StampedLock { + // Each item take up 2 continuous array space. + private static final int ITEM_SIZE = 2; + // Keys and values are stored interleaved in the table array private volatile Object[] table; @@ -297,7 +300,7 @@ private static final class Section extends StampedLock { float expandFactor, float shrinkFactor) { this.capacity = alignToPowerOfTwo(capacity); this.initCapacity = this.capacity; - this.table = new Object[2 * this.capacity]; + this.table = new Object[ITEM_SIZE * this.capacity]; this.size = 0; this.usedBuckets = 0; this.autoShrink = autoShrink; @@ -312,7 +315,11 @@ private static final class Section extends StampedLock { V get(K key, int keyHash) { long stamp = tryOptimisticRead(); boolean acquiredLock = false; - int bucket = signSafeMod(keyHash, capacity); + + // add local variable here, so OutOfBound won't happen + Object[] table = this.table; + // calculate table.length / 2 as capacity to avoid rehash changing capacity + int bucket = signSafeMod(keyHash, table.length / ITEM_SIZE); try { while (true) { @@ -334,7 +341,9 @@ V get(K key, int keyHash) { stamp = readLock(); acquiredLock = true; - bucket = signSafeMod(keyHash, capacity); + // update local variable + table = this.table; + bucket = signSafeMod(keyHash, table.length / ITEM_SIZE); storedKey = (K) table[bucket]; storedValue = (V) table[bucket + 1]; } @@ -347,7 +356,7 @@ V get(K key, int keyHash) { } } - bucket = (bucket + 2) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } finally { if (acquiredLock) { @@ -400,7 +409,7 @@ V put(K key, V value, int keyHash, boolean onlyIfAbsent, Function valuePro } } - bucket = (bucket + 2) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } finally { if (usedBuckets > resizeThresholdUp) { @@ -438,7 +447,7 @@ private V remove(K key, Object value, int keyHash) { return null; } - bucket = (bucket + 2) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } finally { @@ -496,7 +505,7 @@ public void forEach(BiConsumer processor) { } // Go through all the buckets for this section - for (int bucket = 0; bucket < table.length; bucket += 2) { + for (int bucket = 0; bucket < table.length; bucket += ITEM_SIZE) { K storedKey = (K) table[bucket]; V storedValue = (V) table[bucket + 1]; @@ -526,7 +535,7 @@ int removeIf(BiPredicate filter) { int removedCount = 0; try { // Go through all the buckets for this section - for (int bucket = 0; size > 0 && bucket < table.length; bucket += 2) { + for (int bucket = 0; size > 0 && bucket < table.length; bucket += ITEM_SIZE) { K storedKey = (K) table[bucket]; V storedValue = (V) table[bucket + 1]; @@ -564,7 +573,7 @@ int removeIf(BiPredicate filter) { } private void cleanBucket(int bucket) { - int nextInArray = (bucket + 2) & (table.length - 1); + int nextInArray = (bucket + ITEM_SIZE) & (table.length - 1); if (table[nextInArray] == EmptyKey) { table[bucket] = EmptyKey; table[bucket + 1] = null; @@ -572,13 +581,13 @@ private void cleanBucket(int bucket) { // Cleanup all the buckets that were in `DeletedKey` state, // so that we can reduce unnecessary expansions - bucket = (bucket - 2) & (table.length - 1); + bucket = (bucket - ITEM_SIZE) & (table.length - 1); while (table[bucket] == DeletedKey) { table[bucket] = EmptyKey; table[bucket + 1] = null; --usedBuckets; - bucket = (bucket - 2) & (table.length - 1); + bucket = (bucket - ITEM_SIZE) & (table.length - 1); } } else { table[bucket] = DeletedKey; @@ -588,10 +597,10 @@ private void cleanBucket(int bucket) { private void rehash(int newCapacity) { // Expand the hashmap - Object[] newTable = new Object[2 * newCapacity]; + Object[] newTable = new Object[ITEM_SIZE * newCapacity]; // Re-hash table - for (int i = 0; i < table.length; i += 2) { + for (int i = 0; i < table.length; i += ITEM_SIZE) { K storedKey = (K) table[i]; V storedValue = (V) table[i + 1]; if (storedKey != EmptyKey && storedKey != DeletedKey) { @@ -609,7 +618,7 @@ private void rehash(int newCapacity) { } private void shrinkToInitCapacity() { - Object[] newTable = new Object[2 * initCapacity]; + Object[] newTable = new Object[ITEM_SIZE * initCapacity]; table = newTable; size = 0; @@ -634,7 +643,7 @@ private static void insertKeyValueNoLock(Object[] table, int capacity, K return; } - bucket = (bucket + 2) & (table.length - 1); + bucket = (bucket + ITEM_SIZE) & (table.length - 1); } } } @@ -650,6 +659,8 @@ static final long hash(K key) { } static final int signSafeMod(long n, int max) { + // as the ITEM_SIZE of Section is 2, so the index is the multiple of 2 + // that is to left shift 1 bit return (int) (n & (max - 1)) << 1; } From 2727e746a64f72f5f3ad32470b6c6f879adcf269 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Fri, 1 Sep 2023 16:06:30 +0800 Subject: [PATCH 10/11] fix ConcurrentOpenHashSet. --- .../collections/ConcurrentOpenHashSet.java | 22 ++++++++----------- 1 file changed, 9 insertions(+), 13 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java index a7f39173d39..a5e12d10fa2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSet.java @@ -265,16 +265,16 @@ private static final class Section extends StampedLock { } boolean contains(V value, int keyHash) { - int bucket = keyHash; - long stamp = tryOptimisticRead(); boolean acquiredLock = false; + // add local variable here, so OutOfBound won't happen + V[] values = this.values; + // calculate table.length as capacity to avoid rehash changing capacity + int bucket = signSafeMod(keyHash, values.length); + try { while (true) { - int capacity = this.capacity; - bucket = signSafeMod(bucket, capacity); - // First try optimistic locking V storedValue = values[bucket]; @@ -292,15 +292,12 @@ boolean contains(V value, int keyHash) { stamp = readLock(); acquiredLock = true; + // update local variable + values = this.values; + bucket = signSafeMod(keyHash, values.length); storedValue = values[bucket]; } - if (capacity != this.capacity) { - // There has been a rehashing. We need to restart the search - bucket = keyHash; - continue; - } - if (value.equals(storedValue)) { return true; } else if (storedValue == EmptyValue) { @@ -308,8 +305,7 @@ boolean contains(V value, int keyHash) { return false; } } - - ++bucket; + bucket = (bucket + 1) & (values.length - 1); } } finally { if (acquiredLock) { From fd5c41af243a11f55f75327259a56b88de8f4767 Mon Sep 17 00:00:00 2001 From: thetumbled <843221020@qq.com> Date: Sun, 28 Apr 2024 10:22:52 +0800 Subject: [PATCH 11/11] fix checkstyle. --- .../bookkeeper/util/collections/ConcurrentLongHashMapTest.java | 2 +- .../bookkeeper/util/collections/ConcurrentLongHashSetTest.java | 1 - .../util/collections/ConcurrentLongLongHashMapTest.java | 1 - .../util/collections/ConcurrentLongLongPairHashMapTest.java | 1 - .../bookkeeper/util/collections/ConcurrentOpenHashMapTest.java | 2 +- .../bookkeeper/util/collections/ConcurrentOpenHashSetTest.java | 1 - 6 files changed, 2 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java index 4f1db0e8644..b1f1b5437d4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java @@ -221,7 +221,7 @@ public void testConcurrentExpandAndShrinkAndGet() throws Throwable { }); } - assertNull(map.put(1,"v1")); + assertNull(map.put(1, "v1")); future = executor.submit(() -> { try { barrier.await(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java index dac41ec22e2..1c6bf12c695 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSetTest.java @@ -36,7 +36,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; - import org.junit.Test; /** diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java index 2c23de1704d..8121e3364c7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMapTest.java @@ -38,7 +38,6 @@ import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; - import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap.LongLongFunction; import org.junit.Test; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java index 3d2e1d9edc3..36605c5b96f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongPairHashMapTest.java @@ -37,7 +37,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; - import org.apache.bookkeeper.util.collections.ConcurrentLongLongPairHashMap.LongPair; import org.junit.Test; diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java index c0c86bdf71b..a7835e63897 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashMapTest.java @@ -217,7 +217,7 @@ public void testConcurrentExpandAndShrinkAndGet() throws Throwable { }); } - assertNull(map.put("k1","v1")); + assertNull(map.put("k1", "v1")); future = executor.submit(() -> { try { barrier.await(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java index dcbacd7ae09..8840eacb09d 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentOpenHashSetTest.java @@ -35,7 +35,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; - import org.junit.Test; /**