Skip to content

Commit c7d7fc5

Browse files
authored
fix: Every time use KeyValueStorage.Batch, close it in finally (#4578)
Co-authored-by: nuolin <[email protected]>
1 parent c64dc8a commit c7d7fc5

File tree

3 files changed

+45
-45
lines changed

3 files changed

+45
-45
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -827,23 +827,24 @@ public void checkpoint(Checkpoint checkpoint) throws IOException {
827827
// Write all the pending entries into the entry logger and collect the offset
828828
// position for each entry
829829

830-
Batch batch = entryLocationIndex.newBatch();
831-
writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
832-
long location = entryLogger.addEntry(ledgerId, entry);
833-
entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
834-
});
830+
try (Batch batch = entryLocationIndex.newBatch()) {
831+
writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
832+
long location = entryLogger.addEntry(ledgerId, entry);
833+
entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
834+
});
835835

836-
long entryLoggerStart = MathUtils.nowInNano();
837-
entryLogger.flush();
838-
recordSuccessfulEvent(dbLedgerStorageStats.getFlushEntryLogStats(), entryLoggerStart);
836+
long entryLoggerStart = MathUtils.nowInNano();
837+
entryLogger.flush();
838+
recordSuccessfulEvent(dbLedgerStorageStats.getFlushEntryLogStats(), entryLoggerStart);
839839

840-
long batchFlushStartTime = MathUtils.nowInNano();
841-
batch.flush();
842-
batch.close();
843-
recordSuccessfulEvent(dbLedgerStorageStats.getFlushLocationIndexStats(), batchFlushStartTime);
844-
if (log.isDebugEnabled()) {
845-
log.debug("DB batch flushed time : {} s",
846-
MathUtils.elapsedNanos(batchFlushStartTime) / (double) TimeUnit.SECONDS.toNanos(1));
840+
long batchFlushStartTime = MathUtils.nowInNano();
841+
batch.flush();
842+
843+
recordSuccessfulEvent(dbLedgerStorageStats.getFlushLocationIndexStats(), batchFlushStartTime);
844+
if (log.isDebugEnabled()) {
845+
log.debug("DB batch flushed time : {} s",
846+
MathUtils.elapsedNanos(batchFlushStartTime) / (double) TimeUnit.SECONDS.toNanos(1));
847+
}
847848
}
848849

849850
long ledgerIndexStartTime = MathUtils.nowInNano();
@@ -1095,20 +1096,20 @@ public long addLedgerToIndex(long ledgerId, boolean isFenced, byte[] masterKey,
10951096
MutableLong numberOfEntries = new MutableLong();
10961097

10971098
// Iterate over all the entries pages
1098-
Batch batch = entryLocationIndex.newBatch();
1099-
for (LedgerCache.PageEntries page: pages) {
1100-
try (LedgerEntryPage lep = page.getLEP()) {
1101-
lep.getEntries((entryId, location) -> {
1102-
entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
1103-
numberOfEntries.increment();
1104-
return true;
1105-
});
1099+
try (Batch batch = entryLocationIndex.newBatch()) {
1100+
for (LedgerCache.PageEntries page : pages) {
1101+
try (LedgerEntryPage lep = page.getLEP()) {
1102+
lep.getEntries((entryId, location) -> {
1103+
entryLocationIndex.addLocation(batch, ledgerId, entryId, location);
1104+
numberOfEntries.increment();
1105+
return true;
1106+
});
1107+
}
11061108
}
1107-
}
11081109

1109-
ledgerIndex.flush();
1110-
batch.flush();
1111-
batch.close();
1110+
ledgerIndex.flush();
1111+
batch.flush();
1112+
}
11121113

11131114
return numberOfEntries.longValue();
11141115
}

bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/EntryLocationIndexTest.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -93,16 +93,15 @@ public void deleteBatchLedgersTest() throws Exception {
9393
int numEntriesPerLedger = 100;
9494

9595
int location = 0;
96-
KeyValueStorage.Batch batch = idx.newBatch();
97-
for (int entryId = 0; entryId < numEntriesPerLedger; ++entryId) {
98-
for (int ledgerId = 0; ledgerId < numLedgers; ++ledgerId) {
99-
idx.addLocation(batch, ledgerId, entryId, location);
100-
location++;
96+
try (KeyValueStorage.Batch batch = idx.newBatch()) {
97+
for (int entryId = 0; entryId < numEntriesPerLedger; ++entryId) {
98+
for (int ledgerId = 0; ledgerId < numLedgers; ++ledgerId) {
99+
idx.addLocation(batch, ledgerId, entryId, location);
100+
location++;
101+
}
101102
}
103+
batch.flush();
102104
}
103-
batch.flush();
104-
batch.close();
105-
106105

107106
int expectedLocation = 0;
108107
for (int entryId = 0; entryId < numEntriesPerLedger; ++entryId) {

bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/KeyValueStorageTest.java

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -156,16 +156,16 @@ public void simple() throws Exception {
156156
assertEquals(null, db.get(toArray(10)));
157157
assertTrue(db.count() > 0);
158158

159-
Batch batch = db.newBatch();
160-
batch.remove(toArray(11));
161-
batch.remove(toArray(12));
162-
batch.remove(toArray(13));
163-
batch.flush();
164-
assertEquals(null, db.get(toArray(11)));
165-
assertEquals(null, db.get(toArray(12)));
166-
assertEquals(null, db.get(toArray(13)));
167-
assertEquals(14L, fromArray(db.get(toArray(14))));
168-
batch.close();
159+
try (Batch batch = db.newBatch()) {
160+
batch.remove(toArray(11));
161+
batch.remove(toArray(12));
162+
batch.remove(toArray(13));
163+
batch.flush();
164+
assertEquals(null, db.get(toArray(11)));
165+
assertEquals(null, db.get(toArray(12)));
166+
assertEquals(null, db.get(toArray(13)));
167+
assertEquals(14L, fromArray(db.get(toArray(14))));
168+
}
169169

170170
db.close();
171171
FileUtils.deleteDirectory(tmpDir);

0 commit comments

Comments
 (0)