Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,16 @@
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PrimitiveIterator.OfLong;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.bookie.BookieException;
Expand Down Expand Up @@ -177,77 +180,108 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
long readAheadCacheBatchBytesSize = conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE,
DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE);

ledgerStorageList = Lists.newArrayList();
ExecutorService storageInitExecutor = Executors.newFixedThreadPool(
Math.min(ledgerDirsManager.getAllLedgerDirs().size(),
Runtime.getRuntime().availableProcessors() * 2),
new DefaultThreadFactory("LedgerStorage-Initializer")
);

ledgerStorageList = new ArrayList<>(Collections.nCopies(ledgerDirsManager.getAllLedgerDirs().size(), null));
CountDownLatch downLatch = new CountDownLatch(ledgerDirsManager.getAllLedgerDirs().size());
AtomicInteger initFailed = new AtomicInteger(0);
for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) {
File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(i);
File indexDir = indexDirsManager.getAllLedgerDirs().get(i);
// Create a ledger dirs manager for the single directory
File[] lDirs = new File[1];
// Remove the `/current` suffix which will be appended again by LedgersDirManager
lDirs[0] = ledgerDir.getParentFile();
LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(),
NullStatsLogger.INSTANCE);

// Create a index dirs manager for the single directory
File[] iDirs = new File[1];
// Remove the `/current` suffix which will be appended again by LedgersDirManager
iDirs[0] = indexDir.getParentFile();
LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(),
NullStatsLogger.INSTANCE);

EntryLogger entrylogger;
if (directIOEntryLogger) {
long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs;
long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs;
int readBufferSize = MB * (int) getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS);
Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class);
entryLoggerWriteExecutor = Executors.newSingleThreadExecutor(
new DefaultThreadFactory("EntryLoggerWrite"));
entryLoggerFlushExecutor = Executors.newSingleThreadExecutor(
new DefaultThreadFactory("EntryLoggerFlush"));

int numReadThreads = conf.getNumReadWorkerThreads();
if (numReadThreads == 0) {
numReadThreads = conf.getServerNumIOThreads();
int finalI = i;
storageInitExecutor.execute(() -> {
try {
File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(finalI);
File indexDir = indexDirsManager.getAllLedgerDirs().get(finalI);
// Create a ledger dirs manager for the single directory
File[] lDirs = new File[1];
// Remove the `/current` suffix which will be appended again by LedgersDirManager
lDirs[0] = ledgerDir.getParentFile();
LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(),
NullStatsLogger.INSTANCE);

// Create a index dirs manager for the single directory
File[] iDirs = new File[1];
// Remove the `/current` suffix which will be appended again by LedgersDirManager
iDirs[0] = indexDir.getParentFile();
LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(),
NullStatsLogger.INSTANCE);

EntryLogger entrylogger;
if (directIOEntryLogger) {
long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs;
long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs;
int readBufferSize = MB * (int) getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault(
conf,
DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS);
Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class);
entryLoggerWriteExecutor = Executors.newSingleThreadExecutor(
new DefaultThreadFactory("EntryLoggerWrite"));
entryLoggerFlushExecutor = Executors.newSingleThreadExecutor(
new DefaultThreadFactory("EntryLoggerFlush"));

int numReadThreads = conf.getNumReadWorkerThreads();
if (numReadThreads == 0) {
numReadThreads = conf.getServerNumIOThreads();
}

entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog),
new NativeIOImpl(),
allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor,
conf.getEntryLogSizeLimit(),
conf.getNettyMaxFrameSizeBytes() - 500,
perDirectoryTotalWriteBufferSize,
perDirectoryTotalReadBufferSize,
readBufferSize,
numReadThreads,
maxFdCacheTimeSeconds,
slog, statsLogger);
} else {
entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator);
}
ledgerStorageList.set(finalI, newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm,
idm, entrylogger,
statsLogger, perDirectoryWriteCacheSize,
perDirectoryReadCacheSize,
readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
}
} catch (IOException e) {
log.error("Failed to initialize DbLedgerStorage", e);
initFailed.incrementAndGet();
}
downLatch.countDown();
});
}

entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog),
new NativeIOImpl(),
allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor,
conf.getEntryLogSizeLimit(),
conf.getNettyMaxFrameSizeBytes() - 500,
perDirectoryTotalWriteBufferSize,
perDirectoryTotalReadBufferSize,
readBufferSize,
numReadThreads,
maxFdCacheTimeSeconds,
slog, statsLogger);
} else {
entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator);
}
ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm,
idm, entrylogger,
statsLogger, perDirectoryWriteCacheSize,
perDirectoryReadCacheSize,
readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
}
log.info("awaiting for DbLedgerStorage initialization");
try {
downLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Failed to initialize DbLedgerStorage", e);
} finally {
storageInitExecutor.shutdown();
Copy link

Copilot AI Jun 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After shutting down the executor, consider awaiting its termination to ensure all tasks have completed gracefully before proceeding.

Suggested change
storageInitExecutor.shutdown();
storageInitExecutor.shutdown();
try {
if (!storageInitExecutor.awaitTermination(60, java.util.concurrent.TimeUnit.SECONDS)) {
log.warn("storageInitExecutor did not terminate within the timeout.");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("Interrupted while waiting for storageInitExecutor to terminate", e);
}

Copilot uses AI. Check for mistakes.
}
if (initFailed.get() > 0) {
throw new IOException("Failed to initialize DbLedgerStorage");
}
log.info("DbLedgerStorage initialization completed");

// parent DbLedgerStorage stats (not per directory)
readaheadBatchSizeGauge = new Gauge<Integer>() {
Expand Down