Skip to content

Commit ae6b454

Browse files
hangc0276zymap
authored andcommitted
Fix data lost when configured multiple ledger directories (#3329)
(cherry picked from commit 8a76703)
1 parent c3a9943 commit ae6b454

File tree

4 files changed

+194
-2
lines changed

4 files changed

+194
-2
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1246,4 +1246,9 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg
12461246
}
12471247
}
12481248
}
1249+
1250+
@VisibleForTesting
1251+
public List<Journal> getJournals() {
1252+
return this.journals;
1253+
}
12491254
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
229229
* The last mark should first be max journal log id,
230230
* and then max log position in max journal log.
231231
*/
232-
void readLog() {
232+
public void readLog() {
233233
byte[] buff = new byte[16];
234234
ByteBuffer bb = ByteBuffer.wrap(buff);
235235
LogMark mark = new LogMark();

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
143143
private final long maxReadAheadBytesSize;
144144

145145
private final Counter flushExecutorTime;
146+
private final boolean singleLedgerDirs;
146147

147148
public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
148149
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, StatsLogger statsLogger,
@@ -160,6 +161,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
160161
this.writeCacheMaxSize = writeCacheSize;
161162
this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2);
162163
this.writeCacheBeingFlushed = new WriteCache(allocator, writeCacheMaxSize / 2);
164+
this.singleLedgerDirs = conf.getLedgerDirs().length == 1;
163165

164166
readCacheMaxSize = readCacheSize;
165167
this.readAheadCacheBatchSize = readAheadCacheBatchSize;
@@ -821,7 +823,9 @@ private void swapWriteCache() {
821823
public void flush() throws IOException {
822824
Checkpoint cp = checkpointSource.newCheckpoint();
823825
checkpoint(cp);
824-
checkpointSource.checkpointComplete(cp, true);
826+
if (singleLedgerDirs) {
827+
checkpointSource.checkpointComplete(cp, true);
828+
}
825829
}
826830

827831
@Override

bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java

Lines changed: 183 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,21 @@
2929
import io.netty.buffer.ByteBufUtil;
3030
import io.netty.buffer.Unpooled;
3131
import java.io.File;
32+
import java.io.FileInputStream;
3233
import java.io.IOException;
34+
import java.nio.ByteBuffer;
3335
import java.util.List;
3436
import org.apache.bookkeeper.bookie.Bookie;
3537
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
3638
import org.apache.bookkeeper.bookie.BookieException;
3739
import org.apache.bookkeeper.bookie.BookieImpl;
40+
import org.apache.bookkeeper.bookie.CheckpointSource;
41+
import org.apache.bookkeeper.bookie.CheckpointSourceList;
3842
import org.apache.bookkeeper.bookie.EntryLocation;
3943
import org.apache.bookkeeper.bookie.EntryLogger;
4044
import org.apache.bookkeeper.bookie.LedgerDirsManager;
4145
import org.apache.bookkeeper.bookie.LedgerStorage;
46+
import org.apache.bookkeeper.bookie.LogMark;
4247
import org.apache.bookkeeper.bookie.TestBookieImpl;
4348
import org.apache.bookkeeper.conf.ServerConfiguration;
4449
import org.apache.bookkeeper.conf.TestBKConfiguration;
@@ -630,4 +635,182 @@ public void testStorageStateFlags() throws Exception {
630635

631636
storage = (DbLedgerStorage) new TestBookieImpl(conf).getLedgerStorage();
632637
}
638+
639+
@Test
640+
public void testMultiLedgerDirectoryCheckpoint() throws Exception {
641+
int gcWaitTime = 1000;
642+
File firstDir = new File(tmpDir, "dir1");
643+
File secondDir = new File(tmpDir, "dir2");
644+
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
645+
conf.setGcWaitTime(gcWaitTime);
646+
conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
647+
conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
648+
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
649+
conf.setLedgerDirNames(new String[] { firstDir.getCanonicalPath(), secondDir.getCanonicalPath() });
650+
651+
BookieImpl bookie = new TestBookieImpl(conf);
652+
ByteBuf entry1 = Unpooled.buffer(1024);
653+
entry1.writeLong(1); // ledger id
654+
entry1.writeLong(2); // entry id
655+
entry1.writeBytes("entry-1".getBytes());
656+
657+
bookie.getLedgerStorage().addEntry(entry1);
658+
// write one entry to first ledger directory and flush with logMark(1, 2),
659+
// only the first ledger directory should have lastMark
660+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
661+
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
662+
663+
File firstDirMark = new File(firstDir + "/current", "lastMark");
664+
File secondDirMark = new File(secondDir + "/current", "lastMark");
665+
666+
// LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
667+
try {
668+
readLogMark(firstDirMark);
669+
readLogMark(secondDirMark);
670+
fail();
671+
} catch (Exception e) {
672+
//
673+
}
674+
675+
// write the second entry to second leger directory and flush with log(4, 5),
676+
// the fist ledger directory's lastMark is (1, 2) and the second ledger directory's lastMark is (4, 5);
677+
ByteBuf entry2 = Unpooled.buffer(1024);
678+
entry2.writeLong(2); // ledger id
679+
entry2.writeLong(1); // entry id
680+
entry2.writeBytes("entry-2".getBytes());
681+
682+
bookie.getLedgerStorage().addEntry(entry2);
683+
// write one entry to first ledger directory and flush with logMark(1, 2),
684+
// only the first ledger directory should have lastMark
685+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
686+
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(1).flush();
687+
688+
// LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
689+
try {
690+
readLogMark(firstDirMark);
691+
readLogMark(secondDirMark);
692+
fail();
693+
} catch (Exception e) {
694+
//
695+
}
696+
697+
// The dbLedgerStorage flush also won't trigger lastMark update due to two ledger directories configured.
698+
bookie.getLedgerStorage().flush();
699+
try {
700+
readLogMark(firstDirMark);
701+
readLogMark(secondDirMark);
702+
fail();
703+
} catch (Exception e) {
704+
//
705+
}
706+
707+
// trigger checkpoint simulate SyncThread do checkpoint.
708+
CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
709+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
710+
CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
711+
checkpointSource.checkpointComplete(checkpoint, false);
712+
713+
try {
714+
LogMark firstLogMark = readLogMark(firstDirMark);
715+
LogMark secondLogMark = readLogMark(secondDirMark);
716+
assertEquals(7, firstLogMark.getLogFileId());
717+
assertEquals(8, firstLogMark.getLogFileOffset());
718+
assertEquals(7, secondLogMark.getLogFileId());
719+
assertEquals(8, secondLogMark.getLogFileOffset());
720+
} catch (Exception e) {
721+
fail();
722+
}
723+
724+
// test replay journal lastMark, to make sure we get the right LastMark position
725+
bookie.getJournals().get(0).getLastLogMark().readLog();
726+
LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
727+
assertEquals(7, logMark.getLogFileId());
728+
assertEquals(8, logMark.getLogFileOffset());
729+
}
730+
731+
private LogMark readLogMark(File file) throws IOException {
732+
byte[] buff = new byte[16];
733+
ByteBuffer bb = ByteBuffer.wrap(buff);
734+
LogMark mark = new LogMark();
735+
try (FileInputStream fis = new FileInputStream(file)) {
736+
int bytesRead = fis.read(buff);
737+
if (bytesRead != 16) {
738+
throw new IOException("Couldn't read enough bytes from lastMark."
739+
+ " Wanted " + 16 + ", got " + bytesRead);
740+
}
741+
}
742+
bb.clear();
743+
mark.readLogMark(bb);
744+
745+
return mark;
746+
}
747+
748+
@Test
749+
public void testSingleLedgerDirectoryCheckpoint() throws Exception {
750+
int gcWaitTime = 1000;
751+
File ledgerDir = new File(tmpDir, "dir");
752+
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
753+
conf.setGcWaitTime(gcWaitTime);
754+
conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
755+
conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
756+
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
757+
conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });
758+
759+
BookieImpl bookie = new TestBookieImpl(conf);
760+
ByteBuf entry1 = Unpooled.buffer(1024);
761+
entry1.writeLong(1); // ledger id
762+
entry1.writeLong(2); // entry id
763+
entry1.writeBytes("entry-1".getBytes());
764+
bookie.getLedgerStorage().addEntry(entry1);
765+
766+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
767+
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();
768+
769+
File ledgerDirMark = new File(ledgerDir + "/current", "lastMark");
770+
try {
771+
LogMark logMark = readLogMark(ledgerDirMark);
772+
assertEquals(1, logMark.getLogFileId());
773+
assertEquals(2, logMark.getLogFileOffset());
774+
} catch (Exception e) {
775+
fail();
776+
}
777+
778+
ByteBuf entry2 = Unpooled.buffer(1024);
779+
entry2.writeLong(2); // ledger id
780+
entry2.writeLong(1); // entry id
781+
entry2.writeBytes("entry-2".getBytes());
782+
783+
bookie.getLedgerStorage().addEntry(entry2);
784+
// write one entry to first ledger directory and flush with logMark(1, 2),
785+
// only the first ledger directory should have lastMark
786+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
787+
788+
bookie.getLedgerStorage().flush();
789+
try {
790+
LogMark logMark = readLogMark(ledgerDirMark);
791+
assertEquals(4, logMark.getLogFileId());
792+
assertEquals(5, logMark.getLogFileOffset());
793+
} catch (Exception e) {
794+
fail();
795+
}
796+
797+
CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
798+
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
799+
CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
800+
checkpointSource.checkpointComplete(checkpoint, false);
801+
802+
try {
803+
LogMark firstLogMark = readLogMark(ledgerDirMark);
804+
assertEquals(7, firstLogMark.getLogFileId());
805+
assertEquals(8, firstLogMark.getLogFileOffset());
806+
} catch (Exception e) {
807+
fail();
808+
}
809+
810+
// test replay journal lastMark, to make sure we get the right LastMark position
811+
bookie.getJournals().get(0).getLastLogMark().readLog();
812+
LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
813+
assertEquals(7, logMark.getLogFileId());
814+
assertEquals(8, logMark.getLogFileOffset());
815+
}
633816
}

0 commit comments

Comments
 (0)