diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java index a47baddd35c..e4a4a839bf2 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java @@ -34,13 +34,16 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.PrimitiveIterator.OfLong; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.bookie.BookieException; @@ -177,77 +180,108 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le long readAheadCacheBatchBytesSize = conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE, DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE); - ledgerStorageList = Lists.newArrayList(); + ExecutorService storageInitExecutor = Executors.newFixedThreadPool( + Math.min(ledgerDirsManager.getAllLedgerDirs().size(), + Runtime.getRuntime().availableProcessors() * 2), + new DefaultThreadFactory("LedgerStorage-Initializer") + ); + + ledgerStorageList = new ArrayList<>(Collections.nCopies(ledgerDirsManager.getAllLedgerDirs().size(), null)); + CountDownLatch downLatch = new CountDownLatch(ledgerDirsManager.getAllLedgerDirs().size()); + AtomicInteger initFailed = new AtomicInteger(0); for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) { - File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(i); - File indexDir = indexDirsManager.getAllLedgerDirs().get(i); - // Create a ledger dirs manager for the single directory - File[] lDirs = new File[1]; - // Remove the `/current` suffix which will be appended again by LedgersDirManager - lDirs[0] = ledgerDir.getParentFile(); - LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(), - NullStatsLogger.INSTANCE); - - // Create a index dirs manager for the single directory - File[] iDirs = new File[1]; - // Remove the `/current` suffix which will be appended again by LedgersDirManager - iDirs[0] = indexDir.getParentFile(); - LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(), - NullStatsLogger.INSTANCE); - - EntryLogger entrylogger; - if (directIOEntryLogger) { - long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault( - conf, - DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB, - DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs; - long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault( - conf, - DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB, - DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs; - int readBufferSize = MB * (int) getLongVariableOrDefault( - conf, - DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB, - DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB); - int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault( - conf, - DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS, - DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS); - Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class); - entryLoggerWriteExecutor = Executors.newSingleThreadExecutor( - new DefaultThreadFactory("EntryLoggerWrite")); - entryLoggerFlushExecutor = Executors.newSingleThreadExecutor( - new DefaultThreadFactory("EntryLoggerFlush")); - - int numReadThreads = conf.getNumReadWorkerThreads(); - if (numReadThreads == 0) { - numReadThreads = conf.getServerNumIOThreads(); + int finalI = i; + storageInitExecutor.execute(() -> { + try { + File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(finalI); + File indexDir = indexDirsManager.getAllLedgerDirs().get(finalI); + // Create a ledger dirs manager for the single directory + File[] lDirs = new File[1]; + // Remove the `/current` suffix which will be appended again by LedgersDirManager + lDirs[0] = ledgerDir.getParentFile(); + LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(), + NullStatsLogger.INSTANCE); + + // Create a index dirs manager for the single directory + File[] iDirs = new File[1]; + // Remove the `/current` suffix which will be appended again by LedgersDirManager + iDirs[0] = indexDir.getParentFile(); + LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(), + NullStatsLogger.INSTANCE); + + EntryLogger entrylogger; + if (directIOEntryLogger) { + long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB, + DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs; + long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB, + DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs; + int readBufferSize = MB * (int) getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB, + DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB); + int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault( + conf, + DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS, + DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS); + Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class); + entryLoggerWriteExecutor = Executors.newSingleThreadExecutor( + new DefaultThreadFactory("EntryLoggerWrite")); + entryLoggerFlushExecutor = Executors.newSingleThreadExecutor( + new DefaultThreadFactory("EntryLoggerFlush")); + + int numReadThreads = conf.getNumReadWorkerThreads(); + if (numReadThreads == 0) { + numReadThreads = conf.getServerNumIOThreads(); + } + + entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog), + new NativeIOImpl(), + allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor, + conf.getEntryLogSizeLimit(), + conf.getNettyMaxFrameSizeBytes() - 500, + perDirectoryTotalWriteBufferSize, + perDirectoryTotalReadBufferSize, + readBufferSize, + numReadThreads, + maxFdCacheTimeSeconds, + slog, statsLogger); + } else { + entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator); + } + ledgerStorageList.set(finalI, newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, + idm, entrylogger, + statsLogger, perDirectoryWriteCacheSize, + perDirectoryReadCacheSize, + readAheadCacheBatchSize, readAheadCacheBatchBytesSize)); + ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener); + if (!lDirs[0].getPath().equals(iDirs[0].getPath())) { + idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener); + } + } catch (IOException e) { + log.error("Failed to initialize DbLedgerStorage", e); + initFailed.incrementAndGet(); } + downLatch.countDown(); + }); + } - entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog), - new NativeIOImpl(), - allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor, - conf.getEntryLogSizeLimit(), - conf.getNettyMaxFrameSizeBytes() - 500, - perDirectoryTotalWriteBufferSize, - perDirectoryTotalReadBufferSize, - readBufferSize, - numReadThreads, - maxFdCacheTimeSeconds, - slog, statsLogger); - } else { - entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator); - } - ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, - idm, entrylogger, - statsLogger, perDirectoryWriteCacheSize, - perDirectoryReadCacheSize, - readAheadCacheBatchSize, readAheadCacheBatchBytesSize)); - ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener); - if (!lDirs[0].getPath().equals(iDirs[0].getPath())) { - idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener); - } + log.info("awaiting for DbLedgerStorage initialization"); + try { + downLatch.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("Failed to initialize DbLedgerStorage", e); + } finally { + storageInitExecutor.shutdown(); + } + if (initFailed.get() > 0) { + throw new IOException("Failed to initialize DbLedgerStorage"); } + log.info("DbLedgerStorage initialization completed"); // parent DbLedgerStorage stats (not per directory) readaheadBatchSizeGauge = new Gauge() {