Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException
logPosition = markedLog.getLogFileOffset();
}
LOG.info("Replaying journal {} from position {}", id, logPosition);
long scanOffset = journal.scanJournal(id, logPosition, scanner);
long scanOffset = journal.scanJournal(id, logPosition, scanner, this.conf.isSkipReplayJournalInvalidRecord());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a question:
How do we decide when to turn this switch on?
My understanding is that enabling this switch may cause data loss.
Does this need to be discussed on the dev@ mailing list? @eolivelli, please have a look, thanks.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What we ran into is this:

A bookie shut down because of a VM crash. After the VM recovered, we could not restart the bookie because of this exception. There was no way to skip it; the only option was to format the data on that VM and reinstall the bookie.

So, to recover the bookie in this scenario, we can turn the switch on for that machine only so that it starts up, and the next time we will turn the switch off again.

// Update LastLogMark after completely replaying journal
// scanOffset will point to EOF position
// After LedgerStorage flush, SyncThread should persist this to disk
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -848,13 +848,14 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
/**
* Scan the journal.
*
* @param journalId Journal Log Id
* @param journalPos Offset to start scanning
* @param scanner Scanner to handle entries
* @param journalId Journal Log Id
* @param journalPos Offset to start scanning
* @param scanner Scanner to handle entries
* @param skipInvalidRecord whether to stop scanning at an invalid (negative-length) record instead of throwing an IOException
* @return scanOffset - represents the byte till which journal was read
* @throws IOException
*/
public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
public long scanJournal(long journalId, long journalPos, JournalScanner scanner, boolean skipInvalidRecord)
throws IOException {
JournalChannel recLog;
if (journalPos <= 0) {
Expand Down Expand Up @@ -897,7 +898,12 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
continue;
}
isPaddingRecord = true;
} else {
} else if (skipInvalidRecord){
LOG.warn("Invalid record found with negative length: {},because of " +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can catch the exception before finally.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

"skipInvalidRecord is true,we skip the next data", len);
break;
}
else {
LOG.error("Invalid record found with negative length: {}", len);
throw new IOException("Invalid record found with negative length " + len);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ public void process(int journalVersion, long offset, ByteBuffer entry) {
LOG.info("Found ledger {} in journal", ledgerId);
}
}
});
}, false);
}

