
Conversation


@thongdk8 thongdk8 commented Jun 3, 2025

Description

The current import process parallelizes at the record level, which creates inefficiencies in thread pool utilization. The main thread must wait for all record processing threads to complete before moving to the next data chunk, so the thread pool is underutilized while waiting for the slowest batch to finish. The current implementation is also sensitive to the tx_size, chunk size, and thread count parameters. For example, with tx_size=10, chunk_size=30, and threads=4, we don't utilize all the threads because the chunk is split into only 3 tasks for the pool; the same happens whenever chunk_size is not divisible by tx_size * threads.

This PR moves parallelism from the record level to the data chunk level, allowing continuous processing of data chunks through the same thread pool using a blocking queue. This improves overall import performance. PTAL. Thank you.

Before:

  • Data chunks are split into multiple record batches
  • Thread pool processes record batches in parallel
  • Main thread blocks until all record batches complete
  • Thread pool sits idle while waiting for the slowest batch

After:

  • Thread pool processes entire data chunks in parallel
  • Continuous chunk processing via blocking queue
  • Improved thread pool utilization
  • Eliminated synchronization bottlenecks between chunks
  • Reduced sensitivity to parameter constraints
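The after-state above can be sketched as a minimal, self-contained pipeline. This is an illustrative sketch only: the chunk type (List<Integer>), the sizes, and names such as dataChunkQueue and taskSemaphore are assumptions modeled on the PR discussion, not the actual ImportProcessor code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ChunkPipelineSketch {
  public static void main(String[] args) throws Exception {
    int maxThreads = 4;
    int totalChunks = 10;

    BlockingQueue<List<Integer>> dataChunkQueue = new LinkedBlockingQueue<>(8);
    ExecutorService readerExecutor = Executors.newSingleThreadExecutor();
    ExecutorService processorExecutor = Executors.newFixedThreadPool(maxThreads);
    // Semaphore bounds the number of in-flight chunk tasks (2x threads as a small buffer).
    Semaphore taskSemaphore = new Semaphore(maxThreads * 2);
    AtomicInteger processedRecords = new AtomicInteger();

    // Reader task: produces data chunks into the bounded queue.
    CompletableFuture<Void> readerFuture =
        CompletableFuture.runAsync(
            () -> {
              for (int i = 0; i < totalChunks; i++) {
                List<Integer> chunk = new ArrayList<>();
                for (int j = 0; j < 100; j++) chunk.add(i * 100 + j);
                try {
                  dataChunkQueue.put(chunk);
                } catch (InterruptedException e) {
                  Thread.currentThread().interrupt();
                  return;
                }
              }
            },
            readerExecutor);

    // Main loop: poll with a timeout so the exit condition is rechecked
    // even if the queue stays empty while the reader is finishing up.
    while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
      List<Integer> chunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);
      if (chunk == null) continue;
      taskSemaphore.acquire();
      processorExecutor.submit(
          () -> {
            try {
              // Records within a chunk are handled sequentially by one pool thread.
              processedRecords.addAndGet(chunk.size());
            } finally {
              taskSemaphore.release();
            }
          });
    }

    readerExecutor.shutdown();
    processorExecutor.shutdown();
    processorExecutor.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(processedRecords.get());
  }
}
```

Because each chunk is an independent task, the pool stays busy as long as the reader keeps the queue fed, regardless of how tx_size divides into chunk_size.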

Related issues and/or PRs

NA

Changes made

  • Moved the parallelism of the import process from the record level to the data chunk level

Checklist

The following is a best-effort checklist. If any items in this checklist are not applicable to this PR or are dependent on other, unmerged PRs, please still mark the checkboxes after you have read and understood each item.

  • I have commented my code, particularly in hard-to-understand areas.
  • I have updated the documentation to reflect the changes.
  • I have considered whether similar issues could occur in other products, components, or modules if this PR is for bug fixes.
  • Any remaining open issues linked to this PR are documented and up-to-date (Jira, GitHub, etc.).
  • Tests (unit, integration, etc.) have been added for the changes.
  • My changes generate no new warnings.
  • Any dependent changes in other PRs have been merged and published.

Additional notes (optional)

NA

Release notes

NA

@thongdk8 thongdk8 requested a review from Copilot June 3, 2025 08:29

Copilot AI left a comment


Pull Request Overview

This PR refactors the import process to parallelize at the data-chunk level rather than the record level, improving thread-pool utilization and reducing synchronization bottlenecks.

  • Introduces separate executors for reading and processing chunks, coordinated via a blocking queue.
  • Uses a Semaphore and Phaser to limit and track concurrent task submissions.
  • Removes per-record and per-transaction thread pools, processing those sequentially within each chunk.

Reviewed Changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated no comments.

File: data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java
Description: Split reader/processor executors; added Semaphore/Phaser; refactored chunk submission and shutdown logic.

File: data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java
Description: Expanded tests for chunk-level parallelism; introduced TestImportProcessor for concurrency tracking.
Comments suppressed due to low confidence (3)

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java:102

  • A CompletionException from readerFuture.join() isn’t caught, so an exceptional reader task can bypass the phaser.arriveAndAwaitAdvance() and leave the phaser in an inconsistent state. Wrap join() in a try-catch for CompletionException (or catch RuntimeException) to ensure the main phaser party always deregisters.
readerFuture.join();
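A minimal sketch of the suggested fix, assuming a Phaser with the main thread registered as one party (the actual ImportProcessor wiring may differ):

```java
import java.util.concurrent.*;

public class ReaderJoinSketch {
  public static void main(String[] args) {
    Phaser phaser = new Phaser(1); // main thread registered as one party

    // A reader task that fails: join() will rethrow as CompletionException.
    CompletableFuture<Void> readerFuture =
        CompletableFuture.runAsync(
            () -> {
              throw new RuntimeException("reader failed");
            });

    try {
      readerFuture.join();
    } catch (CompletionException e) {
      System.out.println("caught: " + e.getCause().getMessage());
    } finally {
      // Always deregister, so an exceptional reader cannot leave
      // the phaser with a party that never arrives.
      phaser.arriveAndDeregister();
    }
    System.out.println("registered parties: " + phaser.getRegisteredParties());
  }
}
```

Without the try/finally, the CompletionException would skip the deregistration and other parties could wait on the phaser indefinitely.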

data-loader/core/src/test/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessorTest.java:67

  • The TableColumnDataTypes type is not imported, causing a compilation error. Add the corresponding import for this mock type.
@Mock private TableColumnDataTypes tableColumnDataTypes;

data-loader/core/src/main/java/com/scalar/db/dataloader/core/dataimport/processor/ImportProcessor.java:116

  • [nitpick] In the Javadoc, backticks are used around ExecutorService; for consistency with project style, use <code>ExecutorService</code> tags instead of Markdown-style backticks.
/**
 * Shuts down the given `ExecutorService` gracefully. This method attempts to cleanly shut down

@thongdk8 thongdk8 self-assigned this Jun 3, 2025
@thongdk8 thongdk8 marked this pull request as ready for review June 3, 2025 08:32

@komamitsu komamitsu left a comment


LGTM! 👍

@thongdk8 BTW, I guess there is a slight concern with the changes of the PR in the following corner case:

  • There are 100 data chunks
  • Only the first data chunk contains huge import rows (e.g., 1000 times larger than those of the other chunks). Yes, it's data skew
  • The thread for the first chunk is sequentially handling rows for a while, although the other threads have quickly completed the other chunks and are waiting for the first thread

In this case, the original implementation might work better (thanks to the record-level parallelism). But it should be a rare case and we can ignore such a concern, right?

() -> readDataChunks(reader, dataChunkSize, dataChunkQueue), dataChunkReaderExecutor);

while (!(dataChunkQueue.isEmpty() && readerFuture.isDone())) {
ImportDataChunk dataChunk = dataChunkQueue.poll(100, TimeUnit.MILLISECONDS);


This is not related to this PR, but why do we need to wake up every 100 ms? I think a longer timeout or using take() might be better.

@thongdk8 thongdk8 Jun 4, 2025


The queue is buffered, so a timeout of 100 ms should be fine, I think.
We could replace it with take(), but that seems a bit unsafe, since take() would block the loop (a potential deadlock) and we would never get a chance to recheck the while condition unless a new item arrives. For example, the queue could be empty while the reader is just finishing its job (the reader future is about to be done). This case is hard to hit, but it is still possible. WDYT?



Yeah, you're correct. I didn't consider the exit condition. Let's keep it as is.


thongdk8 commented Jun 4, 2025

@komamitsu Thank you for reviewing. Regarding the case you mentioned, I agree the behavior would be as you described, but it is indeed a rare case: it only happens when the chunk size is large and close to the total number of records, and only for the last batch of chunks processed by the pool. So I think we can ignore that concern, as you said.

new LinkedBlockingQueue<>(params.getImportOptions().getDataChunkQueueSize());

// Semaphore controls concurrent task submissions, small buffer to be two times of threads
Semaphore taskSemaphore = new Semaphore(params.getImportOptions().getMaxThreads() * 2);


Instead of using a Semaphore, I think we can use an ExecutorService with a bounded blocking queue, as shown below:

    ExecutorService dataChunkProcessorExecutor =
        new ThreadPoolExecutor(
            params.getImportOptions().getMaxThreads(),
            params.getImportOptions().getMaxThreads(),
            0L,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(params.getImportOptions().getMaxThreads() * 2),
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());

What do you think?



Ah yes, we could use a bounded blocking queue as you mentioned. But with CallerRunsPolicy, when the thread pool is completely saturated (all threads busy and the queue full), the rejected task runs in the calling thread (the main thread in our case) instead of a pool thread. That is not what we expect; having all processing tasks execute in the pool feels more natural.
There are other policies as well, but none seems suitable unless we create a custom one, so I think using a semaphore with a fixed pool is OK here. WDYT?
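To illustrate the concern (pool size, queue capacity, and task bodies here are made up for the demo), the following sketch saturates a ThreadPoolExecutor configured with CallerRunsPolicy and shows the overflow task executing on the submitting thread:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

public class CallerRunsSketch {
  public static void main(String[] args) throws Exception {
    int maxThreads = 2;
    ThreadPoolExecutor executor =
        new ThreadPoolExecutor(
            maxThreads,
            maxThreads,
            0L,
            TimeUnit.MILLISECONDS,
            new ArrayBlockingQueue<>(1), // tiny queue to force rejection quickly
            Executors.defaultThreadFactory(),
            new ThreadPoolExecutor.CallerRunsPolicy());

    AtomicBoolean ranOnMain = new AtomicBoolean(false);
    String mainName = Thread.currentThread().getName();
    CountDownLatch block = new CountDownLatch(1);

    // Occupy both worker threads with blocking tasks, plus one queued task.
    for (int i = 0; i < 3; i++) {
      executor.submit(
          () -> {
            try {
              block.await();
            } catch (InterruptedException ignored) {
            }
          });
    }

    // Pool saturated and queue full: CallerRunsPolicy runs this in the caller.
    executor.submit(
        () -> {
          if (Thread.currentThread().getName().equals(mainName)) {
            ranOnMain.set(true);
          }
        });

    block.countDown();
    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println("caller-ran: " + ranOnMain.get());
  }
}
```

With a Semaphore gating submissions instead, the main thread simply blocks on acquire() and every chunk task still runs on a pool thread.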

…ataimport/processor/ImportProcessor.java

Co-authored-by: Toshihiro Suzuki <[email protected]>
@thongdk8 thongdk8 requested a review from brfrn169 June 5, 2025 04:03

@brfrn169 brfrn169 left a comment


LGTM! Thank you!


@ypeckstadt ypeckstadt left a comment


LGTM. Thank you.


@inv-jishnu inv-jishnu left a comment


LGTM!
Thank you.


@feeblefakie feeblefakie left a comment


LGTM! Thank you!


@Torch3333 Torch3333 left a comment


LGTM, thank you!

@ypeckstadt ypeckstadt merged commit 10cc86f into master Jun 9, 2025
55 checks passed
@ypeckstadt ypeckstadt deleted the data-loader/ref/move-parallelism-to-datachunk-level-in-import-processor branch June 9, 2025 04:49
feeblefakie pushed a commit that referenced this pull request Jun 9, 2025
…l to the data chunk level (#2728)

Co-authored-by: Toshihiro Suzuki <[email protected]>
Co-authored-by: Peckstadt Yves <[email protected]>
