Skip to content

Commit 120aa34

Browse files
committed
AMQ-9824 - Cleanup code in KahaDB classes
This cleanups the MessageDatabase and KahaDBStore classes. This commit includes the following: * Fixes the scope of several methods and types. For example, there were many cases where protected methods were referencing types that were package scope. * Simplified the code by replacing anonymous methods with lambdas * removed unused methods and parameters * removed unnecessary casts * cleaned up the use of generics where types could be inferred * Replaced the ReentrantReadWriteLock that was used for indexLock with ReentrantLock becuase only the write lock was ever being used (the page file doesn't support concurrent reads right now). This should provide a small performance/memory improvement and simplifies the code a bit. * removed unnecessary null initializations * cleaned up logging to remove string concatenation and instead use parameters * removed method overrides that are the same as the parent or just call the super method * removed unused checked exceptions from method's throws * marked inner classes as static when possible (cherry picked from commit b7184b4)
1 parent 4f3bafb commit 120aa34

File tree

7 files changed

+685
-929
lines changed

7 files changed

+685
-929
lines changed

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java

Lines changed: 364 additions & 461 deletions
Large diffs are not rendered by default.

activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java

Lines changed: 288 additions & 429 deletions
Large diffs are not rendered by default.

activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalCorruptionEofIndexRecoveryTest.java

Lines changed: 17 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -427,31 +427,28 @@ private void corruptBatchEndEof(int id) throws Exception{
427427
private void corruptOrderIndex(final int num, final int size) throws Exception {
428428
//This is because of AMQ-6097, now that the MessageOrderIndex stores the size in the Location,
429429
//we need to corrupt that value as well
430-
final KahaDBStore kahaDbStore = (KahaDBStore) ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore();
431-
kahaDbStore.indexLock.writeLock().lock();
430+
final KahaDBStore kahaDbStore = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore();
431+
kahaDbStore.indexLock.lock();
432432
try {
433-
kahaDbStore.pageFile.tx().execute(new Transaction.Closure<IOException>() {
434-
@Override
435-
public void execute(Transaction tx) throws IOException {
436-
StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert(
437-
(ActiveMQQueue)destination), tx);
438-
int i = 1;
439-
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
440-
Entry<Long, MessageKeys> entry = iterator.next();
441-
if (i == num) {
442-
//change the size value to the wrong size
443-
sd.orderIndex.get(tx, entry.getKey());
444-
MessageKeys messageKeys = entry.getValue();
445-
messageKeys.location.setSize(size);
446-
sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), entry.getKey(), messageKeys);
447-
break;
448-
}
449-
i++;
433+
kahaDbStore.pageFile.tx().execute(tx -> {
434+
StoredDestination sd = kahaDbStore.getStoredDestination(kahaDbStore.convert(
435+
(ActiveMQQueue)destination), tx);
436+
int i = 1;
437+
for (Iterator<Entry<Long, MessageKeys>> iterator = sd.orderIndex.iterator(tx); iterator.hasNext();) {
438+
Entry<Long, MessageKeys> entry = iterator.next();
439+
if (i == num) {
440+
//change the size value to the wrong size
441+
sd.orderIndex.get(tx, entry.getKey());
442+
MessageKeys messageKeys = entry.getValue();
443+
messageKeys.location.setSize(size);
444+
sd.orderIndex.put(tx, sd.orderIndex.lastGetPriority(), entry.getKey(), messageKeys);
445+
break;
450446
}
447+
i++;
451448
}
452449
});
453450
} finally {
454-
kahaDbStore.indexLock.writeLock().unlock();
451+
kahaDbStore.indexLock.unlock();
455452
}
456453
}
457454

activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/KahaDBStoreOpenWireVersionTest.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -164,22 +164,19 @@ private void corruptIndex() throws IOException {
164164

165165
//blow up the index
166166
try {
167-
store.indexLock.writeLock().lock();
168-
pageFile.tx().execute(new Transaction.Closure<IOException>() {
169-
@Override
170-
public void execute(Transaction tx) throws IOException {
171-
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
172-
.hasNext();) {
173-
Entry<String, StoredDestination> entry = iterator.next();
174-
entry.getValue().orderIndex.nextMessageId = -100;
175-
entry.getValue().orderIndex.defaultPriorityIndex.clear(tx);
176-
entry.getValue().orderIndex.lowPriorityIndex.clear(tx);
177-
entry.getValue().orderIndex.highPriorityIndex.clear(tx);
178-
}
167+
store.indexLock.lock();
168+
pageFile.tx().execute(tx -> {
169+
for (Iterator<Entry<String, StoredDestination>> iterator = metadata.destinations.iterator(tx); iterator
170+
.hasNext();) {
171+
Entry<String, StoredDestination> entry = iterator.next();
172+
entry.getValue().orderIndex.nextMessageId = -100;
173+
entry.getValue().orderIndex.defaultPriorityIndex.clear(tx);
174+
entry.getValue().orderIndex.lowPriorityIndex.clear(tx);
175+
entry.getValue().orderIndex.highPriorityIndex.clear(tx);
179176
}
180177
});
181178
} finally {
182-
store.indexLock.writeLock().unlock();
179+
store.indexLock.unlock();
183180
}
184181
}
185182

activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2982Test.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,11 +62,11 @@ public void forceCleanup() throws IOException {
6262

6363
public int getFileMapSize() throws IOException {
6464
// ensure save memory publishing, use the right lock
65-
indexLock.readLock().lock();
65+
indexLock.lock();
6666
try {
6767
return getJournal().getFileMap().size();
6868
} finally {
69-
indexLock.readLock().unlock();
69+
indexLock.unlock();
7070
}
7171
}
7272
}

activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2983Test.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@ public void forceCleanup() throws IOException {
6565

6666
public int getFileMapSize() throws IOException {
6767
// ensure save memory publishing, use the right lock
68-
indexLock.readLock().lock();
68+
indexLock.lock();
6969
try {
7070
return getJournal().getFileMap().size();
7171
} finally {
72-
indexLock.readLock().unlock();
72+
indexLock.unlock();
7373
}
7474
}
7575
}

activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public void testLocationIndexMatchesOrderIndex() throws Exception {
6161

6262
//Iterate over the order index and add up the size of the messages to compare
6363
//to the location index
64-
kahaDbStore.indexLock.readLock().lock();
64+
kahaDbStore.indexLock.lock();
6565
try {
6666
long size = kahaDbStore.pageFile.tx().execute(new Transaction.CallableClosure<Long, IOException>() {
6767
@Override
@@ -79,7 +79,7 @@ public Long execute(Transaction tx) throws IOException {
7979
assertEquals("Order index size values don't match message size",
8080
size, messageStore.getMessageSize());
8181
} finally {
82-
kahaDbStore.indexLock.readLock().unlock();
82+
kahaDbStore.indexLock.unlock();
8383
}
8484
}
8585

0 commit comments

Comments
 (0)