Skip to content

Commit 51adea5

Browse files
hangc0276 and zhiyuanlei committed
Support skipping invalid journal records in the journal replay stage (apache#3956)
Co-authored-by: zhiyuanlei <zhiyuanlei@tencent.com> (cherry picked from commit 5e9fdc2)
1 parent 963c80d commit 51adea5

File tree

5 files changed

+149
-9
lines changed

5 files changed

+149
-9
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Bookie.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -994,7 +994,7 @@ private void replay(Journal journal, JournalScanner scanner) throws IOException
994994
logPosition = markedLog.getLogFileOffset();
995995
}
996996
LOG.info("Replaying journal {} from position {}", id, logPosition);
997-
long scanOffset = journal.scanJournal(id, logPosition, scanner);
997+
long scanOffset = journal.scanJournal(id, logPosition, scanner, conf.isSkipReplayJournalInvalidRecord());
998998
// Update LastLogMark after completely replaying journal
999999
// scanOffset will point to EOF position
10001000
// After LedgerStorage flush, SyncThread should persist this to disk

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/Journal.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -777,13 +777,14 @@ public void checkpointComplete(Checkpoint checkpoint, boolean compact) throws IO
777777
/**
778778
* Scan the journal.
779779
*
780-
* @param journalId Journal Log Id
781-
* @param journalPos Offset to start scanning
782-
* @param scanner Scanner to handle entries
780+
* @param journalId Journal Log Id
781+
* @param journalPos Offset to start scanning
782+
* @param scanner Scanner to handle entries
783+
* @param skipInvalidRecord if true, an IOException hit while scanning is logged and the scan stops, returning the current position; if false, the exception is rethrown
783784
* @return scanOffset - represents the byte till which journal was read
784785
* @throws IOException
785786
*/
786-
public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
787+
public long scanJournal(long journalId, long journalPos, JournalScanner scanner, boolean skipInvalidRecord)
787788
throws IOException {
788789
JournalChannel recLog;
789790
if (journalPos <= 0) {
@@ -846,6 +847,13 @@ public long scanJournal(long journalId, long journalPos, JournalScanner scanner)
846847
}
847848
}
848849
return recLog.fc.position();
850+
} catch (IOException e) {
851+
if (skipInvalidRecord) {
852+
LOG.warn("Failed to parse journal file, and skipInvalidRecord is true, skip this journal file reply");
853+
} else {
854+
throw e;
855+
}
856+
return recLog.fc.position();
849857
} finally {
850858
recLog.close();
851859
}

bookkeeper-server/src/main/java/org/apache/bookkeeper/conf/ServerConfiguration.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,8 @@ public class ServerConfiguration extends AbstractConfiguration<ServerConfigurati
309309
protected static final String AUTHORIZED_ROLES = "authorizedRoles";
310310
protected static final String ROCKSDB_DELETE_ENTRIES_BATCH_SIZE = "rocksDBDeleteEntriesBatchSize";
311311

312+
protected static final String SKIP_REPLAY_JOURNAL_INVALID_RECORD = "skipReplayJournalInvalidRecord";
313+
312314
/**
313315
* Construct a default configuration object.
314316
*/
@@ -3617,7 +3619,24 @@ public int getInFlightReadEntryNumInLedgerChecker(){
36173619
}
36183620

36193621
/**
3620-
* Get entry log location index delete entries batch size from RocksDB.
3622+
* Set whether journal replay skips invalid records: when true, an IOException raised while scanning a journal during replay is logged and the rest of that journal file is skipped instead of the error being propagated.
3623+
* @param skipReplayJournalInvalidRecord true to skip invalid journal records during replay, false to fail on them
3624+
* @return this configuration object
3625+
*/
3626+
public ServerConfiguration setSkipReplayJournalInvalidRecord(boolean skipReplayJournalInvalidRecord) {
3627+
this.setProperty(SKIP_REPLAY_JOURNAL_INVALID_RECORD,
3628+
Boolean.toString(skipReplayJournalInvalidRecord));
3629+
return this;
3630+
}
3631+
3632+
/**
3633+
* Whether invalid journal records are skipped during journal replay (default {@code false}); see {@link #setSkipReplayJournalInvalidRecord(boolean)}.
3634+
*/
3635+
public boolean isSkipReplayJournalInvalidRecord() {
3636+
return this.getBoolean(SKIP_REPLAY_JOURNAL_INVALID_RECORD, false);
3637+
}
3638+
3639+
/**
36213640
*
36223641
* @return Int rocksDB delete entries batch size configured in Service configuration.
36233642
*/

bookkeeper-server/src/main/java/org/apache/bookkeeper/tools/cli/commands/bookie/ReadJournalCommand.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,6 +211,6 @@ public void process(int journalVersion, long offset, ByteBuffer entry) throws IO
211211
}
212212

213213
private void scanJournal(Journal journal, long journalId, Journal.JournalScanner scanner) throws IOException {
214-
journal.scanJournal(journalId, 0L, scanner);
214+
journal.scanJournal(journalId, 0L, scanner, false); // skipInvalidRecord=false: scan errors propagate as IOException
215215
}
216216
}

bookkeeper-server/src/test/java/org/apache/bookkeeper/bookie/BookieJournalTest.java

Lines changed: 115 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,52 @@ private JournalChannel writeV4Journal(File journalDir, int numEntries, byte[] ma
297297
return jc;
298298
}
299299