private void delete(Path path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,8 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
// Used for location index, lots of writes and much bigger dataset
protected static final String LEDGER_METADATA_ROCKSDB_CONF = "ledgerMetadataRocksdbConf";

protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord";

/**
* Construct a default configuration object.
*/
Expand Down Expand Up @@ -3917,6 +3919,25 @@ public boolean isDataIntegrityStampMissingCookiesEnabled() {
return this.getBoolean(DATA_INTEGRITY_COOKIE_STAMPING_ENABLED, false);
}


/**
 * Enable or disable skipping of invalid records while replaying the journal.
 *
 * <p>The value is handed to the journal scan during replay. When it is {@code true} and a
 * record with an invalid (negative) length is found, a warning is logged and scanning of that
 * journal stops at that record instead of failing with an {@code IOException}; the data after
 * the invalid record is not replayed.
 *
 * @param skipReplayJournalInvalidRecord {@code true} to skip invalid records during replay
 * @return this configuration object, to allow call chaining
 */
public ServerConfiguration setSkipReplayJournalInvalidRecord(boolean skipReplayJournalInvalidRecord) {
    final String flagValue = String.valueOf(skipReplayJournalInvalidRecord);
    setProperty(SKIP_REPLAY_JOURNAL_INVALID_RECORD, flagValue);
    return this;
}

/**
 * Tell whether invalid records are skipped while replaying the journal.
 *
 * @return the configured value, or {@code false} when the setting is absent
 * @see #setSkipReplayJournalInvalidRecord(boolean)
 */
public boolean isSkipReplayJournalInvalidRecord() {
    final boolean disabledByDefault = false;
    return getBoolean(SKIP_REPLAY_JOURNAL_INVALID_RECORD, disabledByDefault);
}

/**
* Get default rocksdb conf.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,6 @@ public void process(int journalVersion, long offset, ByteBuffer entry) throws IO
}

/**
 * Scan the journal identified by {@code journalId} from offset 0, passing it the given scanner.
 *
 * <p>The four-argument call passes {@code false} for the skip-invalid-record flag.
 * NOTE(review): both the three-argument and the four-argument call appear below; this looks like
 * the removed/added pair of a diff hunk - confirm that only the four-argument call exists in the
 * real file.
 *
 * @param journal journal to scan
 * @param journalId id of the journal log to scan
 * @param scanner scanner handed to the journal scan
 * @throws IOException declared because the delegated scan throws it
 */
private void scanJournal(Journal journal, long journalId, Journal.JournalScanner scanner) throws IOException {
journal.scanJournal(journalId, 0L, scanner);
journal.scanJournal(journalId, 0L, scanner, false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,51 @@ private JournalChannel writeV4Journal(File journalDir, int numEntries, byte[] ma
return jc;
}

/**
 * Write a V4 journal in which one record carries an invalid (negative) length prefix.
 *
 * <p>Records with index {@code 0..numEntries} are written for ledger 1: index 0 is the meta
 * entry, every other index is a packet built from a 1024-byte buffer filled with {@code 'X'}.
 * The record at index {@code numEntries - 1} gets a length prefix of {@code -1} while its
 * payload is still appended, which mimics a record that was corrupted on flush. A fence entry
 * is appended last and the journal version is updated to V4.
 *
 * @param journalDir journal directory passed to the {@code JournalChannel}
 * @param numEntries index of the last record written before the fence entry
 * @param masterKey master key used to build the meta entry
 * @return the journal channel the records were written to
 * @throws Exception if any of the write steps fails
 */
private JournalChannel writeV4JournalWithInvalidRecord(File journalDir, int numEntries, byte[] masterKey)
        throws Exception {
    final JournalChannel journalChannel = new JournalChannel(journalDir, System.currentTimeMillis());
    moveToPosition(journalChannel, JournalChannel.VERSION_HEADER_SIZE);
    final BufferedChannel out = journalChannel.getBufferedChannel();

    final byte[] payload = new byte[1024];
    Arrays.fill(payload, (byte) 'X');

    final int corruptedIndex = numEntries - 1;
    long lac = LedgerHandle.INVALID_ENTRY_ID;
    for (int entryId = 0; entryId <= numEntries; entryId++) {
        final ByteBuf record = (entryId == 0)
                ? generateMetaEntry(1, masterKey)
                : ClientUtil.generatePacket(1, entryId, lac, entryId * payload.length, payload);
        lac = entryId;

        // The corrupted record advertises a negative length; every other record its real size.
        final int lengthPrefix = (entryId == corruptedIndex) ? -1 : record.readableBytes();
        final ByteBuffer prefix = ByteBuffer.allocate(4);
        prefix.putInt(lengthPrefix);
        prefix.flip();

        out.write(Unpooled.wrappedBuffer(prefix));
        out.write(record);
        record.release();
    }

    // Append the fence entry for ledger 1, prefixed with its length.
    final ByteBuf fenceRecord = generateFenceEntry(1);
    final ByteBuf fencePrefix = Unpooled.buffer();
    fencePrefix.writeInt(fenceRecord.readableBytes());
    out.write(fencePrefix);
    out.write(fenceRecord);
    out.flushAndForceWrite(false);
    updateJournalVersion(journalChannel, JournalChannel.V4);

    return journalChannel;
}

static JournalChannel writeV5Journal(File journalDir, int numEntries,
byte[] masterKey) throws Exception {
return writeV5Journal(journalDir, numEntries, masterKey, false);
Expand Down Expand Up @@ -844,7 +889,7 @@ public void testJournalScanIOException() throws Exception {
assertEquals(journalIds.size(), 1);

try {
journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, journalScanner);
journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, journalScanner, false);
fail("Should not have been able to scan the journal");
} catch (Exception e) {
// Expected
Expand All @@ -854,6 +899,46 @@ public void testJournalScanIOException() throws Exception {
b.shutdown();
}

/**
 * Test that a journal containing a record with an invalid (negative) length can be scanned
 * without an exception when the skip-invalid-record flag is enabled.
 *
 * <p>The bookie is always shut down, even when an assertion or the scan fails, and a scan
 * failure is reported together with its cause so the underlying exception stays visible.
 */
@Test
public void testJournalScanInvalidRecordWithSkipFlag() throws Exception {
    File journalDir = createTempDir("bookie", "journal");
    BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(journalDir));

    File ledgerDir = createTempDir("bookie", "ledger");
    BookieImpl.checkDirectoryStructure(BookieImpl.getCurrentDirectory(ledgerDir));

    writeV4JournalWithInvalidRecord(BookieImpl.getCurrentDirectory(journalDir), 100, "testPasswd".getBytes());

    ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
    conf.setJournalDirName(journalDir.getPath())
        .setLedgerDirNames(new String[] { ledgerDir.getPath() })
        .setMetadataServiceUri(null)
        .setSkipReplayJournalInvalidRecord(true);

    Journal.JournalScanner journalScanner = new DummyJournalScan();

    BookieImpl b = new TestBookieImpl(conf);
    try {
        for (Journal journal : b.journals) {
            List<Long> journalIds = journal.listJournalIds(journal.getJournalDirectory(), null);

            // JUnit expects (expected, actual); this order keeps the failure message accurate.
            assertEquals(1, journalIds.size());

            try {
                journal.scanJournal(journalIds.get(0), 0, journalScanner,
                        conf.isSkipReplayJournalInvalidRecord());
            } catch (Exception e) {
                // Keep the original exception as the cause instead of discarding it.
                throw new AssertionError(
                        "Should have been able to scan the journal,because of skip flag", e);
            }
        }
    } finally {
        // Release the bookie's resources regardless of the test outcome.
        b.shutdown();
    }
}


private class DummyJournalScan implements Journal.JournalScanner {

@Override
Expand Down