Skip to content

Commit 71a0dbd

Browse files
author
zhaizhibo
committed
Concurrently initialize LedgerStorage to optimize startup performance.
1 parent 3fea440 commit 71a0dbd

File tree

1 file changed

+102
-67
lines changed

1 file changed

+102
-67
lines changed

bookkeeper-server/src/main/java/org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.java

Lines changed: 102 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -34,13 +34,16 @@
3434
import java.io.File;
3535
import java.io.IOException;
3636
import java.util.ArrayList;
37+
import java.util.Collections;
3738
import java.util.EnumSet;
3839
import java.util.HashMap;
3940
import java.util.List;
4041
import java.util.Map;
4142
import java.util.PrimitiveIterator.OfLong;
43+
import java.util.concurrent.CountDownLatch;
4244
import java.util.concurrent.ExecutorService;
4345
import java.util.concurrent.Executors;
46+
import java.util.concurrent.atomic.AtomicInteger;
4447
import java.util.stream.Collectors;
4548
import lombok.extern.slf4j.Slf4j;
4649
import org.apache.bookkeeper.bookie.BookieException;
@@ -177,77 +180,109 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le
177180
long readAheadCacheBatchBytesSize = conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE,
178181
DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE);
179182

180-
ledgerStorageList = Lists.newArrayList();
183+
ExecutorService storageInitExecutor = Executors.newFixedThreadPool(
184+
Math.min(ledgerDirsManager.getAllLedgerDirs().size(),
185+
Runtime.getRuntime().availableProcessors() * 2),
186+
new DefaultThreadFactory("LedgerStorage-Initializer")
187+
);
188+
189+
ledgerStorageList = new ArrayList<>(Collections.nCopies(ledgerDirsManager.getAllLedgerDirs().size(), null));
190+
CountDownLatch downLatch = new CountDownLatch(ledgerDirsManager.getAllLedgerDirs().size());
191+
AtomicInteger initFailed = new AtomicInteger(0);
181192
for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) {
182-
File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(i);
183-
File indexDir = indexDirsManager.getAllLedgerDirs().get(i);
184-
// Create a ledger dirs manager for the single directory
185-
File[] lDirs = new File[1];
186-
// Remove the `/current` suffix which will be appended again by LedgersDirManager
187-
lDirs[0] = ledgerDir.getParentFile();
188-
LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(),
189-
NullStatsLogger.INSTANCE);
190-
191-
// Create a index dirs manager for the single directory
192-
File[] iDirs = new File[1];
193-
// Remove the `/current` suffix which will be appended again by LedgersDirManager
194-
iDirs[0] = indexDir.getParentFile();
195-
LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(),
196-
NullStatsLogger.INSTANCE);
197-
198-
EntryLogger entrylogger;
199-
if (directIOEntryLogger) {
200-
long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault(
201-
conf,
202-
DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB,
203-
DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs;
204-
long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault(
205-
conf,
206-
DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB,
207-
DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs;
208-
int readBufferSize = MB * (int) getLongVariableOrDefault(
209-
conf,
210-
DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
211-
DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
212-
int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault(
213-
conf,
214-
DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
215-
DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS);
216-
Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class);
217-
entryLoggerWriteExecutor = Executors.newSingleThreadExecutor(
218-
new DefaultThreadFactory("EntryLoggerWrite"));
219-
entryLoggerFlushExecutor = Executors.newSingleThreadExecutor(
220-
new DefaultThreadFactory("EntryLoggerFlush"));
221-
222-
int numReadThreads = conf.getNumReadWorkerThreads();
223-
if (numReadThreads == 0) {
224-
numReadThreads = conf.getServerNumIOThreads();
193+
int finalI = i;
194+
storageInitExecutor.execute(() -> {
195+
try {
196+
File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(finalI);
197+
File indexDir = indexDirsManager.getAllLedgerDirs().get(finalI);
198+
// Create a ledger dirs manager for the single directory
199+
File[] lDirs = new File[1];
200+
// Remove the `/current` suffix which will be appended again by LedgersDirManager
201+
lDirs[0] = ledgerDir.getParentFile();
202+
LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(),
203+
NullStatsLogger.INSTANCE);
204+
205+
// Create a index dirs manager for the single directory
206+
File[] iDirs = new File[1];
207+
// Remove the `/current` suffix which will be appended again by LedgersDirManager
208+
iDirs[0] = indexDir.getParentFile();
209+
LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(),
210+
NullStatsLogger.INSTANCE);
211+
212+
EntryLogger entrylogger;
213+
if (directIOEntryLogger) {
214+
long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault(
215+
conf,
216+
DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB,
217+
DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs;
218+
long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault(
219+
conf,
220+
DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB,
221+
DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs;
222+
int readBufferSize = MB * (int) getLongVariableOrDefault(
223+
conf,
224+
DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB,
225+
DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB);
226+
int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault(
227+
conf,
228+
DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS,
229+
DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS);
230+
Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class);
231+
entryLoggerWriteExecutor = Executors.newSingleThreadExecutor(
232+
new DefaultThreadFactory("EntryLoggerWrite"));
233+
entryLoggerFlushExecutor = Executors.newSingleThreadExecutor(
234+
new DefaultThreadFactory("EntryLoggerFlush"));
235+
236+
int numReadThreads = conf.getNumReadWorkerThreads();
237+
if (numReadThreads == 0) {
238+
numReadThreads = conf.getServerNumIOThreads();
239+
}
240+
241+
entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog),
242+
new NativeIOImpl(),
243+
allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor,
244+
conf.getEntryLogSizeLimit(),
245+
conf.getNettyMaxFrameSizeBytes() - 500,
246+
perDirectoryTotalWriteBufferSize,
247+
perDirectoryTotalReadBufferSize,
248+
readBufferSize,
249+
numReadThreads,
250+
maxFdCacheTimeSeconds,
251+
slog, statsLogger);
252+
} else {
253+
entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator);
254+
}
255+
ledgerStorageList.set(finalI, newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm,
256+
idm, entrylogger,
257+
statsLogger, perDirectoryWriteCacheSize,
258+
perDirectoryReadCacheSize,
259+
readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
260+
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
261+
if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
262+
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
263+
}
264+
} catch (IOException e) {
265+
log.error("Failed to initialize DbLedgerStorage", e);
266+
initFailed.incrementAndGet();
225267
}
268+
downLatch.countDown();
269+
});
270+
}
226271

