diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java index 0628ec28afa..e5520185f3c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/BookieImpl.java @@ -1285,4 +1285,9 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg } } } + + @VisibleForTesting + public List getJournals() { + return this.journals; + } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java index 99eaa238191..61ed65cc194 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java @@ -226,7 +226,7 @@ void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException { * The last mark should first be max journal log id, * and then max log position in max journal log. */ - void readLog() { + public void readLog() { byte[] buff = new byte[16]; ByteBuffer bb = ByteBuffer.wrap(buff); LogMark mark = new LogMark(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java index f9553c5fe64..a44fc2319c7 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/SingleDirectoryDbLedgerStorage.java @@ -143,6 +143,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage private final long maxReadAheadBytesSize; private final Counter flushExecutorTime; + private final boolean singleLedgerDirs; public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager, LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager, @@ -172,6 +173,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le this.writeCacheMaxSize = writeCacheSize; this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2); this.writeCacheBeingFlushed = new WriteCache(allocator, writeCacheMaxSize / 2); + this.singleLedgerDirs = conf.getLedgerDirs().length == 1; readCacheMaxSize = readCacheSize; this.readAheadCacheBatchSize = readAheadCacheBatchSize; @@ -905,7 +907,9 @@ private void swapWriteCache() { public void flush() throws IOException { Checkpoint cp = checkpointSource.newCheckpoint(); checkpoint(cp); - checkpointSource.checkpointComplete(cp, true); + if (singleLedgerDirs) { + checkpointSource.checkpointComplete(cp, true); + } } @Override diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java index 4efdf06edb2..7a5d498935b 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorageTest.java @@ -31,16 +31,21 @@ import io.netty.buffer.Unpooled; import io.netty.util.ReferenceCountUtil; import java.io.File; +import java.io.FileInputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import org.apache.bookkeeper.bookie.Bookie; import org.apache.bookkeeper.bookie.Bookie.NoEntryException; import org.apache.bookkeeper.bookie.BookieException; import org.apache.bookkeeper.bookie.BookieImpl; +import org.apache.bookkeeper.bookie.CheckpointSource; +import org.apache.bookkeeper.bookie.CheckpointSourceList; import org.apache.bookkeeper.bookie.DefaultEntryLogger; import org.apache.bookkeeper.bookie.EntryLocation; import org.apache.bookkeeper.bookie.LedgerDirsManager; import org.apache.bookkeeper.bookie.LedgerStorage; +import org.apache.bookkeeper.bookie.LogMark; import org.apache.bookkeeper.bookie.TestBookieImpl; import org.apache.bookkeeper.bookie.storage.EntryLogger; import org.apache.bookkeeper.conf.ServerConfiguration; @@ -637,4 +642,182 @@ public void testStorageStateFlags() throws Exception { storage = (DbLedgerStorage) new TestBookieImpl(conf).getLedgerStorage(); } + + @Test + public void testMultiLedgerDirectoryCheckpoint() throws Exception { + int gcWaitTime = 1000; + File firstDir = new File(tmpDir, "dir1"); + File secondDir = new File(tmpDir, "dir2"); + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4); + conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[] { firstDir.getCanonicalPath(), secondDir.getCanonicalPath() }); + + BookieImpl bookie = new TestBookieImpl(conf); + ByteBuf entry1 = Unpooled.buffer(1024); + entry1.writeLong(1); // ledger id + entry1.writeLong(2); // entry id + entry1.writeBytes("entry-1".getBytes()); + + bookie.getLedgerStorage().addEntry(entry1); + // write one entry to first ledger directory and flush with logMark(1, 2), + // only the first ledger directory should have lastMark + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2); + ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush(); + + File firstDirMark = new File(firstDir + "/current", "lastMark"); + File secondDirMark = new File(secondDir + "/current", "lastMark"); + + // LedgerStorage flush won't trigger lastMark update due to two ledger directories configured + try { + readLogMark(firstDirMark); + readLogMark(secondDirMark); + fail(); + } catch (Exception e) { + // + } + + // write the second entry to second leger directory and flush with log(4, 5), + // the fist ledger directory's lastMark is (1, 2) and the second ledger directory's lastMark is (4, 5); + ByteBuf entry2 = Unpooled.buffer(1024); + entry2.writeLong(2); // ledger id + entry2.writeLong(1); // entry id + entry2.writeBytes("entry-2".getBytes()); + + bookie.getLedgerStorage().addEntry(entry2); + // write one entry to first ledger directory and flush with logMark(1, 2), + // only the first ledger directory should have lastMark + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5); + ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(1).flush(); + + // LedgerStorage flush won't trigger lastMark update due to two ledger directories configured + try { + readLogMark(firstDirMark); + readLogMark(secondDirMark); + fail(); + } catch (Exception e) { + // + } + + // The dbLedgerStorage flush also won't trigger lastMark update due to two ledger directories configured. + bookie.getLedgerStorage().flush(); + try { + readLogMark(firstDirMark); + readLogMark(secondDirMark); + fail(); + } catch (Exception e) { + // + } + + // trigger checkpoint simulate SyncThread do checkpoint. + CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals()); + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8); + CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint(); + checkpointSource.checkpointComplete(checkpoint, false); + + try { + LogMark firstLogMark = readLogMark(firstDirMark); + LogMark secondLogMark = readLogMark(secondDirMark); + assertEquals(7, firstLogMark.getLogFileId()); + assertEquals(8, firstLogMark.getLogFileOffset()); + assertEquals(7, secondLogMark.getLogFileId()); + assertEquals(8, secondLogMark.getLogFileOffset()); + } catch (Exception e) { + fail(); + } + + // test replay journal lastMark, to make sure we get the right LastMark position + bookie.getJournals().get(0).getLastLogMark().readLog(); + LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark(); + assertEquals(7, logMark.getLogFileId()); + assertEquals(8, logMark.getLogFileOffset()); + } + + private LogMark readLogMark(File file) throws IOException { + byte[] buff = new byte[16]; + ByteBuffer bb = ByteBuffer.wrap(buff); + LogMark mark = new LogMark(); + try (FileInputStream fis = new FileInputStream(file)) { + int bytesRead = fis.read(buff); + if (bytesRead != 16) { + throw new IOException("Couldn't read enough bytes from lastMark." + + " Wanted " + 16 + ", got " + bytesRead); + } + } + bb.clear(); + mark.readLogMark(bb); + + return mark; + } + + @Test + public void testSingleLedgerDirectoryCheckpoint() throws Exception { + int gcWaitTime = 1000; + File ledgerDir = new File(tmpDir, "dir"); + ServerConfiguration conf = TestBKConfiguration.newServerConfiguration(); + conf.setGcWaitTime(gcWaitTime); + conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4); + conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4); + conf.setLedgerStorageClass(DbLedgerStorage.class.getName()); + conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() }); + + BookieImpl bookie = new TestBookieImpl(conf); + ByteBuf entry1 = Unpooled.buffer(1024); + entry1.writeLong(1); // ledger id + entry1.writeLong(2); // entry id + entry1.writeBytes("entry-1".getBytes()); + bookie.getLedgerStorage().addEntry(entry1); + + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2); + ((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush(); + + File ledgerDirMark = new File(ledgerDir + "/current", "lastMark"); + try { + LogMark logMark = readLogMark(ledgerDirMark); + assertEquals(1, logMark.getLogFileId()); + assertEquals(2, logMark.getLogFileOffset()); + } catch (Exception e) { + fail(); + } + + ByteBuf entry2 = Unpooled.buffer(1024); + entry2.writeLong(2); // ledger id + entry2.writeLong(1); // entry id + entry2.writeBytes("entry-2".getBytes()); + + bookie.getLedgerStorage().addEntry(entry2); + // write one entry to first ledger directory and flush with logMark(1, 2), + // only the first ledger directory should have lastMark + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5); + + bookie.getLedgerStorage().flush(); + try { + LogMark logMark = readLogMark(ledgerDirMark); + assertEquals(4, logMark.getLogFileId()); + assertEquals(5, logMark.getLogFileOffset()); + } catch (Exception e) { + fail(); + } + + CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals()); + bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8); + CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint(); + checkpointSource.checkpointComplete(checkpoint, false); + + try { + LogMark firstLogMark = readLogMark(ledgerDirMark); + assertEquals(7, firstLogMark.getLogFileId()); + assertEquals(8, firstLogMark.getLogFileOffset()); + } catch (Exception e) { + fail(); + } + + // test replay journal lastMark, to make sure we get the right LastMark position + bookie.getJournals().get(0).getLastLogMark().readLog(); + LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark(); + assertEquals(7, logMark.getLogFileId()); + assertEquals(8, logMark.getLogFileOffset()); + } }