Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -861,9 +861,6 @@ int shutdown(int exitCode) {
LOG.info("Turning bookie to read only during shut down");
stateManager.forceToReadOnly();

// Shutdown Sync thread
syncThread.shutdown();

// Shutdown journals
for (Journal journal : journals) {
journal.shutdown();
Expand All @@ -872,6 +869,9 @@ int shutdown(int exitCode) {
// Shutdown the EntryLogger which has the GarbageCollector Thread running
ledgerStorage.shutdown();

// Shutdown Sync thread
syncThread.shutdown();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here may through exceptions when shutdown the syncThread which will call checkpoint of ledgerStoreage, since we have already shut down the ledgeerStorage before

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, you are right. I updated the code, please help take a look, thanks.


//Shutdown disk checker
dirsMonitor.shutdown();
}
Expand Down Expand Up @@ -1285,4 +1285,9 @@ public OfLong getListOfEntriesOfLedger(long ledgerId) throws IOException, NoLedg
}
}
}

@VisibleForTesting
public List<Journal> getJournals() {
return this.journals;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ void rollLog(LastLogMark lastMark) throws NoWritableLedgerDirException {
* The last mark should first be max journal log id,
* and then max log position in max journal log.
*/
void readLog() {
public void readLog() {
byte[] buff = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
LogMark mark = new LogMark();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public class SingleDirectoryDbLedgerStorage implements CompactableLedgerStorage
private final long maxReadAheadBytesSize;

private final Counter flushExecutorTime;
private final boolean singleLedgerDirs;

public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager ledgerManager,
LedgerDirsManager ledgerDirsManager, LedgerDirsManager indexDirsManager,
Expand Down Expand Up @@ -172,6 +173,7 @@ public SingleDirectoryDbLedgerStorage(ServerConfiguration conf, LedgerManager le
this.writeCacheMaxSize = writeCacheSize;
this.writeCache = new WriteCache(allocator, writeCacheMaxSize / 2);
this.writeCacheBeingFlushed = new WriteCache(allocator, writeCacheMaxSize / 2);
this.singleLedgerDirs = conf.getLedgerDirs().length == 1;

readCacheMaxSize = readCacheSize;
this.readAheadCacheBatchSize = readAheadCacheBatchSize;
Expand Down Expand Up @@ -905,7 +907,9 @@ private void swapWriteCache() {
public void flush() throws IOException {
Checkpoint cp = checkpointSource.newCheckpoint();
checkpoint(cp);
checkpointSource.checkpointComplete(cp, true);
if (singleLedgerDirs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add a small comment with a quick description of the motivation for this condition

checkpointSource.checkpointComplete(cp, true);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,21 @@
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.Bookie.NoEntryException;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.BookieImpl;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.CheckpointSourceList;
import org.apache.bookkeeper.bookie.DefaultEntryLogger;
import org.apache.bookkeeper.bookie.EntryLocation;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.LogMark;
import org.apache.bookkeeper.bookie.TestBookieImpl;
import org.apache.bookkeeper.bookie.storage.EntryLogger;
import org.apache.bookkeeper.conf.ServerConfiguration;
Expand Down Expand Up @@ -637,4 +642,182 @@ public void testStorageStateFlags() throws Exception {

storage = (DbLedgerStorage) new TestBookieImpl(conf).getLedgerStorage();
}

@Test
public void testMultiLedgerDirectoryCheckpoint() throws Exception {
int gcWaitTime = 1000;
File firstDir = new File(tmpDir, "dir1");
File secondDir = new File(tmpDir, "dir2");
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setGcWaitTime(gcWaitTime);
conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
conf.setLedgerDirNames(new String[] { firstDir.getCanonicalPath(), secondDir.getCanonicalPath() });

BookieImpl bookie = new TestBookieImpl(conf);
ByteBuf entry1 = Unpooled.buffer(1024);
entry1.writeLong(1); // ledger id
entry1.writeLong(2); // entry id
entry1.writeBytes("entry-1".getBytes());

bookie.getLedgerStorage().addEntry(entry1);
// write one entry to first ledger directory and flush with logMark(1, 2),
// only the first ledger directory should have lastMark
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();

File firstDirMark = new File(firstDir + "/current", "lastMark");
File secondDirMark = new File(secondDir + "/current", "lastMark");

// LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
try {
readLogMark(firstDirMark);
readLogMark(secondDirMark);
fail();
} catch (Exception e) {
//
}

// write the second entry to second leger directory and flush with log(4, 5),
// the fist ledger directory's lastMark is (1, 2) and the second ledger directory's lastMark is (4, 5);
ByteBuf entry2 = Unpooled.buffer(1024);
entry2.writeLong(2); // ledger id
entry2.writeLong(1); // entry id
entry2.writeBytes("entry-2".getBytes());

bookie.getLedgerStorage().addEntry(entry2);
// write one entry to first ledger directory and flush with logMark(1, 2),
// only the first ledger directory should have lastMark
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(1).flush();

// LedgerStorage flush won't trigger lastMark update due to two ledger directories configured
try {
readLogMark(firstDirMark);
readLogMark(secondDirMark);
fail();
} catch (Exception e) {
//
}

// The dbLedgerStorage flush also won't trigger lastMark update due to two ledger directories configured.
bookie.getLedgerStorage().flush();
try {
readLogMark(firstDirMark);
readLogMark(secondDirMark);
fail();
} catch (Exception e) {
//
}

// trigger checkpoint simulate SyncThread do checkpoint.
CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
checkpointSource.checkpointComplete(checkpoint, false);

try {
LogMark firstLogMark = readLogMark(firstDirMark);
LogMark secondLogMark = readLogMark(secondDirMark);
assertEquals(7, firstLogMark.getLogFileId());
assertEquals(8, firstLogMark.getLogFileOffset());
assertEquals(7, secondLogMark.getLogFileId());
assertEquals(8, secondLogMark.getLogFileOffset());
} catch (Exception e) {
fail();
}

// test replay journal lastMark, to make sure we get the right LastMark position
bookie.getJournals().get(0).getLastLogMark().readLog();
LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
assertEquals(7, logMark.getLogFileId());
assertEquals(8, logMark.getLogFileOffset());
}

private LogMark readLogMark(File file) throws IOException {
byte[] buff = new byte[16];
ByteBuffer bb = ByteBuffer.wrap(buff);
LogMark mark = new LogMark();
try (FileInputStream fis = new FileInputStream(file)) {
int bytesRead = fis.read(buff);
if (bytesRead != 16) {
throw new IOException("Couldn't read enough bytes from lastMark."
+ " Wanted " + 16 + ", got " + bytesRead);
}
}
bb.clear();
mark.readLogMark(bb);

return mark;
}

@Test
public void testSingleLedgerDirectoryCheckpoint() throws Exception {
int gcWaitTime = 1000;
File ledgerDir = new File(tmpDir, "dir");
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
conf.setGcWaitTime(gcWaitTime);
conf.setProperty(DbLedgerStorage.WRITE_CACHE_MAX_SIZE_MB, 4);
conf.setProperty(DbLedgerStorage.READ_AHEAD_CACHE_MAX_SIZE_MB, 4);
conf.setLedgerStorageClass(DbLedgerStorage.class.getName());
conf.setLedgerDirNames(new String[] { ledgerDir.getCanonicalPath() });

BookieImpl bookie = new TestBookieImpl(conf);
ByteBuf entry1 = Unpooled.buffer(1024);
entry1.writeLong(1); // ledger id
entry1.writeLong(2); // entry id
entry1.writeBytes("entry-1".getBytes());
bookie.getLedgerStorage().addEntry(entry1);

bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(1, 2);
((DbLedgerStorage) bookie.getLedgerStorage()).getLedgerStorageList().get(0).flush();

File ledgerDirMark = new File(ledgerDir + "/current", "lastMark");
try {
LogMark logMark = readLogMark(ledgerDirMark);
assertEquals(1, logMark.getLogFileId());
assertEquals(2, logMark.getLogFileOffset());
} catch (Exception e) {
fail();
}

ByteBuf entry2 = Unpooled.buffer(1024);
entry2.writeLong(2); // ledger id
entry2.writeLong(1); // entry id
entry2.writeBytes("entry-2".getBytes());

bookie.getLedgerStorage().addEntry(entry2);
// write one entry to first ledger directory and flush with logMark(1, 2),
// only the first ledger directory should have lastMark
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(4, 5);

bookie.getLedgerStorage().flush();
try {
LogMark logMark = readLogMark(ledgerDirMark);
assertEquals(4, logMark.getLogFileId());
assertEquals(5, logMark.getLogFileOffset());
} catch (Exception e) {
fail();
}

CheckpointSource checkpointSource = new CheckpointSourceList(bookie.getJournals());
bookie.getJournals().get(0).getLastLogMark().getCurMark().setLogMark(7, 8);
CheckpointSource.Checkpoint checkpoint = checkpointSource.newCheckpoint();
checkpointSource.checkpointComplete(checkpoint, false);

try {
LogMark firstLogMark = readLogMark(ledgerDirMark);
assertEquals(7, firstLogMark.getLogFileId());
assertEquals(8, firstLogMark.getLogFileOffset());
} catch (Exception e) {
fail();
}

// test replay journal lastMark, to make sure we get the right LastMark position
bookie.getJournals().get(0).getLastLogMark().readLog();
LogMark logMark = bookie.getJournals().get(0).getLastLogMark().getCurMark();
assertEquals(7, logMark.getLogFileId());
assertEquals(8, logMark.getLogFileOffset());
}
}