Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -191,4 +191,8 @@ public interface BookKeeperServerStats {
String NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE = "NUM_OF_WRITE_LEDGERS_REMOVED_CACHE_MAXSIZE";
String NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS = "NUM_LEDGERS_HAVING_MULTIPLE_ENTRYLOGS";
String ENTRYLOGS_PER_LEDGER = "ENTRYLOGS_PER_LEDGER";

// Entry logger stats
String NUM_OF_SCANNED_ENTRIES = "NUM_OF_SCANNED_ENTRIES";
String NUM_OF_SCANNED_BYTES = "NUM_OF_SCANNED_BYTES";
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@
package org.apache.bookkeeper.bookie;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.CATEGORY_SERVER;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.ENTRYLOGGER_SCOPE;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OF_SCANNED_BYTES;
import static org.apache.bookkeeper.bookie.BookKeeperServerStats.NUM_OF_SCANNED_ENTRIES;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -56,8 +60,10 @@
import java.util.concurrent.CopyOnWriteArrayList;

import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.bookkeeper.util.DiskChecker;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap;
Expand Down Expand Up @@ -323,6 +329,34 @@ interface EntryLogListener {
void onRotateEntryLog();
}

@StatsDoc(
name = ENTRYLOGGER_SCOPE,
category = CATEGORY_SERVER,
help = "EntryLogger related stats"
)
class EntryLogCounter {

@StatsDoc(
name = NUM_OF_SCANNED_ENTRIES,
help = "Number of scanned entries"
)
private final Counter numOfScannedEntries;

@StatsDoc(
name = NUM_OF_SCANNED_BYTES,
help = "Number of scanned bytes"
)
private final Counter numOfScannedBytes;

EntryLogCounter(StatsLogger statsLogger) {
this.numOfScannedEntries = statsLogger.getCounter(NUM_OF_SCANNED_ENTRIES);
this.numOfScannedBytes = statsLogger.getCounter(NUM_OF_SCANNED_BYTES);
}
}

private final StatsLogger statsLogger;
final EntryLogCounter entryLogCounter;

public EntryLogger(ServerConfiguration conf) throws IOException {
this(conf, new LedgerDirsManager(conf, conf.getLedgerDirs(),
new DiskChecker(conf.getDiskUsageThreshold(), conf.getDiskUsageWarnThreshold())));
Expand Down Expand Up @@ -381,6 +415,9 @@ public EntryLogger(ServerConfiguration conf,
this.entryLogManager = new EntryLogManagerForSingleEntryLog(conf, ledgerDirsManager, entryLoggerAllocator,
listeners, recentlyCreatedEntryLogsStatus);
}

this.statsLogger = statsLogger;
this.entryLogCounter = new EntryLogCounter(statsLogger);
}

EntryLogManager getEntryLogManager() {
Expand Down Expand Up @@ -1018,6 +1055,9 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce
LOG.warn("Short read for entry size from entrylog {}", entryLogId);
return;
}
entryLogCounter.numOfScannedEntries.inc();
entryLogCounter.numOfScannedBytes.add(headerBuffer.writableBytes());

long offset = pos;
pos += 4;
int entrySize = headerBuffer.readInt();
Expand All @@ -1038,6 +1078,7 @@ public void scanEntryLog(long entryLogId, EntryLogScanner scanner) throws IOExce
}
data.capacity(entrySize);
int rc = readFromLogChannel(entryLogId, bc, data, pos);
entryLogCounter.numOfScannedBytes.add(data.writableBytes());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

data.readableBytes()?

if (rc != entrySize) {
LOG.warn("Short read for ledger entry from entryLog {}@{} ({} != {})",
entryLogId, pos, rc, entrySize);
Expand Down