|
34 | 34 | import java.io.File; |
35 | 35 | import java.io.IOException; |
36 | 36 | import java.util.ArrayList; |
| 37 | +import java.util.Collections; |
37 | 38 | import java.util.EnumSet; |
38 | 39 | import java.util.HashMap; |
39 | 40 | import java.util.List; |
40 | 41 | import java.util.Map; |
41 | 42 | import java.util.PrimitiveIterator.OfLong; |
| 43 | +import java.util.concurrent.CountDownLatch; |
42 | 44 | import java.util.concurrent.ExecutorService; |
43 | 45 | import java.util.concurrent.Executors; |
| 46 | +import java.util.concurrent.atomic.AtomicInteger; |
44 | 47 | import java.util.stream.Collectors; |
45 | 48 | import lombok.extern.slf4j.Slf4j; |
46 | 49 | import org.apache.bookkeeper.bookie.BookieException; |
@@ -177,77 +180,108 @@ public void initialize(ServerConfiguration conf, LedgerManager ledgerManager, Le |
177 | 180 | long readAheadCacheBatchBytesSize = conf.getInt(READ_AHEAD_CACHE_BATCH_BYTES_SIZE, |
178 | 181 | DEFAULT_READ_AHEAD_CACHE_BATCH_BYTES_SIZE); |
179 | 182 |
|
180 | | - ledgerStorageList = Lists.newArrayList(); |
| 183 | + ExecutorService storageInitExecutor = Executors.newFixedThreadPool( |
| 184 | + Math.min(ledgerDirsManager.getAllLedgerDirs().size(), |
| 185 | + Runtime.getRuntime().availableProcessors() * 2), |
| 186 | + new DefaultThreadFactory("LedgerStorage-Initializer") |
| 187 | + ); |
| 188 | + |
| 189 | + ledgerStorageList = new ArrayList<>(Collections.nCopies(ledgerDirsManager.getAllLedgerDirs().size(), null)); |
| 190 | + CountDownLatch downLatch = new CountDownLatch(ledgerDirsManager.getAllLedgerDirs().size()); |
| 191 | + AtomicInteger initFailed = new AtomicInteger(0); |
181 | 192 | for (int i = 0; i < ledgerDirsManager.getAllLedgerDirs().size(); i++) { |
182 | | - File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(i); |
183 | | - File indexDir = indexDirsManager.getAllLedgerDirs().get(i); |
184 | | - // Create a ledger dirs manager for the single directory |
185 | | - File[] lDirs = new File[1]; |
186 | | - // Remove the `/current` suffix which will be appended again by LedgersDirManager |
187 | | - lDirs[0] = ledgerDir.getParentFile(); |
188 | | - LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(), |
189 | | - NullStatsLogger.INSTANCE); |
190 | | - |
191 | | - // Create a index dirs manager for the single directory |
192 | | - File[] iDirs = new File[1]; |
193 | | - // Remove the `/current` suffix which will be appended again by LedgersDirManager |
194 | | - iDirs[0] = indexDir.getParentFile(); |
195 | | - LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(), |
196 | | - NullStatsLogger.INSTANCE); |
197 | | - |
198 | | - EntryLogger entrylogger; |
199 | | - if (directIOEntryLogger) { |
200 | | - long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault( |
201 | | - conf, |
202 | | - DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB, |
203 | | - DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs; |
204 | | - long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault( |
205 | | - conf, |
206 | | - DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB, |
207 | | - DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs; |
208 | | - int readBufferSize = MB * (int) getLongVariableOrDefault( |
209 | | - conf, |
210 | | - DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB, |
211 | | - DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB); |
212 | | - int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault( |
213 | | - conf, |
214 | | - DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS, |
215 | | - DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS); |
216 | | - Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class); |
217 | | - entryLoggerWriteExecutor = Executors.newSingleThreadExecutor( |
218 | | - new DefaultThreadFactory("EntryLoggerWrite")); |
219 | | - entryLoggerFlushExecutor = Executors.newSingleThreadExecutor( |
220 | | - new DefaultThreadFactory("EntryLoggerFlush")); |
221 | | - |
222 | | - int numReadThreads = conf.getNumReadWorkerThreads(); |
223 | | - if (numReadThreads == 0) { |
224 | | - numReadThreads = conf.getServerNumIOThreads(); |
| 193 | + int finalI = i; |
| 194 | + storageInitExecutor.execute(() -> { |
| 195 | + try { |
| 196 | + File ledgerDir = ledgerDirsManager.getAllLedgerDirs().get(finalI); |
| 197 | + File indexDir = indexDirsManager.getAllLedgerDirs().get(finalI); |
| 198 | + // Create a ledger dirs manager for the single directory |
| 199 | + File[] lDirs = new File[1]; |
| 200 | + // Remove the `/current` suffix which will be appended again by LedgersDirManager |
| 201 | + lDirs[0] = ledgerDir.getParentFile(); |
| 202 | + LedgerDirsManager ldm = new LedgerDirsManager(conf, lDirs, ledgerDirsManager.getDiskChecker(), |
| 203 | + NullStatsLogger.INSTANCE); |
| 204 | + |
| 205 | + // Create a index dirs manager for the single directory |
| 206 | + File[] iDirs = new File[1]; |
| 207 | + // Remove the `/current` suffix which will be appended again by LedgersDirManager |
| 208 | + iDirs[0] = indexDir.getParentFile(); |
| 209 | + LedgerDirsManager idm = new LedgerDirsManager(conf, iDirs, indexDirsManager.getDiskChecker(), |
| 210 | + NullStatsLogger.INSTANCE); |
| 211 | + |
| 212 | + EntryLogger entrylogger; |
| 213 | + if (directIOEntryLogger) { |
| 214 | + long perDirectoryTotalWriteBufferSize = MB * getLongVariableOrDefault( |
| 215 | + conf, |
| 216 | + DIRECT_IO_ENTRYLOGGER_TOTAL_WRITEBUFFER_SIZE_MB, |
| 217 | + DEFAULT_DIRECT_IO_TOTAL_WRITEBUFFER_SIZE_MB) / numberOfDirs; |
| 218 | + long perDirectoryTotalReadBufferSize = MB * getLongVariableOrDefault( |
| 219 | + conf, |
| 220 | + DIRECT_IO_ENTRYLOGGER_TOTAL_READBUFFER_SIZE_MB, |
| 221 | + DEFAULT_DIRECT_IO_TOTAL_READBUFFER_SIZE_MB) / numberOfDirs; |
| 222 | + int readBufferSize = MB * (int) getLongVariableOrDefault( |
| 223 | + conf, |
| 224 | + DIRECT_IO_ENTRYLOGGER_READBUFFER_SIZE_MB, |
| 225 | + DEFAULT_DIRECT_IO_READBUFFER_SIZE_MB); |
| 226 | + int maxFdCacheTimeSeconds = (int) getLongVariableOrDefault( |
| 227 | + conf, |
| 228 | + DIRECT_IO_ENTRYLOGGER_MAX_FD_CACHE_TIME_SECONDS, |
| 229 | + DEFAULT_DIRECT_IO_MAX_FD_CACHE_TIME_SECONDS); |
| 230 | + Slf4jSlogger slog = new Slf4jSlogger(DbLedgerStorage.class); |
| 231 | + entryLoggerWriteExecutor = Executors.newSingleThreadExecutor( |
| 232 | + new DefaultThreadFactory("EntryLoggerWrite")); |
| 233 | + entryLoggerFlushExecutor = Executors.newSingleThreadExecutor( |
| 234 | + new DefaultThreadFactory("EntryLoggerFlush")); |
| 235 | + |
| 236 | + int numReadThreads = conf.getNumReadWorkerThreads(); |
| 237 | + if (numReadThreads == 0) { |
| 238 | + numReadThreads = conf.getServerNumIOThreads(); |
| 239 | + } |
| 240 | + |
| 241 | + entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog), |
| 242 | + new NativeIOImpl(), |
| 243 | + allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor, |
| 244 | + conf.getEntryLogSizeLimit(), |
| 245 | + conf.getNettyMaxFrameSizeBytes() - 500, |
| 246 | + perDirectoryTotalWriteBufferSize, |
| 247 | + perDirectoryTotalReadBufferSize, |
| 248 | + readBufferSize, |
| 249 | + numReadThreads, |
| 250 | + maxFdCacheTimeSeconds, |
| 251 | + slog, statsLogger); |
| 252 | + } else { |
| 253 | + entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator); |
| 254 | + } |
| 255 | + ledgerStorageList.set(finalI, newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, |
| 256 | + idm, entrylogger, |
| 257 | + statsLogger, perDirectoryWriteCacheSize, |
| 258 | + perDirectoryReadCacheSize, |
| 259 | + readAheadCacheBatchSize, readAheadCacheBatchBytesSize)); |
| 260 | + ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener); |
| 261 | + if (!lDirs[0].getPath().equals(iDirs[0].getPath())) { |
| 262 | + idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener); |
| 263 | + } |
| 264 | + } catch (IOException e) { |
| 265 | + log.error("Failed to initialize DbLedgerStorage", e); |
| 266 | + initFailed.incrementAndGet(); |
225 | 267 | } |
| 268 | + downLatch.countDown(); |
| 269 | + }); |
| 270 | + } |
226 | 271 |
|
227 | | - entrylogger = new DirectEntryLogger(ledgerDir, new EntryLogIdsImpl(ldm, slog), |
228 | | - new NativeIOImpl(), |
229 | | - allocator, entryLoggerWriteExecutor, entryLoggerFlushExecutor, |
230 | | - conf.getEntryLogSizeLimit(), |
231 | | - conf.getNettyMaxFrameSizeBytes() - 500, |
232 | | - perDirectoryTotalWriteBufferSize, |
233 | | - perDirectoryTotalReadBufferSize, |
234 | | - readBufferSize, |
235 | | - numReadThreads, |
236 | | - maxFdCacheTimeSeconds, |
237 | | - slog, statsLogger); |
238 | | - } else { |
239 | | - entrylogger = new DefaultEntryLogger(conf, ldm, null, statsLogger, allocator); |
240 | | - } |
241 | | - ledgerStorageList.add(newSingleDirectoryDbLedgerStorage(conf, ledgerManager, ldm, |
242 | | - idm, entrylogger, |
243 | | - statsLogger, perDirectoryWriteCacheSize, |
244 | | - perDirectoryReadCacheSize, |
245 | | - readAheadCacheBatchSize, readAheadCacheBatchBytesSize)); |
246 | | - ldm.getListeners().forEach(ledgerDirsManager::addLedgerDirsListener); |
247 | | - if (!lDirs[0].getPath().equals(iDirs[0].getPath())) { |
248 | | - idm.getListeners().forEach(indexDirsManager::addLedgerDirsListener); |
249 | | - } |
| 272 | + log.info("awaiting for DbLedgerStorage initialization"); |
| 273 | + try { |
| 274 | + downLatch.await(); |
| 275 | + } catch (InterruptedException e) { |
| 276 | + Thread.currentThread().interrupt(); |
| 277 | + log.error("Failed to initialize DbLedgerStorage", e); |
| 278 | + } finally { |
| 279 | + storageInitExecutor.shutdown(); |
| 280 | + } |
| 281 | + if (initFailed.get() > 0) { |
| 282 | + throw new IOException("Failed to initialize DbLedgerStorage"); |
250 | 283 | } |
| 284 | + log.info("DbLedgerStorage initialization completed"); |
251 | 285 |
|
252 | 286 | // parent DbLedgerStorage stats (not per directory) |
253 | 287 | readaheadBatchSizeGauge = new Gauge<Integer>() { |
|
0 commit comments