Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package org.apache.bookkeeper.bookie;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.bookie.storage.EntryLogScanner.ReadLengthType.READ_LEDGER_ENTRY_ID;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -1036,16 +1037,33 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce
}
// read the entry
data.clear();
data.capacity(entrySize);
int rc = readFromLogChannel(entryLogId, bc, data, pos);
if (rc != entrySize) {
LOG.warn("Short read for ledger entry from entryLog {}@{} ({} != {})",
entryLogId, pos, rc, entrySize);
return;
int rc;
// process the entry based on the read length type
switch (scanner.getReadLengthType()) {
case READ_NOTHING:
// skip read
scanner.process(ledgerId, offset, entrySize);
break;
case READ_LEDGER_ENTRY_ID:
data.capacity(READ_LEDGER_ENTRY_ID.getLengthToRead());
rc = readFromLogChannel(entryLogId, bc, data, pos);
if (rc != READ_LEDGER_ENTRY_ID.getLengthToRead()) {
LOG.warn("Short read for ledger entry id from entrylog {}", entryLogId);
return;
}
long entryId = data.getLong(Long.BYTES);
scanner.process(ledgerId, offset, entrySize, entryId);
break;
case READ_ALL:
data.capacity(entrySize);
rc = readFromLogChannel(entryLogId, bc, data, pos);
if (rc != entrySize) {
LOG.warn("Short read for ledger entry id from entrylog {}", entryLogId);
return;
}
scanner.process(ledgerId, offset, data);
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add the "default" case

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, PTAL.

}
// process the entry
scanner.process(ledgerId, offset, data);

// Advance position to the next entry
pos += entrySize;
}
Expand Down Expand Up @@ -1164,18 +1182,21 @@ private EntryLogMetadata extractEntryLogMetadataByScanning(long entryLogId,
// Read through the entry log file and extract the entry log meta
scanEntryLog(entryLogId, new EntryLogScanner() {
@Override
public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
if (throttler != null) {
throttler.acquire(entry.readableBytes());
}
public void process(long ledgerId, long offset, int entrySize) throws IOException {
// add new entry size of a ledger to entry log meta
meta.addLedgerSize(ledgerId, entry.readableBytes() + 4);
meta.addLedgerSize(ledgerId, entrySize + 4);
}

@Override
public boolean accept(long ledgerId) {
return ledgerId >= 0;
}

@Override
public ReadLengthType getReadLengthType(){
// we only need to read the entry size.
return ReadLengthType.READ_NOTHING;
}
});

if (LOG.isDebugEnabled()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,7 @@ public void initiate(boolean dryRun) throws IOException {
LOG.info("Scanning {}", entryLogId);
entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
@Override
public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
long entryId = entry.getLong(8);

public void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException {
stats.computeIfAbsent(ledgerId, (ignore) -> new RecoveryStats()).registerEntry(entryId);

// Actual location indexed is pointing past the entry size
Expand All @@ -138,6 +136,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio
public boolean accept(long ledgerId) {
return ledgerIds.contains(ledgerId);
}

@Override
public ReadLengthType getReadLengthType() {
return ReadLengthType.READ_LEDGER_ENTRY_ID;
}
});

ledgerCache.flushLedger(true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,17 +315,19 @@ public boolean accept(long ledgerId) {
}

@Override
public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
long lid = entry.getLong(entry.readerIndex());
long entryId = entry.getLong(entry.readerIndex() + 8);
if (lid != ledgerId || entryId < -1) {
LOG.warn("Scanning expected ledgerId {}, but found invalid entry "
+ "with ledgerId {} entryId {} at offset {}",
ledgerId, lid, entryId, offset);
public void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException {
if (entryId < -1) {
LOG.warn("Scanning found invalid entry with ledgerId {} entryId {} at offset {}",
ledgerId, entryId, offset);
throw new IOException("Invalid entry found @ offset " + offset);
}
long location = (compactionLog.getDstLogId() << 32L) | (offset + 4);
offsets.add(new EntryLocation(lid, entryId, location));
offsets.add(new EntryLocation(ledgerId, entryId, location));
}

@Override
public ReadLengthType getReadLengthType() {
return ReadLengthType.READ_LEDGER_ENTRY_ID;
}
});
LOG.info("Recovered {} entry locations from compacted log {}", offsets.size(), compactionLog.getDstLogId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,26 +27,62 @@
* Scan entries in a entry log file.
*/
public interface EntryLogScanner {
enum ReadLengthType {
// Read all data of the entry
READ_ALL(Integer.MAX_VALUE),
// Read nothing of the entry
READ_NOTHING(0),
// Read ledger id(8 byte) and entry id(8 byte) in the beginning of the entry
READ_LEDGER_ENTRY_ID(16);

private final int lengthToRead;
ReadLengthType(int lengthToRead) {
this.lengthToRead = lengthToRead;
}

public int getLengthToRead() {
return lengthToRead;
}
}

/**
* Tests whether or not the entries belongs to the specified ledger
* should be processed.
*
* @param ledgerId
* Ledger ID.
* @param ledgerId ledger id
* @return true if and only the entries of the ledger should be scanned.
*/
boolean accept(long ledgerId);

/**
* Process an entry.
*
* @param ledgerId
* Ledger ID.
* @param offset
* File offset of this entry.
* @param entry
* Entry ByteBuf
* Process an entry when ReadLengthType is READ_NOTHING.
* @param ledgerId ledger id
* @param offset init offset of the entry
* @param entrySize entry size
* @throws IOException
*/
void process(long ledgerId, long offset, ByteBuf entry) throws IOException;
default void process(long ledgerId, long offset, int entrySize) throws IOException {}

/**
* Process an entry when ReadLengthType is READ_LEDGER_ENTRY_ID_LENGTH.
* @param ledgerId ledger id
* @param offset init offset of the entry
* @param entrySize entry size
* @param entryId entry id
* @throws IOException
*/
default void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException {}

/**
* Process an entry when ReadLengthType is READ_ALL.
* @param ledgerId ledger id
* @param offset init offset of the entry
* @param entry entry, containing ledgerId(8byte), entryId(8byte),... entrySize=entry.readableBytes()
*/
default void process(long ledgerId, long offset, ByteBuf entry) throws IOException{}


default ReadLengthType getReadLengthType(){
return ReadLengthType.READ_ALL;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -423,18 +423,20 @@ EntryLogMetadata scanEntryLogMetadata(long logId, AbstractLogCompactor.Throttler
// Read through the entry log file and extract the entry log meta
scanEntryLog(logId, new EntryLogScanner() {
@Override
public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
public void process(long ledgerId, long offset, int entrySize) throws IOException {
// add new entry size of a ledger to entry log meta
if (throttler != null) {
throttler.acquire(entry.readableBytes());
}
meta.addLedgerSize(ledgerId, entry.readableBytes() + Integer.BYTES);
meta.addLedgerSize(ledgerId, entrySize + Integer.BYTES);
}

@Override
public boolean accept(long ledgerId) {
return ledgerId >= 0;
}

@Override
public ReadLengthType getReadLengthType() {
return ReadLengthType.READ_NOTHING;
}
});
return meta;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import java.io.IOException;
import org.apache.bookkeeper.bookie.storage.EntryLogScanner;

import static org.apache.bookkeeper.bookie.storage.directentrylogger.LogMetadata.INVALID_LID;

class LogReaderScan {
static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner scanner) throws IOException {
int offset = Header.LOGFILE_LEGACY_HEADER_SIZE;
Expand All @@ -47,11 +49,24 @@ static void scan(ByteBufAllocator allocator, LogReader reader, EntryLogScanner s
// have realigned on the block boundary.
offset += Integer.BYTES;

long ledgerId = reader.readLongAt(offset);
if (ledgerId == INVALID_LID || !scanner.accept(ledgerId)) {
offset += entrySize;
continue;
}

entry.clear();
reader.readIntoBufferAt(entry, offset, entrySize);
long ledgerId = entry.getLong(0);
if (ledgerId >= 0 && scanner.accept(ledgerId)) {
scanner.process(ledgerId, initOffset, entry);
switch (scanner.getReadLengthType()) {
case READ_NOTHING:
scanner.process(ledgerId, initOffset, entrySize);
break;
case READ_LEDGER_ENTRY_ID:
long entryId = reader.readLongAt(offset + Long.BYTES);
scanner.process(ledgerId, initOffset, entrySize, entryId);
break;
case READ_ALL:
reader.readIntoBufferAt(entry, offset, entrySize);
scanner.process(ledgerId, initOffset, entry);
}
offset += entrySize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ private void scanEntryLogFiles(Set<Long> ledgers, File[] lDirs) throws IOExcepti
for (long entryLogId : entryLogs) {
entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
@Override
public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
public void process(long ledgerId, long offset, int entrySize) throws IOException {
if (ledgers.add(ledgerId)) {
if (verbose) {
LOG.info("Found ledger {} in entry log", ledgerId);
Expand All @@ -174,6 +174,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio
public boolean accept(long ledgerId) {
return true;
}

@Override
public ReadLengthType getReadLengthType() {
return ReadLengthType.READ_NOTHING;
}
});

++completedEntryLogs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
package org.apache.bookkeeper.bookie.storage.ldb;

import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;

import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
Expand Down Expand Up @@ -100,9 +100,7 @@ public void initiate() throws IOException {
for (long entryLogId : entryLogs) {
entryLogger.scanEntryLog(entryLogId, new EntryLogScanner() {
@Override
public void process(long ledgerId, long offset, ByteBuf entry) throws IOException {
long entryId = entry.getLong(8);

public void process(long ledgerId, long offset, int entrySize, long entryId) throws IOException {
// Actual location indexed is pointing past the entry size
long location = (entryLogId << 32L) | (offset + 4);

Expand Down Expand Up @@ -135,6 +133,11 @@ public void process(long ledgerId, long offset, ByteBuf entry) throws IOExceptio
public boolean accept(long ledgerId) {
return activeLedgers.contains(ledgerId);
}

@Override
public ReadLengthType getReadLengthType() {
return ReadLengthType.READ_LEDGER_ENTRY_ID;
}
});

++completedEntryLogs;
Expand Down