Skip to content

Commit f0ec6b2

Browse files
thetumbled authored and ZhangJian He committed
[fix] Fix ArrayIndexOutOfBoundsException caused by optimistic lock (#4066)
1 parent d86ca0b commit f0ec6b2

12 files changed

+488
-74
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMap.java

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -327,15 +327,18 @@ private static final class Section<V> extends StampedLock {
327327
}
328328

329329
V get(long key, int keyHash) {
330-
int bucket = keyHash;
331330

332331
long stamp = tryOptimisticRead();
333332
boolean acquiredLock = false;
334333

334+
// add local variable here, so OutOfBound won't happen
335+
long[] keys = this.keys;
336+
V[] values = this.values;
337+
// calculate table.length as capacity to avoid rehash changing capacity
338+
int bucket = signSafeMod(keyHash, values.length);
339+
335340
try {
336341
while (true) {
337-
int capacity = this.capacity;
338-
bucket = signSafeMod(bucket, capacity);
339342

340343
// First try optimistic locking
341344
long storedKey = keys[bucket];
@@ -354,16 +357,15 @@ V get(long key, int keyHash) {
354357
if (!acquiredLock) {
355358
stamp = readLock();
356359
acquiredLock = true;
360+
361+
// update local variable
362+
keys = this.keys;
363+
values = this.values;
364+
bucket = signSafeMod(keyHash, values.length);
357365
storedKey = keys[bucket];
358366
storedValue = values[bucket];
359367
}
360368

361-
if (capacity != this.capacity) {
362-
// There has been a rehashing. We need to restart the search
363-
bucket = keyHash;
364-
continue;
365-
}
366-
367369
if (storedKey == key) {
368370
return storedValue != DeletedValue ? storedValue : null;
369371
} else if (storedValue == EmptyValue) {
@@ -372,7 +374,7 @@ V get(long key, int keyHash) {
372374
}
373375
}
374376

375-
++bucket;
377+
bucket = (bucket + 1) & (values.length - 1);
376378
}
377379
} finally {
378380
if (acquiredLock) {

bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashSet.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,11 @@ private static final class Section extends StampedLock {
290290
boolean contains(long item, int hash) {
291291
long stamp = tryOptimisticRead();
292292
boolean acquiredLock = false;
293-
int bucket = signSafeMod(hash, capacity);
293+
294+
// add local variable here, so OutOfBound won't happen
295+
long[] table = this.table;
296+
// calculate table.length as capacity to avoid rehash changing capacity
297+
int bucket = signSafeMod(hash, table.length);
294298

295299
try {
296300
while (true) {
@@ -311,7 +315,9 @@ boolean contains(long item, int hash) {
311315
stamp = readLock();
312316
acquiredLock = true;
313317

314-
bucket = signSafeMod(hash, capacity);
318+
// update local variable
319+
table = this.table;
320+
bucket = signSafeMod(hash, table.length);
315321
storedItem = table[bucket];
316322
}
317323

bookkeeper-server/src/main/java/org/apache/bookkeeper/util/collections/ConcurrentLongLongHashMap.java

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,9 @@ public Map<Long, Long> asMap() {
370370
// A section is a portion of the hash map that is covered by a single
371371
@SuppressWarnings("serial")
372372
private static final class Section extends StampedLock {
373+
// Each item take up 2 continuous array space.
374+
private static final int ITEM_SIZE = 2;
375+
373376
// Keys and values are stored interleaved in the table array
374377
private volatile long[] table;
375378

@@ -389,7 +392,7 @@ private static final class Section extends StampedLock {
389392
float expandFactor, float shrinkFactor) {
390393
this.capacity = alignToPowerOfTwo(capacity);
391394
this.initCapacity = this.capacity;
392-
this.table = new long[2 * this.capacity];
395+
this.table = new long[ITEM_SIZE * this.capacity];
393396
this.size = 0;
394397
this.usedBuckets = 0;
395398
this.autoShrink = autoShrink;
@@ -405,7 +408,10 @@ private static final class Section extends StampedLock {
405408
long get(long key, int keyHash) {
406409
long stamp = tryOptimisticRead();
407410
boolean acquiredLock = false;
408-
int bucket = signSafeMod(keyHash, capacity);
411+
// add local variable here, so OutOfBound won't happen
412+
long[] table = this.table;
413+
// calculate table.length/2 as capacity to avoid rehash changing capacity
414+
int bucket = signSafeMod(keyHash, table.length / ITEM_SIZE);
409415

410416
try {
411417
while (true) {
@@ -427,7 +433,9 @@ long get(long key, int keyHash) {
427433
stamp = readLock();
428434
acquiredLock = true;
429435

430-
bucket = signSafeMod(keyHash, capacity);
436+
// update local variable
437+
table = this.table;
438+
bucket = signSafeMod(keyHash, table.length / ITEM_SIZE);
431439
storedKey = table[bucket];
432440
storedValue = table[bucket + 1];
433441
}
@@ -440,7 +448,7 @@ long get(long key, int keyHash) {
440448
}
441449
}
442450

443-
bucket = (bucket + 2) & (table.length - 1);
451+
bucket = (bucket + ITEM_SIZE) & (table.length - 1);
444452
}
445453
} finally {
446454
if (acquiredLock) {
@@ -493,7 +501,7 @@ long put(long key, long value, int keyHash, boolean onlyIfAbsent, LongLongFuncti
493501
}
494502
}
495503

496-
bucket = (bucket + 2) & (table.length - 1);
504+
bucket = (bucket + ITEM_SIZE) & (table.length - 1);
497505
}
498506
} finally {
499507
if (usedBuckets > resizeThresholdUp) {
@@ -551,7 +559,7 @@ long addAndGet(long key, long delta, int keyHash) {
551559
}
552560
}
553561

554-
bucket = (bucket + 2) & (table.length - 1);
562+
bucket = (bucket + ITEM_SIZE) & (table.length - 1);
555563
}
556564
} finally {
557565
if (usedBuckets > resizeThresholdUp) {
@@ -611,7 +619,7 @@ boolean compareAndSet(long key, long currentValue, long newValue, int keyHash) {
611619
}
612620
}
613621

614-
bucket = (bucket + 2) & (table.length - 1);
622+
bucket = (bucket + ITEM_SIZE) & (table.length - 1);
615623
}
616624
} finally {
617625
if (usedBuckets > resizeThresholdUp) {
@@ -650,7 +658,7 @@ private long remove(long key, long value, int keyHash) {
650658
return ValueNotFound;
651659
}
652660

653-
bucket = (bucket + 2) & (table.length - 1);
661+
bucket = (bucket + ITEM_SIZE) & (table.length - 1);
654662
}
655663

656664
} finally {
@@ -681,7 +689,7 @@ int removeIf(LongPredicate filter) {
681689
int removedCount = 0;
682690
try {
683691
// Go through all the buckets for this section
684-
for (int bucket = 0; size > 0 && bucket < table.length; bucket += 2) {
692+
for (int bucket = 0; size > 0 && bucket < table.length; bucket += ITEM_SIZE) {
685693
long storedKey = table[bucket];
686694

687695
if (storedKey != DeletedKey && storedKey != EmptyKey) {
@@ -719,7 +727,7 @@ int removeIf(LongLongPredicate filter) {
719727
int removedCount = 0;
720728
try {
721729
// Go through all the buckets for this section
722-
for (int bucket = 0; size > 0 && bucket < table.length; bucket += 2) {
730+
for (int bucket = 0; size > 0 && bucket < table.length; bucket += ITEM_SIZE) {
723731
long storedKey = table[bucket];
724732
long storedValue = table[bucket + 1];
725733

@@ -753,20 +761,20 @@ int removeIf(LongLongPredicate filter) {
753761
}
754762

755763
private void cleanBucket(int bucket) {
756-
int nextInArray = (bucket + 2) & (table.length - 1);
764+
int nextInArray = (bucket + ITEM_SIZE) & (table.length - 1);
757765
if (table[nextInArray] == EmptyKey) {
758766
table[bucket] = EmptyKey;
759767
table[bucket + 1] = ValueNotFound;
760768
--usedBuckets;
761769

762770
// Cleanup all the buckets that were in `DeletedKey` state, so that we can reduce unnecessary expansions
763-
bucket = (bucket - 2) & (table.length - 1);
771+
bucket = (bucket - ITEM_SIZE) & (table.length - 1);
764772
while (table[bucket] == DeletedKey) {
765773
table[bucket] = EmptyKey;
766774
table[bucket + 1] = ValueNotFound;
767775
--usedBuckets;
768776

769-
bucket = (bucket - 2) & (table.length - 1);
777+
bucket = (bucket - ITEM_SIZE) & (table.length - 1);
770778
}
771779
} else {
772780
table[bucket] = DeletedKey;
@@ -807,7 +815,7 @@ public void forEach(BiConsumerLong processor) {
807815
}
808816

809817
// Go through all the buckets for this section
810-
for (int bucket = 0; bucket < table.length; bucket += 2) {
818+
for (int bucket = 0; bucket < table.length; bucket += ITEM_SIZE) {
811819
long storedKey = table[bucket];
812820
long storedValue = table[bucket + 1];
813821

@@ -833,11 +841,11 @@ public void forEach(BiConsumerLong processor) {
833841

834842
private void rehash(int newCapacity) {
835843
// Expand the hashmap
836-
long[] newTable = new long[2 * newCapacity];
844+
long[] newTable = new long[ITEM_SIZE * newCapacity];
837845
Arrays.fill(newTable, EmptyKey);
838846

839847
// Re-hash table
840-
for (int i = 0; i < table.length; i += 2) {
848+
for (int i = 0; i < table.length; i += ITEM_SIZE) {
841849
long storedKey = table[i];
842850
long storedValue = table[i + 1];
843851
if (storedKey != EmptyKey && storedKey != DeletedKey) {
@@ -855,7 +863,7 @@ private void rehash(int newCapacity) {
855863
}
856864

857865
private void shrinkToInitCapacity() {
858-
long[] newTable = new long[2 * initCapacity];
866+
long[] newTable = new long[ITEM_SIZE * initCapacity];
859867
Arrays.fill(newTable, EmptyKey);
860868

861869
table = newTable;
@@ -881,7 +889,7 @@ private static void insertKeyValueNoLock(long[] table, int capacity, long key, l
881889
return;
882890
}
883891

884-
bucket = (bucket + 2) & (table.length - 1);
892+
bucket = (bucket + ITEM_SIZE) & (table.length - 1);
885893
}
886894
}
887895
}
@@ -897,6 +905,8 @@ static final long hash(long key) {
897905
}
898906

899907
static final int signSafeMod(long n, int max) {
908+
// as the ITEM_SIZE of Section is 2, so the index is the multiple of 2
909+
// that is to left shift 1 bit
900910
return (int) (n & (max - 1)) << 1;
901911
}
902912

0 commit comments

Comments (0)