Skip to content

Commit 25c7506

Browse files
Samuel Justsijie
authored andcommitted
BOOKKEEPER-1919: putEntryOffset translate FileInfoDeletedException
IndexInMemPageMgr should translate FileInfoDeletedException into NoLedgerException as expected by users like InterleavedLedgerStorage.updateEntriesLocations and EntryMemTable.flushSnapshot. Signed-off-by: Samuel Just <sjustsalesforce.com> Reviewers: Enrico Olivelli <eolivelli@gmail.com>, Sijie Guo <sijie@apache.org> This closes #1950 from athanatos/forupstream/wip-1919
1 parent 0f55253 commit 25c7506

File tree

2 files changed

+124
-0
lines changed

2 files changed

+124
-0
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/IndexInMemPageMgr.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,8 @@ void putEntryOffset(long ledger, long entry, long offset) throws IOException {
571571
lep = getLedgerEntryPage(ledger, pageEntry);
572572
assert lep != null;
573573
lep.setOffset(offset, offsetInPage * LedgerEntryPage.getIndexEntrySize());
574+
} catch (FileInfo.FileInfoDeletedException e) {
575+
throw new Bookie.NoLedgerException(ledger);
574576
} finally {
575577
if (null != lep) {
576578
lep.releasePage();

bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/LedgerCacheTest.java

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,128 @@ public void testSyncThreadNPE() throws IOException {
379379
}
380380
}
381381

382+
383+
/**
384+
* Test for race between putEntryOffset and flush.
385+
* {@link https://github.com/apache/bookkeeper/issues/1919}
386+
*/
387+
@Test
388+
public void testPutEntryOffsetDeleteRace() throws Exception {
389+
newLedgerCache();
390+
final AtomicInteger rc = new AtomicInteger(0);
391+
final LinkedBlockingQueue<Long> putQ = new LinkedBlockingQueue<>(100);
392+
final LinkedBlockingQueue<Long> deleteQ = new LinkedBlockingQueue<>(100);
393+
final byte[] masterKey = "masterKey".getBytes();
394+
final long numLedgers = 1000;
395+
final int numPutters = 10;
396+
final int numDeleters = 10;
397+
final AtomicBoolean running = new AtomicBoolean(true);
398+
Thread newLedgerThread = new Thread() {
399+
public void run() {
400+
try {
401+
for (long i = 0; i < numLedgers && rc.get() == 0; i++) {
402+
ledgerCache.setMasterKey(i, masterKey);
403+
404+
ledgerCache.putEntryOffset(i, 1, 0);
405+
deleteQ.put(i);
406+
putQ.put(i);
407+
}
408+
for (int i = 0; i < numPutters; ++i) {
409+
putQ.put(-1L);
410+
}
411+
for (int i = 0; i < numDeleters; ++i) {
412+
deleteQ.put(-1L);
413+
}
414+
} catch (Throwable e) {
415+
rc.set(-1);
416+
LOG.error("Exception in new ledger thread", e);
417+
}
418+
}
419+
};
420+
newLedgerThread.start();
421+
422+
Thread[] flushThreads = new Thread[numPutters];
423+
for (int i = 0; i < numPutters; ++i) {
424+
Thread flushThread = new Thread() {
425+
public void run() {
426+
try {
427+
while (true) {
428+
long id = putQ.take();
429+
if (id == -1L) {
430+
break;
431+
}
432+
LOG.info("Putting {}", id);
433+
try {
434+
ledgerCache.putEntryOffset(id, 2, 0);
435+
ledgerCache.deleteLedger(id);
436+
} catch (NoLedgerException e) {
437+
// No problem
438+
}
439+
}
440+
} catch (Throwable e) {
441+
rc.set(-1);
442+
LOG.error("Exception in put thread", e);
443+
}
444+
}
445+
};
446+
flushThread.start();
447+
flushThreads[i] = flushThread;
448+
}
449+
450+
Thread[] deleteThreads = new Thread[numDeleters];
451+
for (int i = 0; i < numDeleters; ++i) {
452+
Thread deleteThread = new Thread() {
453+
public void run() {
454+
try {
455+
while (true) {
456+
long id = deleteQ.take();
457+
if (id == -1L) {
458+
break;
459+
}
460+
LOG.info("Deleting {}", id);
461+
try {
462+
ledgerCache.deleteLedger(id);
463+
} catch (NoLedgerException e) {
464+
// No problem
465+
}
466+
}
467+
} catch (Throwable e) {
468+
rc.set(-1);
469+
LOG.error("Exception in delete thread", e);
470+
}
471+
}
472+
};
473+
deleteThread.start();
474+
deleteThreads[i] = deleteThread;
475+
}
476+
477+
newLedgerThread.join();
478+
479+
for (Thread deleteThread : deleteThreads) {
480+
deleteThread.join();
481+
}
482+
483+
running.set(false);
484+
for (Thread flushThread : flushThreads) {
485+
flushThread.join();
486+
}
487+
488+
assertEquals("Should have been no errors", rc.get(), 0);
489+
for (long i = 0L; i < numLedgers; ++i) {
490+
boolean gotError = false;
491+
try {
492+
LOG.error("Checking {}", i);
493+
ledgerCache.getEntryOffset(i, 0);
494+
} catch (NoLedgerException e) {
495+
gotError = true;
496+
}
497+
if (!gotError) {
498+
LOG.error("Ledger {} is still around", i);
499+
fail("Found ledger " + i + ", which should have been removed");
500+
}
501+
}
502+
}
503+
382504
/**
383505
* Test for race between delete and flush.
384506
* {@link https://issues.apache.org/jira/browse/BOOKKEEPER-604}

0 commit comments

Comments
 (0)