227-
entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog),
228-
new NativeIOImpl(),
229-
allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor,
230-
conf.getEntryLogSizeLimit(),
231-
conf.getNettyMaxFrameSizeBytes() - 500,
232-
perDirectoryTotalWriteBufferSize,
233-
perDirectoryTotalReadBufferSize,
234-
readBufferSize,
235-
numReadThreads,
236-
maxFdCacheTimeSeconds,
237-
slog, statsLogger);
238-
} else {
239-
entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator);
240-
}
241-
ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm,
242-
idm, entrylogger,
243-
statsLogger, perDirectoryWriteCacheSize,
244-
perDirectoryReadCacheSize,
245-
readAheadCacheBatchSize, readAheadCacheBatchBytesSize));
246-
ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener);
247-
if (!lDirs[0].getPath().equals(iDirs[0].getPath())) {
248-
idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener);
249-
}
272+
log.info("awaiting for DbLedgerStorage initialization");
273+
try {
274+
downLatch.await();
275+
} catch (InterruptedException e) {
276+
Thread.currentThread().interrupt();
277+
log.error("Failed to initialize DbLedgerStorage", e);
278+
} finally {
279+
storageInitExecutor.shutdown();
280+
}
281+
if (initFailed.get() > 0) {
282+
throw new IOException("Failed to initialize DbLedgerStorage");
250283
}
284+
log.info("DbLedgerStorage initialization completed");
285+
storageInitExecutor.shutdown();
251286

252287
// parent DbLedgerStorage stats (not per directory)
253288
readaheadBatchSizeGauge = new Gauge<Integer>() {

0 commit comments

Comments
 (0)