300+
private JournalChannel writeV4JournalWithInvalidRecord(File journalDir,
301+
int numEntries, byte[] masterKey) throws Exception {
302+
long logId = System.currentTimeMillis();
303+
JournalChannel jc = new JournalChannel(journalDir, logId);
304+
305+
moveToPosition(jc, JournalChannel.VERSION_HEADER_SIZE);
306+
307+
BufferedChannel bc = jc.getBufferedChannel();
308+
309+
byte[] data = new byte[1024];
310+
Arrays.fill(data, (byte) 'X');
311+
long lastConfirmed = LedgerHandle.INVALID_ENTRY_ID;
312+
for (int i = 0; i <= numEntries; i++) {
313+
ByteBuf packet;
314+
if (i == 0) {
315+
packet = generateMetaEntry(1, masterKey);
316+
} else {
317+
packet = ClientUtil.generatePacket(1, i, lastConfirmed, i * data.length, data);
318+
}
319+
lastConfirmed = i;
320+
ByteBuffer lenBuff = ByteBuffer.allocate(4);
321+
if (i == numEntries - 1) {
322+
// Simulate a bad flush: the entry at index numEntries - 1 gets an invalid length prefix (-1)
323+
lenBuff.putInt(-1);
324+
} else {
325+
lenBuff.putInt(packet.readableBytes());
326+
}
327+
lenBuff.flip();
328+
bc.write(Unpooled.wrappedBuffer(lenBuff));
329+
bc.write(packet);
330+
packet.release();
331+
}
332+
333+
// write the fence entry after the data entries
334+
ByteBuf packet = generateFenceEntry(1);
335+
ByteBuf lenBuf = Unpooled.buffer();
336+
lenBuf.writeInt(packet.readableBytes());
337+
// length prefix first, then the fence entry itself
338+
bc.write(lenBuf);
339+
bc.write(packet);
340+
bc.flushAndForceWrite(false);
341+
updateJournalVersion(jc, JournalChannel.V4);
342+
343+
return jc;
344+
}
345+
300346
static JournalChannel writeV5Journal(File journalDir, int numEntries,
301347
byte[] masterKey) throws Exception {
302348
return writeV5Journal(journalDir, numEntries, masterKey, false);
@@ -838,7 +884,7 @@ public void testJournalScanIOException() throws Exception {
838884
assertEquals(journalIds.size(), 1);
839885

840886
try {
841-
journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, journalScanner);
887+
journal.scanJournal(journalIds.get(0), Long.MAX_VALUE, journalScanner, false);
842888
fail("Should not have been able to scan the journal");
843889
} catch (Exception e) {
844890
// Expected
@@ -848,7 +894,74 @@ public void testJournalScanIOException() throws Exception {
848894
b.shutdown();
849895
}
850896

851-
private class DummyJournalScan implements Journal.JournalScanner {
897+
/**
898+
* Verify that scanning a journal containing an invalid record completes without throwing when skipReplayJournalInvalidRecord is enabled, and throws when the flag is left at its default (disabled).
899+
*/
900+
@Test
901+
public void testJournalScanInvalidRecordWithSkipFlag() throws Exception {
902+
File journalDir = createTempDir("bookie", "journal");
903+
Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(journalDir));
904+
905+
File ledgerDir = createTempDir("bookie", "ledger");
906+
Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(ledgerDir));
907+
908+
try {
909+
writeV4JournalWithInvalidRecord(Bookie.getCurrentDirectory(journalDir),
910+
100, "testPasswd".getBytes());
911+
} catch (Exception e) {
912+
fail();
913+
}
914+
915+
916+
ServerConfiguration conf = TestBKConfiguration.newServerConfiguration();
917+
// First pass: enable skipping of invalid journal records
918+
conf.setJournalDirName(journalDir.getPath())
919+
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
920+
.setMetadataServiceUri(null)
921+
.setSkipReplayJournalInvalidRecord(true);
922+
923+
Journal.JournalScanner journalScanner = new DummyJournalScan();
924+
925+
Bookie b = new Bookie(conf);
926+
927+
for (Journal journal : b.journals) {
928+
List<Long> journalIds = Journal.listJournalIds(journal.getJournalDirectory(), null);
929+
assertEquals(journalIds.size(), 1);
930+
try {
931+
journal.scanJournal(journalIds.get(0), 0, journalScanner, conf.isSkipReplayJournalInvalidRecord());
932+
} catch (Exception e) {
933+
fail("Should pass the journal scanning because we enabled skip flag by default.");
934+
}
935+
}
936+
937+
b.shutdown();
938+
939+
// Second pass: fresh configuration, so skipping invalid records stays at its default (disabled)
940+
conf = TestBKConfiguration.newServerConfiguration();
941+
conf.setJournalDirName(journalDir.getPath())
942+
.setLedgerDirNames(new String[] { ledgerDir.getPath() })
943+
.setMetadataServiceUri(null);
944+
945+
journalScanner = new DummyJournalScan();
946+
947+
b = new Bookie(conf);
948+
949+
for (Journal journal : b.journals) {
950+
List<Long> journalIds = Journal.listJournalIds(journal.getJournalDirectory(), null);
951+
assertEquals(journalIds.size(), 1);
952+
try {
953+
journal.scanJournal(journalIds.get(0), 0, journalScanner, conf.isSkipReplayJournalInvalidRecord());
954+
fail("Should fail the journal scanning because of disabled skip flag");
955+
} catch (Exception e) {
956+
// expected: with the skip flag off, the invalid record makes the scan throw
957+
}
958+
}
959+
960+
b.shutdown();
961+
}
962+
963+
964+
static class DummyJournalScan implements Journal.JournalScanner {
852965

853966
@Override
854967
public void process(int journalVersion, long offset, ByteBuffer entry) throws IOException {

0 commit comments

Comments
 (0)