
Commit f410aed

thongdk8, brfrn169, and ypeckstadt committed
Move parallelism level for the importing process from the record level to the data chunk level (#2728)
Co-authored-by: Toshihiro Suzuki <[email protected]>
Co-authored-by: Peckstadt Yves <[email protected]>
1 parent ec2235d commit f410aed

File tree: 2 files changed, +477 −92 lines

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java

Lines changed: 77 additions & 92 deletions
@@ -25,11 +25,11 @@
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.RequiredArgsConstructor;
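
The two new imports, java.util.concurrent.Semaphore and java.util.concurrent.Phaser, are plain JDK concurrency primitives; ExecutionException and Future are no longer needed because results are handled on the submitting side. As a rough illustration of why a Phaser fits this "submit many tasks, then wait for all of them" shape without keeping a list of futures, here is a standalone sketch that is not part of the patch (the task loop and counts are made up):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;

public class PhaserCompletionSketch {
  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // Party 1 is the main thread; each submitted task registers as an extra party.
    Phaser phaser = new Phaser(1);

    for (int i = 0; i < 10; i++) {
      int taskId = i;
      phaser.register(); // register before submitting so the task cannot be "missed"
      pool.submit(
          () -> {
            try {
              System.out.println("processing task " + taskId);
            } finally {
              phaser.arriveAndDeregister(); // signal completion even if the task throws
            }
          });
    }

    // The main thread arrives and blocks until every registered task has arrived.
    phaser.arriveAndAwaitAdvance();
    pool.shutdown();
    System.out.println("all tasks done");
  }
}
```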
@@ -62,41 +62,77 @@ public abstract class ImportProcessor {
    * @param reader the {@link BufferedReader} used to read the source file
    */
   public void process(int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
-    ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();
+    ExecutorService dataChunkReaderExecutor = Executors.newSingleThreadExecutor();
+    ExecutorService dataChunkProcessorExecutor =
+        Executors.newFixedThreadPool(params.getImportOptions().getMaxThreads());
     BlockingQueue<ImportDataChunk> dataChunkQueue =
         new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());
 
+    // Semaphore controls concurrent task submissions, small buffer to be two times of threads
+    Semaphore taskSemaphore = new Semaphore(params.getImportOptions().getMaxThreads() * 2);
+    // Phaser tracks task completion (start with 1 for the main thread)
+    Phaser phaser = new Phaser(1);
+
     try {
       CompletableFuture<Void> readerFuture =
           CompletableFuture.runAsync(
-              () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor);
+              () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkReaderExecutor);
 
       while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
         ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
         if (dataChunk != null) {
-          processDataChunk(dataChunk, transactionBatchSize);
+          // Acquire semaphore permit (blocks if no permits available)
+          taskSemaphore.acquire();
+          // Register with phaser before submitting
+          phaser.register();
+
+          dataChunkProcessorExecutor.submit(
+              () -> {
+                try {
+                  processDataChunk(dataChunk, transactionBatchSize);
+                } finally {
+                  // Always release semaphore and arrive at phaser
+                  taskSemaphore.release();
+                  phaser.arriveAndDeregister();
+                }
+              });
         }
       }
 
       readerFuture.join();
+      // Wait for all tasks to complete
+      phaser.arriveAndAwaitAdvance();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(
           CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e);
     } finally {
-      dataChunkExecutor.shutdown();
-      try {
-        if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
-          dataChunkExecutor.shutdownNow();
-        }
-      } catch (InterruptedException e) {
-        dataChunkExecutor.shutdownNow();
-        Thread.currentThread().interrupt();
-      }
+      shutdownExecutorGracefully(dataChunkReaderExecutor);
+      shutdownExecutorGracefully(dataChunkProcessorExecutor);
       notifyAllDataChunksCompleted();
     }
   }
 
+  /**
+   * Shuts down the given `ExecutorService` gracefully. This method attempts to cleanly shut down
+   * the executor by first invoking `shutdown` and waiting for termination for up to 60 seconds. If
+   * the executor does not terminate within this time, it forces a shutdown using `shutdownNow`. If
+   * interrupted, it forces a shutdown and interrupts the current thread.
+   *
+   * @param es the `ExecutorService` to be shut down gracefully
+   */
+  private void shutdownExecutorGracefully(ExecutorService es) {
+    es.shutdown();
+    try {
+      if (!es.awaitTermination(60, TimeUnit.SECONDS)) {
+        es.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      es.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
   /**
    * Reads and processes data in chunks from the provided reader.
    *
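
The Semaphore complements the Phaser by bounding how many chunk tasks can be in flight at once, so the polling loop cannot pile up an unbounded backlog inside the executor. A minimal sketch of that gating idiom, separate from the patch (the permit count of 2× the pool size mirrors taskSemaphore above; processChunk is a stand-in for processDataChunk):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class BoundedSubmissionSketch {
  private static final int MAX_THREADS = 4;

  public static void main(String[] args) throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(MAX_THREADS);
    // At most MAX_THREADS chunks run while up to MAX_THREADS more wait in the executor queue.
    Semaphore permits = new Semaphore(MAX_THREADS * 2);

    for (int chunkId = 0; chunkId < 20; chunkId++) {
      permits.acquire(); // blocks the submitter once 2x MAX_THREADS chunks are already in flight
      int id = chunkId;
      pool.submit(
          () -> {
            try {
              processChunk(id);
            } finally {
              permits.release(); // always free the permit, even if processing fails
            }
          });
    }
    pool.shutdown();
  }

  private static void processChunk(int id) {
    System.out.println("processing chunk " + id);
  }
}
```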
@@ -373,46 +409,26 @@ private ImportDataChunkStatus processDataChunkWithTransactions(
     Instant startTime = Instant.now();
     List<ImportTransactionBatch> transactionBatches =
         splitIntoTransactionBatches(dataChunk, transactionBatchSize);
-    ExecutorService transactionBatchExecutor =
-        Executors.newFixedThreadPool(params.getImportOptions().getMaxThreads());
-    List<Future<?>> transactionBatchFutures = new ArrayList<>();
     AtomicInteger successCount = new AtomicInteger(0);
     AtomicInteger failureCount = new AtomicInteger(0);
-    try {
-      for (ImportTransactionBatch transactionBatch : transactionBatches) {
-        Future<?> transactionBatchFuture =
-            transactionBatchExecutor.submit(
-                () -> processTransactionBatch(dataChunk.getDataChunkId(), transactionBatch));
-        transactionBatchFutures.add(transactionBatchFuture);
-      }
 
-      waitForFuturesToComplete(transactionBatchFutures);
-      transactionBatchFutures.forEach(
-          batchResult -> {
-            try {
-              ImportTransactionBatchResult importTransactionBatchResult =
-                  (ImportTransactionBatchResult) batchResult.get();
-              importTransactionBatchResult
-                  .getRecords()
-                  .forEach(
-                      batchRecords -> {
-                        if (batchRecords.getTargets().stream()
-                            .allMatch(
-                                targetResult ->
-                                    targetResult
-                                        .getStatus()
-                                        .equals(ImportTargetResultStatus.SAVED))) {
-                          successCount.incrementAndGet();
-                        } else {
-                          failureCount.incrementAndGet();
-                        }
-                      });
-            } catch (InterruptedException | ExecutionException e) {
-              throw new RuntimeException(e);
-            }
-          });
-    } finally {
-      transactionBatchExecutor.shutdown();
+    for (ImportTransactionBatch transactionBatch : transactionBatches) {
+      ImportTransactionBatchResult importTransactionBatchResult =
+          processTransactionBatch(dataChunk.getDataChunkId(), transactionBatch);
+
+      importTransactionBatchResult
+          .getRecords()
+          .forEach(
+              batchRecords -> {
+                if (batchRecords.getTargets().stream()
+                    .allMatch(
+                        targetResult ->
+                            targetResult.getStatus().equals(ImportTargetResultStatus.SAVED))) {
+                  successCount.incrementAndGet();
+                } else {
+                  failureCount.incrementAndGet();
+                }
+              });
     }
     Instant endTime = Instant.now();
     int totalDuration = (int) Duration.between(startTime, endTime).toMillis();
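
With the batches inside a chunk now handled sequentially on the chunk-processor thread, the bookkeeping reduces to a plain loop over results; the same counting also applies to the non-transactional record path in the next hunk. A self-contained sketch of that tallying idiom, using stand-in types (TargetStatus and BatchRecord are illustrative, not the data loader's own classes):

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class TallySketch {
  enum TargetStatus { SAVED, FAILED }

  // Stand-in for a record's per-target results inside a transaction batch result.
  static class BatchRecord {
    final List<TargetStatus> targets;

    BatchRecord(TargetStatus... targets) {
      this.targets = Arrays.asList(targets);
    }
  }

  public static void main(String[] args) {
    List<BatchRecord> records =
        Arrays.asList(
            new BatchRecord(TargetStatus.SAVED, TargetStatus.SAVED),
            new BatchRecord(TargetStatus.SAVED, TargetStatus.FAILED));

    AtomicInteger successCount = new AtomicInteger(0);
    AtomicInteger failureCount = new AtomicInteger(0);

    // A record counts as a success only if every target reports SAVED, mirroring allMatch above.
    for (BatchRecord record : records) {
      boolean allSaved = record.targets.stream().allMatch(s -> s == TargetStatus.SAVED);
      if (allSaved) {
        successCount.incrementAndGet();
      } else {
        failureCount.incrementAndGet();
      }
    }

    System.out.println("success=" + successCount.get() + " failure=" + failureCount.get());
  }
}
```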
@@ -440,32 +456,17 @@ private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChun
     Instant startTime = Instant.now();
     AtomicInteger successCount = new AtomicInteger(0);
     AtomicInteger failureCount = new AtomicInteger(0);
-    ExecutorService recordExecutor =
-        Executors.newFixedThreadPool(params.getImportOptions().getMaxThreads());
-    List<Future<?>> recordFutures = new ArrayList<>();
-    try {
-      for (ImportRow importRow : dataChunk.getSourceData()) {
-        Future<?> recordFuture =
-            recordExecutor.submit(
-                () -> processStorageRecord(dataChunk.getDataChunkId(), importRow));
-        recordFutures.add(recordFuture);
+
+    for (ImportRow importRow : dataChunk.getSourceData()) {
+      ImportTaskResult result = processStorageRecord(dataChunk.getDataChunkId(), importRow);
+      boolean allSaved =
+          result.getTargets().stream()
+              .allMatch(t -> t.getStatus().equals(ImportTargetResultStatus.SAVED));
+      if (allSaved) {
+        successCount.incrementAndGet();
+      } else {
+        failureCount.incrementAndGet();
       }
-      waitForFuturesToComplete(recordFutures);
-      recordFutures.forEach(
-          r -> {
-            try {
-              ImportTaskResult result = (ImportTaskResult) r.get();
-              boolean allSaved =
-                  result.getTargets().stream()
-                      .allMatch(t -> t.getStatus().equals(ImportTargetResultStatus.SAVED));
-              if (allSaved) successCount.incrementAndGet();
-              else failureCount.incrementAndGet();
-            } catch (InterruptedException | ExecutionException e) {
-              throw new RuntimeException(e);
-            }
-          });
-    } finally {
-      recordExecutor.shutdown();
     }
     Instant endTime = Instant.now();
     int totalDuration = (int) Duration.between(startTime, endTime).toMillis();
@@ -480,20 +481,4 @@ private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChun
         .status(ImportDataChunkStatusState.COMPLETE)
         .build();
   }
-
-  /**
-   * Waits for all futures in the provided list to complete. Any exceptions during execution are
-   * logged but not propagated.
-   *
-   * @param futures the list of {@link Future} objects to wait for
-   */
-  private void waitForFuturesToComplete(List<Future<?>> futures) {
-    for (Future<?> future : futures) {
-      try {
-        future.get();
-      } catch (Exception e) {
-        LOGGER.error(e.getMessage());
-      }
-    }
-  }
 }
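
Taken together, the knobs visible in this diff bound how much data can sit in memory at once: up to dataChunkQueueSize chunks buffered by the reader, plus up to 2 × maxThreads chunks admitted past the semaphore, of which at most maxThreads execute concurrently. A rough back-of-the-envelope sketch, assuming illustrative option values rather than the data loader's defaults:

```java
public class ChunkConcurrencyBounds {
  public static void main(String[] args) {
    // Illustrative values only; the real ones come from ImportOptions.
    int maxThreads = 8;
    int dataChunkQueueSize = 256;
    int dataChunkSize = 500; // rows per chunk

    int permits = maxThreads * 2; // semaphore permits in the new process() loop
    int maxChunksInFlight = permits; // submitted but not yet finished inside the executor
    int maxBufferedChunks = dataChunkQueueSize + maxChunksInFlight; // rough worst case
    long maxBufferedRows = (long) maxBufferedChunks * dataChunkSize;

    System.out.println("chunks running concurrently: " + maxThreads);
    System.out.println("chunks admitted past the semaphore: " + maxChunksInFlight);
    System.out.println("worst-case chunks held in memory: " + maxBufferedChunks);
    System.out.println("worst-case rows held in memory: " + maxBufferedRows);
  }
}
```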
