Skip to content

Commit 60d56e6

Browse files
committed
Fix data lost when configured multiple ledger directories (apache#3329)
(cherry picked from commit 8a76703)
1 parent 51adea5 commit 60d56e6

File tree

5 files changed

+196
-3
lines changed

5 files changed

+196
-3
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1714,4 +1714,9 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg
17141714
}
17151715
}
17161716
}
1717+
1718+
@VisibleForTesting
1719+
public List<Journal> getJournals() {
1720+
return this.journals;
1721+
}
17171722
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
229229
* The last mark should first be max journal log id,
230230
* and then max log position in max journal log.
231231
*/
232-
void readLog() {
232+
public void readLog() {
233233
byte[] buff = new byte[16];
234234
ByteBuffer bb = ByteBuffer.wrap(buff);
235235
LogMark mark = new LogMark();

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
137137

138138
private final long maxReadAheadBytesSize;
139139

140+
private final boolean singleLedgerDirs;
141+
140142
public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
141143
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StateManager stateManager,
142144
CheckpointSource checkpointSource, Checkpointer checkpointer, StatsLogger statsLogger,
@@ -152,6 +154,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
152154
this.writeCacheMaxSize = writeCacheSize;
153155
this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2);
154156
this.writeCacheBeingFlushed = new WriteCache(allocator, writeCacheMaxSize / 2);
157+
this.singleLedgerDirs = conf.getLedgerDirs().length == 1;
155158

156159
this.checkpointSource = checkpointSource;
157160

@@ -717,7 +720,9 @@ private void swapWriteCache() {
717720
public void flush() throws IOException {
718721
Checkpoint cp = checkpointSource.newCheckpoint();
719722
checkpoint(cp);
720-
checkpointSource.checkpointComplete(cp, true);
723+
if (singleLedgerDirs) {
724+
checkpointSource.checkpointComplete(cp, true);
725+
}
721726
}
722727

723728
@Override

bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,20 @@
3131
import io.netty.buffer.Unpooled;
3232

3333
import java.io.File;
34+
import java.io.FileInputStream;
3435
import java.io.IOException;
36+
import java.nio.ByteBuffer;
3537
import java.util.List;
3638

3739
import org.apache.bookkeeper.bookie.Bookie;
3840
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
3941
import org.apache.bookkeeper.bookie.BookieException;
42+
import org.apache.bookkeeper.bookie.CheckpointSource;
43+
import org.apache.bookkeeper.bookie.CheckpointSourceList;
4044
import org.apache.bookkeeper.bookie.EntryLocation;
4145
import org.apache.bookkeeper.bookie.EntryLogger;
4246
import org.apache.bookkeeper.bookie.LedgerDirsManager;
47+
import org.apache.bookkeeper.bookie.LogMark;
4348
import org.apache.bookkeeper.conf.ServerConfiguration;
4449
import org.apache.bookkeeper.conf.TestBKConfiguration;
4550
import org.apache.bookkeeper.proto.BookieProtocol;
@@ -445,4 +450,182 @@ public void testGetLedgerDirsListeners() throws IOException {
445450
// and another is EntryLogManagerForEntryLogPerLedger
446451
assertEquals(2, ledgerDirsManager.getListeners().size());
447452
}
453+
454+
@Test
455+
public void testMultiLedgerDirectoryCheckpoint() throws Exception {
456+
int gcWaitTime = 1000;
457+
File firstDir = new File(tmpDir, "dir1");
458+
File secondDir = new File(tmpDir, "dir2");
459+
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
460+
conf.setGcWaitTime(gcWaitTime);
461+
conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
462+
conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
463+
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
464+
conf.setLedgerDirNames(new String[] { firstDir.getCanonicalPath(), secondDir.getCanonicalPath() });
465+
466+
Bookie bookie = new Bookie(conf);
467+
ByteBuf entry1 = Unpooled.buffer(1024);
468+
entry1.writeLong(1); // ledger id
469+
entry1.writeLong(2); // entry id
470+
entry1.writeBytes("entry-1".getBytes());
471+
472+
bookie.getLedgerStorage().addEntry(entry1);
473+
// write one entry to first ledger directory and flush with logMark(1, 2),
474+
// only the first ledger directory should have lastMark
475+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
476+
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
477+
478+
File firstDirMark = new File(firstDir + "/current", "lastMark");
479+
File secondDirMark = new File(secondDir + "/current", "lastMark");
480+
481+
// LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
482+
try {
483+
readLogMark(firstDirMark);
484+
readLogMark(secondDirMark);
485+
fail();
486+
} catch (Exception e) {
487+
//
488+
}
489+
490+
// write the second entry to second leger directory and flush with log(4, 5),
491+
// the fist ledger directory's lastMark is (1, 2) and the second ledger directory's lastMark is (4, 5);
492+
ByteBuf entry2 = Unpooled.buffer(1024);
493+
entry2.writeLong(2); // ledger id
494+
entry2.writeLong(1); // entry id
495+
entry2.writeBytes("entry-2".getBytes());
496+
497+
bookie.getLedgerStorage().addEntry(entry2);
498+
// write one entry to first ledger directory and flush with logMark(1, 2),
499+
// only the first ledger directory should have lastMark
500+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
501+
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(1).flush();
502+
503+
// LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
504+
try {
505+
readLogMark(firstDirMark);
506+
readLogMark(secondDirMark);
507+
fail();
508+
} catch (Exception e) {
509+
//
510+
}
511+
512+
// The dbLedgerStorage flush also won't trigger lastMark update due to two ledger directories configured.
513+
bookie.getLedgerStorage().flush();
514+
try {
515+
readLogMark(firstDirMark);
516+
readLogMark(secondDirMark);
517+
fail();
518+
} catch (Exception e) {
519+
//
520+
}
521+
522+
// trigger checkpoint simulate SyncThread do checkpoint.
523+
CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
524+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
525+
CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
526+
checkpointSource.checkpointComplete(checkpoint, false);
527+
528+
try {
529+
LogMark firstLogMark = readLogMark(firstDirMark);
530+
LogMark secondLogMark = readLogMark(secondDirMark);
531+
assertEquals(7, firstLogMark.getLogFileId());
532+
assertEquals(8, firstLogMark.getLogFileOffset());
533+
assertEquals(7, secondLogMark.getLogFileId());
534+
assertEquals(8, secondLogMark.getLogFileOffset());
535+
} catch (Exception e) {
536+
fail();
537+
}
538+
539+
// test replay journal lastMark, to make sure we get the right LastMark position
540+
bookie.getJournals().get(0).getLastLogMark().readLog();
541+
LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
542+
assertEquals(7, logMark.getLogFileId());
543+
assertEquals(8, logMark.getLogFileOffset());
544+
}
545+
546+
private LogMark readLogMark(File file) throws IOException {
547+
byte[] buff = new byte[16];
548+
ByteBuffer bb = ByteBuffer.wrap(buff);
549+
LogMark mark = new LogMark();
550+
try (FileInputStream fis = new FileInputStream(file)) {
551+
int bytesRead = fis.read(buff);
552+
if (bytesRead != 16) {
553+
throw new IOException("Couldn't read enough bytes from lastMark."
554+
+ " Wanted " + 16 + ", got " + bytesRead);
555+
}
556+
}
557+
bb.clear();
558+
mark.readLogMark(bb);
559+
560+
return mark;
561+
}
562+
563+
@Test
564+
public void testSingleLedgerDirectoryCheckpoint() throws Exception {
565+
int gcWaitTime = 1000;
566+
File ledgerDir = new File(tmpDir, "dir");
567+
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
568+
conf.setGcWaitTime(gcWaitTime);
569+
conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
570+
conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
571+
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
572+
conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
573+
574+
Bookie bookie = new Bookie(conf);
575+
ByteBuf entry1 = Unpooled.buffer(1024);
576+
entry1.writeLong(1); // ledger id
577+
entry1.writeLong(2); // entry id
578+
entry1.writeBytes("entry-1".getBytes());
579+
bookie.getLedgerStorage().addEntry(entry1);
580+
581+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
582+
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
583+
584+
File ledgerDirMark = new File(ledgerDir + "/current", "lastMark");
585+
try {
586+
LogMark logMark = readLogMark(ledgerDirMark);
587+
assertEquals(1, logMark.getLogFileId());
588+
assertEquals(2, logMark.getLogFileOffset());
589+
} catch (Exception e) {
590+
fail();
591+
}
592+
593+
ByteBuf entry2 = Unpooled.buffer(1024);
594+
entry2.writeLong(2); // ledger id
595+
entry2.writeLong(1); // entry id
596+
entry2.writeBytes("entry-2".getBytes());
597+
598+
bookie.getLedgerStorage().addEntry(entry2);
599+
// write one entry to first ledger directory and flush with logMark(1, 2),
600+
// only the first ledger directory should have lastMark
601+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
602+
603+
bookie.getLedgerStorage().flush();
604+
try {
605+
LogMark logMark = readLogMark(ledgerDirMark);
606+
assertEquals(4, logMark.getLogFileId());
607+
assertEquals(5, logMark.getLogFileOffset());
608+
} catch (Exception e) {
609+
fail();
610+
}
611+
612+
CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
613+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
614+
CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
615+
checkpointSource.checkpointComplete(checkpoint, false);
616+
617+
try {
618+
LogMark firstLogMark = readLogMark(ledgerDirMark);
619+
assertEquals(7, firstLogMark.getLogFileId());
620+
assertEquals(8, firstLogMark.getLogFileOffset());
621+
} catch (Exception e) {
622+
fail();
623+
}
624+
625+
// test replay journal lastMark, to make sure we get the right LastMark position
626+
bookie.getJournals().get(0).getLastLogMark().readLog();
627+
LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
628+
assertEquals(7, logMark.getLogFileId());
629+
assertEquals(8, logMark.getLogFileOffset());
630+
}
448631
}

tests/backward-compat/hostname-bookieid/src/test/groovy/org/apache/bookkeeper/tests/backwardcompat/TestCompatUpgradeWithHostnameBookieId.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Licensed to the Apache Software Foundation (ASF) under one
33
* or more contributor license agreements. See the NOTICE file
44
* distributed with this work for additional information

0 commit comments

Comments
 (0)