Move parallelism level for the importing process from the record level to the data chunk level #2728
Changes from 2 commits
```diff
@@ -25,11 +25,11 @@
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.Phaser;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import lombok.RequiredArgsConstructor;
```
```diff
@@ -62,41 +62,77 @@ public abstract class ImportProcessor {
    * @param reader the {@link BufferedReader} used to read the source file
    */
   public void process(int dataChunkSize, int transactionBatchSize, BufferedReader reader) {
-    ExecutorService dataChunkExecutor = Executors.newSingleThreadExecutor();
+    ExecutorService dataChunkReaderExecutor = Executors.newSingleThreadExecutor();
+    ExecutorService dataChunkProcessorExecutor =
+        Executors.newFixedThreadPool(params.getImportOptions().getMaxThreads());
     BlockingQueue<ImportDataChunk> dataChunkQueue =
         new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());
+
+    // Semaphore controls concurrent task submissions, small buffer to be two times of threads
+    Semaphore taskSemaphore = new Semaphore(params.getImportOptions().getMaxThreads() * 2);
+    // Phaser tracks task completion (start with 1 for the main thread)
+    Phaser phaser = new Phaser(1);
+
     try {
       CompletableFuture<Void> readerFuture =
           CompletableFuture.runAsync(
-              () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkExecutor);
+              () -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkReaderExecutor);
 
       while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
         ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
```
**Contributor:** This is not related to this PR, but why do we need to wake up every 100 ms? I just think a longer timeout or using …

**Contributor (author):** The queue is buffered, so the timeout of 100 ms would be fine, I think.

**Contributor:** Yeah, you're correct. I didn't consider the exit condition. Let's keep it as is.
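To make the exit-condition point concrete, here is a minimal, hypothetical sketch (not from the PR; the queue contents and timings are made up) of a drain loop of the same shape. A timed `poll()` lets the consumer re-check the termination condition periodically, whereas a bare `take()` could block forever once the producer finishes:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class DrainLoopDemo {
  public static void main(String[] args) throws InterruptedException {
    BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    // Simulated file reader that produces one last chunk and completes.
    CompletableFuture<Void> reader = CompletableFuture.runAsync(() -> queue.add("chunk-1"));

    while (!(queue.isEmpty() && reader.isDone())) {
      // If this were queue.take() and the reader completed right after the
      // loop condition was checked, the consumer would sleep forever waiting
      // for an element that never arrives; the timeout guarantees the exit
      // condition is re-evaluated.
      String chunk = queue.poll(100, TimeUnit.MILLISECONDS);
      if (chunk != null) {
        System.out.println("processing " + chunk);
      }
    }
  }
}
```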
```diff
         if (dataChunk != null) {
-          processDataChunk(dataChunk, transactionBatchSize);
+          // Acquire semaphore permit (blocks if no permits available)
+          taskSemaphore.acquire();
+          // Register with phaser before submitting
+          phaser.register();
+
+          dataChunkProcessorExecutor.submit(
+              () -> {
+                try {
+                  processDataChunk(dataChunk, transactionBatchSize);
+                } finally {
+                  // Always release semaphore and arrive at phaser
+                  taskSemaphore.release();
+                  phaser.arriveAndDeregister();
+                }
+              });
         }
       }
 
       readerFuture.join();
+      // Wait for all tasks to complete
+      phaser.arriveAndAwaitAdvance();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException(
           CoreError.DATA_LOADER_DATA_CHUNK_PROCESS_FAILED.buildMessage(e.getMessage()), e);
     } finally {
-      dataChunkExecutor.shutdown();
-      try {
-        if (!dataChunkExecutor.awaitTermination(60, TimeUnit.SECONDS)) {
-          dataChunkExecutor.shutdownNow();
-        }
-      } catch (InterruptedException e) {
-        dataChunkExecutor.shutdownNow();
-        Thread.currentThread().interrupt();
-      }
+      shutdownExecutorGracefully(dataChunkReaderExecutor);
+      shutdownExecutorGracefully(dataChunkProcessorExecutor);
       notifyAllDataChunksCompleted();
     }
   }
 
+  /**
+   * Shuts down the given `ExecutorService` gracefully. This method attempts to cleanly shut down
+   * the executor by first invoking `shutdown` and waiting for termination for up to 60 seconds. If
+   * the executor does not terminate within this time, it forces a shutdown using `shutdownNow`. If
+   * interrupted, it forces a shutdown and interrupts the current thread.
+   *
+   * @param es the `ExecutorService` to be shut down gracefully
+   */
+  private void shutdownExecutorGracefully(ExecutorService es) {
+    es.shutdown();
+    try {
+      if (!es.awaitTermination(60, TimeUnit.SECONDS)) {
+        es.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      es.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
   /**
    * Reads and processes data in chunks from the provided reader.
    *
```
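For readers unfamiliar with the `Semaphore`-plus-`Phaser` combination introduced above, the following is a self-contained sketch of the same back-pressure pattern under assumed values (a pool of 4 threads and sleep-based work stand in for `ImportOptions.getMaxThreads()` and real chunk processing):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class BoundedSubmissionDemo {
  public static void main(String[] args) throws InterruptedException {
    int maxThreads = 4; // illustrative; the PR reads this from ImportOptions
    ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
    // Permits cap the number of in-flight tasks (2x threads, as in the PR),
    // so the submitter blocks instead of queuing unbounded work.
    Semaphore permits = new Semaphore(maxThreads * 2);
    // Party 1 is the main thread; each task registers itself before submission.
    Phaser phaser = new Phaser(1);

    for (int i = 0; i < 20; i++) {
      int taskId = i;
      permits.acquire(); // back-pressure: blocks when 2x threads tasks are in flight
      phaser.register(); // track one more outstanding task
      pool.submit(
          () -> {
            try {
              Thread.sleep(50); // simulated chunk processing
              System.out.println("processed chunk " + taskId);
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            } finally {
              permits.release(); // free a submission slot
              phaser.arriveAndDeregister(); // mark this task done
            }
          });
    }

    // Main thread arrives and waits until every registered task has deregistered.
    phaser.arriveAndAwaitAdvance();
    pool.shutdown();
    pool.awaitTermination(60, TimeUnit.SECONDS);
  }
}
```

The semaphore caps how far the submitter can run ahead of the pool, and the phaser gives the main thread a completion barrier without collecting `Future` objects; these are the two roles the PR's inline comments call out.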
```diff
@@ -373,46 +409,26 @@ private ImportDataChunkStatus processDataChunkWithTransactions(
     Instant startTime = Instant.now();
     List<ImportTransactionBatch> transactionBatches =
         splitIntoTransactionBatches(dataChunk, transactionBatchSize);
-    ExecutorService transactionBatchExecutor =
-        Executors.newFixedThreadPool(params.getImportOptions().getMaxThreads());
-    List<Future<?>> transactionBatchFutures = new ArrayList<>();
     AtomicInteger successCount = new AtomicInteger(0);
     AtomicInteger failureCount = new AtomicInteger(0);
-    try {
-      for (ImportTransactionBatch transactionBatch : transactionBatches) {
-        Future<?> transactionBatchFuture =
-            transactionBatchExecutor.submit(
-                () -> processTransactionBatch(dataChunk.getDataChunkId(), transactionBatch));
-        transactionBatchFutures.add(transactionBatchFuture);
-      }
-
-      waitForFuturesToComplete(transactionBatchFutures);
-      transactionBatchFutures.forEach(
-          batchResult -> {
-            try {
-              ImportTransactionBatchResult importTransactionBatchResult =
-                  (ImportTransactionBatchResult) batchResult.get();
-              importTransactionBatchResult
-                  .getRecords()
-                  .forEach(
-                      batchRecords -> {
-                        if (batchRecords.getTargets().stream()
-                            .allMatch(
-                                targetResult ->
-                                    targetResult
-                                        .getStatus()
-                                        .equals(ImportTargetResultStatus.SAVED))) {
-                          successCount.incrementAndGet();
-                        } else {
-                          failureCount.incrementAndGet();
-                        }
-                      });
-            } catch (InterruptedException | ExecutionException e) {
-              throw new RuntimeException(e);
-            }
-          });
-    } finally {
-      transactionBatchExecutor.shutdown();
+    for (ImportTransactionBatch transactionBatch : transactionBatches) {
+      ImportTransactionBatchResult importTransactionBatchResult =
+          processTransactionBatch(dataChunk.getDataChunkId(), transactionBatch);
+
+      importTransactionBatchResult
+          .getRecords()
+          .forEach(
+              batchRecords -> {
+                if (batchRecords.getTargets().stream()
+                    .allMatch(
+                        targetResult ->
+                            targetResult.getStatus().equals(ImportTargetResultStatus.SAVED))) {
+                  successCount.incrementAndGet();
+                } else {
+                  failureCount.incrementAndGet();
+                }
+              });
     }
     Instant endTime = Instant.now();
     int totalDuration = (int) Duration.between(startTime, endTime).toMillis();
```
```diff
@@ -440,32 +456,14 @@ private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChun
     Instant startTime = Instant.now();
     AtomicInteger successCount = new AtomicInteger(0);
     AtomicInteger failureCount = new AtomicInteger(0);
-    ExecutorService recordExecutor =
-        Executors.newFixedThreadPool(params.getImportOptions().getMaxThreads());
-    List<Future<?>> recordFutures = new ArrayList<>();
-    try {
-      for (ImportRow importRow : dataChunk.getSourceData()) {
-        Future<?> recordFuture =
-            recordExecutor.submit(
-                () -> processStorageRecord(dataChunk.getDataChunkId(), importRow));
-        recordFutures.add(recordFuture);
-      }
-      waitForFuturesToComplete(recordFutures);
-      recordFutures.forEach(
-          r -> {
-            try {
-              ImportTaskResult result = (ImportTaskResult) r.get();
-              boolean allSaved =
-                  result.getTargets().stream()
-                      .allMatch(t -> t.getStatus().equals(ImportTargetResultStatus.SAVED));
-              if (allSaved) successCount.incrementAndGet();
-              else failureCount.incrementAndGet();
-            } catch (InterruptedException | ExecutionException e) {
-              throw new RuntimeException(e);
-            }
-          });
-    } finally {
-      recordExecutor.shutdown();
+    for (ImportRow importRow : dataChunk.getSourceData()) {
+      ImportTaskResult result = processStorageRecord(dataChunk.getDataChunkId(), importRow);
+      boolean allSaved =
+          result.getTargets().stream()
+              .allMatch(t -> t.getStatus().equals(ImportTargetResultStatus.SAVED));
+      if (allSaved) successCount.incrementAndGet();
+      else failureCount.incrementAndGet();
     }
     Instant endTime = Instant.now();
     int totalDuration = (int) Duration.between(startTime, endTime).toMillis();
```
```diff
@@ -480,20 +478,4 @@ private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChun
         .status(ImportDataChunkStatusState.COMPLETE)
         .build();
   }
-
-  /**
-   * Waits for all futures in the provided list to complete. Any exceptions during execution are
-   * logged but not propagated.
-   *
-   * @param futures the list of {@link Future} objects to wait for
-   */
-  private void waitForFuturesToComplete(List<Future<?>> futures) {
-    for (Future<?> future : futures) {
-      try {
-        future.get();
-      } catch (Exception e) {
-        LOGGER.error(e.getMessage());
-      }
-    }
-  }
 }
```
**Reviewer:** Instead of using a `Semaphore`, I think we can use an `ExecutorService` with a bounded blocking queue, as shown below: … What do you think?

**Author:** Ah yes, we can use a bounded blocking queue like you mentioned. But since that setup uses `CallerRunsPolicy`, when the thread pool is completely saturated (all threads busy + queue full), `CallerRunsPolicy` runs the task in the calling thread (the main thread in our case) instead of a pool thread, which is not what we expect; having all the processing tasks executed in the pool is more natural, I think. There are other policies as well, but it seems there is no suitable one unless we create a custom one, so I think using a semaphore with a fixed pool here is OK. WDYT?
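The reviewer's snippet is not shown above, but the alternative under discussion is presumably something like the following sketch (pool size and queue capacity are illustrative): a `ThreadPoolExecutor` over a bounded `ArrayBlockingQueue` with `CallerRunsPolicy`, which handles saturation by running the rejected task on the submitting thread.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CallerRunsDemo {
  public static void main(String[] args) throws InterruptedException {
    int maxThreads = 2; // illustrative pool size
    ThreadPoolExecutor pool =
        new ThreadPoolExecutor(
            maxThreads,
            maxThreads,
            0L,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(2), // bounded queue: holds at most 2 waiting tasks
            new ThreadPoolExecutor.CallerRunsPolicy());

    for (int i = 0; i < 10; i++) {
      pool.submit(
          () -> {
            try {
              Thread.sleep(100); // simulated data chunk processing
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
            }
            // When the pool and queue are both full, this prints "main":
            // the submitting thread executes the rejected task itself.
            System.out.println("ran on " + Thread.currentThread().getName());
          });
    }
    pool.shutdown();
    pool.awaitTermination(60, TimeUnit.SECONDS);
  }
}
```

Under that policy the main thread would intermittently stop reading chunks in order to process one itself, which is the behavior the author prefers to avoid; the semaphore gate instead makes the main thread merely block, keeping all chunk processing on the pool.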