2525import java .util .List ;
2626import java .util .concurrent .BlockingQueue ;
2727import java .util .concurrent .CompletableFuture ;
28- import java .util .concurrent .ExecutionException ;
2928import java .util .concurrent .ExecutorService ;
3029import java .util .concurrent .Executors ;
31- import java .util .concurrent .Future ;
3230import java .util .concurrent .LinkedBlockingQueue ;
31+ import java .util .concurrent .Phaser ;
32+ import java .util .concurrent .Semaphore ;
3333import java .util .concurrent .TimeUnit ;
3434import java .util .concurrent .atomic .AtomicInteger ;
3535import lombok .RequiredArgsConstructor ;
@@ -62,41 +62,77 @@ public abstract class ImportProcessor {
6262 * @param reader the {@link BufferedReader} used to read the source file
6363 */
6464 public void process (int dataChunkSize , int transactionBatchSize , BufferedReader reader ) {
65- ExecutorService dataChunkExecutor = Executors .newSingleThreadExecutor ();
65+ ExecutorService dataChunkReaderExecutor = Executors .newSingleThreadExecutor ();
66+ ExecutorService dataChunkProcessorExecutor =
67+ Executors .newFixedThreadPool (params .getImportOptions ().getMaxThreads ());
6668 BlockingQueue <ImportDataChunk > dataChunkQueue =
6769 new LinkedBlockingQueue <>(params .getImportOptions ().getDataChunkQueueSize ());
6870
71+ // Semaphore controls concurrent task submissions, small buffer to be two times of threads
72+ Semaphore taskSemaphore = new Semaphore (params .getImportOptions ().getMaxThreads () * 2 );
73+ // Phaser tracks task completion (start with 1 for the main thread)
74+ Phaser phaser = new Phaser (1 );
75+
6976 try {
7077 CompletableFuture <Void > readerFuture =
7178 CompletableFuture .runAsync (
72- () -> readDataChunks (reader , dataChunkSize , dataChunkQueue ), dataChunkExecutor );
79+ () -> readDataChunks (reader , dataChunkSize , dataChunkQueue ), dataChunkReaderExecutor );
7380
7481 while (!(dataChunkQueue .isEmpty () && readerFuture .isDone ())) {
7582 ImportDataChunk dataChunk = dataChunkQueue .poll (100 , TimeUnit .MILLISECONDS );
7683 if (dataChunk != null ) {
77- processDataChunk (dataChunk , transactionBatchSize );
84+ // Acquire semaphore permit (blocks if no permits available)
85+ taskSemaphore .acquire ();
86+ // Register with phaser before submitting
87+ phaser .register ();
88+
89+ dataChunkProcessorExecutor .submit (
90+ () -> {
91+ try {
92+ processDataChunk (dataChunk , transactionBatchSize );
93+ } finally {
94+ // Always release semaphore and arrive at phaser
95+ taskSemaphore .release ();
96+ phaser .arriveAndDeregister ();
97+ }
98+ });
7899 }
79100 }
80101
81102 readerFuture .join ();
103+ // Wait for all tasks to complete
104+ phaser .arriveAndAwaitAdvance ();
82105 } catch (InterruptedException e ) {
83106 Thread .currentThread ().interrupt ();
84107 throw new RuntimeException (
85108 CoreError .DATA_LOADER_DATA_CHUNK_PROCESS_FAILED .buildMessage (e .getMessage ()), e );
86109 } finally {
87- dataChunkExecutor .shutdown ();
88- try {
89- if (!dataChunkExecutor .awaitTermination (60 , TimeUnit .SECONDS )) {
90- dataChunkExecutor .shutdownNow ();
91- }
92- } catch (InterruptedException e ) {
93- dataChunkExecutor .shutdownNow ();
94- Thread .currentThread ().interrupt ();
95- }
110+ shutdownExecutorGracefully (dataChunkReaderExecutor );
111+ shutdownExecutorGracefully (dataChunkProcessorExecutor );
96112 notifyAllDataChunksCompleted ();
97113 }
98114 }
99115
116+ /**
117+ * Shuts down the given `ExecutorService` gracefully. This method attempts to cleanly shut down
118+ * the executor by first invoking `shutdown` and waiting for termination for up to 60 seconds. If
119+ * the executor does not terminate within this time, it forces a shutdown using `shutdownNow`. If
120+ * interrupted, it forces a shutdown and interrupts the current thread.
121+ *
122+ * @param es the `ExecutorService` to be shut down gracefully
123+ */
124+ private void shutdownExecutorGracefully (ExecutorService es ) {
125+ es .shutdown ();
126+ try {
127+ if (!es .awaitTermination (60 , TimeUnit .SECONDS )) {
128+ es .shutdownNow ();
129+ }
130+ } catch (InterruptedException e ) {
131+ es .shutdownNow ();
132+ Thread .currentThread ().interrupt ();
133+ }
134+ }
135+
100136 /**
101137 * Reads and processes data in chunks from the provided reader.
102138 *
@@ -373,46 +409,26 @@ private ImportDataChunkStatus processDataChunkWithTransactions(
373409 Instant startTime = Instant .now ();
374410 List <ImportTransactionBatch > transactionBatches =
375411 splitIntoTransactionBatches (dataChunk , transactionBatchSize );
376- ExecutorService transactionBatchExecutor =
377- Executors .newFixedThreadPool (params .getImportOptions ().getMaxThreads ());
378- List <Future <?>> transactionBatchFutures = new ArrayList <>();
379412 AtomicInteger successCount = new AtomicInteger (0 );
380413 AtomicInteger failureCount = new AtomicInteger (0 );
381- try {
382- for (ImportTransactionBatch transactionBatch : transactionBatches ) {
383- Future <?> transactionBatchFuture =
384- transactionBatchExecutor .submit (
385- () -> processTransactionBatch (dataChunk .getDataChunkId (), transactionBatch ));
386- transactionBatchFutures .add (transactionBatchFuture );
387- }
388414
389- waitForFuturesToComplete (transactionBatchFutures );
390- transactionBatchFutures .forEach (
391- batchResult -> {
392- try {
393- ImportTransactionBatchResult importTransactionBatchResult =
394- (ImportTransactionBatchResult ) batchResult .get ();
395- importTransactionBatchResult
396- .getRecords ()
397- .forEach (
398- batchRecords -> {
399- if (batchRecords .getTargets ().stream ()
400- .allMatch (
401- targetResult ->
402- targetResult
403- .getStatus ()
404- .equals (ImportTargetResultStatus .SAVED ))) {
405- successCount .incrementAndGet ();
406- } else {
407- failureCount .incrementAndGet ();
408- }
409- });
410- } catch (InterruptedException | ExecutionException e ) {
411- throw new RuntimeException (e );
412- }
413- });
414- } finally {
415- transactionBatchExecutor .shutdown ();
415+ for (ImportTransactionBatch transactionBatch : transactionBatches ) {
416+ ImportTransactionBatchResult importTransactionBatchResult =
417+ processTransactionBatch (dataChunk .getDataChunkId (), transactionBatch );
418+
419+ importTransactionBatchResult
420+ .getRecords ()
421+ .forEach (
422+ batchRecords -> {
423+ if (batchRecords .getTargets ().stream ()
424+ .allMatch (
425+ targetResult ->
426+ targetResult .getStatus ().equals (ImportTargetResultStatus .SAVED ))) {
427+ successCount .incrementAndGet ();
428+ } else {
429+ failureCount .incrementAndGet ();
430+ }
431+ });
416432 }
417433 Instant endTime = Instant .now ();
418434 int totalDuration = (int ) Duration .between (startTime , endTime ).toMillis ();
@@ -440,32 +456,17 @@ private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChun
440456 Instant startTime = Instant .now ();
441457 AtomicInteger successCount = new AtomicInteger (0 );
442458 AtomicInteger failureCount = new AtomicInteger (0 );
443- ExecutorService recordExecutor =
444- Executors .newFixedThreadPool (params .getImportOptions ().getMaxThreads ());
445- List <Future <?>> recordFutures = new ArrayList <>();
446- try {
447- for (ImportRow importRow : dataChunk .getSourceData ()) {
448- Future <?> recordFuture =
449- recordExecutor .submit (
450- () -> processStorageRecord (dataChunk .getDataChunkId (), importRow ));
451- recordFutures .add (recordFuture );
459+
460+ for (ImportRow importRow : dataChunk .getSourceData ()) {
461+ ImportTaskResult result = processStorageRecord (dataChunk .getDataChunkId (), importRow );
462+ boolean allSaved =
463+ result .getTargets ().stream ()
464+ .allMatch (t -> t .getStatus ().equals (ImportTargetResultStatus .SAVED ));
465+ if (allSaved ) {
466+ successCount .incrementAndGet ();
467+ } else {
468+ failureCount .incrementAndGet ();
452469 }
453- waitForFuturesToComplete (recordFutures );
454- recordFutures .forEach (
455- r -> {
456- try {
457- ImportTaskResult result = (ImportTaskResult ) r .get ();
458- boolean allSaved =
459- result .getTargets ().stream ()
460- .allMatch (t -> t .getStatus ().equals (ImportTargetResultStatus .SAVED ));
461- if (allSaved ) successCount .incrementAndGet ();
462- else failureCount .incrementAndGet ();
463- } catch (InterruptedException | ExecutionException e ) {
464- throw new RuntimeException (e );
465- }
466- });
467- } finally {
468- recordExecutor .shutdown ();
469470 }
470471 Instant endTime = Instant .now ();
471472 int totalDuration = (int ) Duration .between (startTime , endTime ).toMillis ();
@@ -480,20 +481,4 @@ private ImportDataChunkStatus processDataChunkWithoutTransactions(ImportDataChun
480481 .status (ImportDataChunkStatusState .COMPLETE )
481482 .build ();
482483 }
483-
484- /**
485- * Waits for all futures in the provided list to complete. Any exceptions during execution are
486- * logged but not propagated.
487- *
488- * @param futures the list of {@link Future} objects to wait for
489- */
490- private void waitForFuturesToComplete (List <Future <?>> futures ) {
491- for (Future <?> future : futures ) {
492- try {
493- future .get ();
494- } catch (Exception e ) {
495- LOGGER .error (e .getMessage ());
496- }
497- }
498- }
499484}
0 commit comments