feat: Parallelize DataAssetsWorkflow with virtual threads (#25808)#25817
Pull request overview
This PR parallelizes the DataAssetsWorkflow in the Data Insights pipeline using Java 21 virtual threads to improve performance. On a dataset of 8,292 entities, it reports a ~2.6x speedup (from ~94 seconds to ~36 seconds) by converting sequential entity enrichment into concurrent processing with semaphore-based concurrency control.
Changes:
- Introduced parallel entity processing using `Executors.newVirtualThreadPerTaskExecutor()` with a concurrency budget calculated as `Math.max(4, Math.min(cores * 2, poolSize / 2))` to balance CPU parallelism with database connection pool capacity
- Added `enrichSingle()` method to `DataInsightsEntityEnricherProcessor` for independent single-entity enrichment in parallel contexts
- Made `updateStats()` methods synchronized across sink and processor classes to ensure thread-safe statistics accumulation during concurrent processing
- Implemented graceful shutdown support with `stop()` methods that propagate stop signals to active workflows and shut down executors
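The pattern summarized in the list above can be sketched roughly as follows. This is a minimal illustration, not the PR's actual code: the class name `BoundedVirtualThreadSketch`, the `enrich` placeholder, and the in-flight counters are all hypothetical, and the sleep merely stands in for blocking database calls. It assumes Java 21 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the workflow; all names here are illustrative only.
class BoundedVirtualThreadSketch {

  // Counters used only to observe how many tasks run inside the guarded section.
  static final AtomicInteger inFlight = new AtomicInteger();
  static final AtomicInteger maxInFlight = new AtomicInteger();

  // Placeholder for per-entity enrichment; the sleep simulates a blocking round-trip.
  static String enrich(String entity) throws InterruptedException {
    int now = inFlight.incrementAndGet();
    maxInFlight.accumulateAndGet(now, Math::max);
    try {
      Thread.sleep(5);
      return entity + ":enriched";
    } finally {
      inFlight.decrementAndGet();
    }
  }

  // Submits one virtual thread per entity, but lets at most `budget` enrich at once.
  static List<String> processAll(List<String> entities, int budget) {
    Semaphore permits = new Semaphore(budget);
    List<Future<String>> futures = new ArrayList<>();
    // ExecutorService is AutoCloseable since Java 19; close() waits for submitted tasks.
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      for (String entity : entities) {
        futures.add(executor.submit(() -> {
          permits.acquire(); // blocks the virtual thread until a permit is free
          try {
            return enrich(entity);
          } finally {
            permits.release();
          }
        }));
      }
      List<String> results = new ArrayList<>();
      for (Future<String> future : futures) {
        results.add(future.get()); // results come back in submission order
      }
      return results;
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("enrichment failed", e);
    }
  }
}
```

Calling `BoundedVirtualThreadSketch.processAll(entities, 4)` creates as many virtual threads as there are entities, while the semaphore keeps the number doing simulated I/O at or below four.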
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| DataAssetsWorkflow.java | Core parallelization logic with virtual thread executor, semaphore-based concurrency control, ConcurrentLinkedQueue for bulk operations, and graceful shutdown support |
| DataInsightsEntityEnricherProcessor.java | New enrichSingle() method for per-entity enrichment without batch error wrapping, and synchronized updateStats() for thread safety |
| DataInsightsElasticSearchProcessor.java | Synchronized updateStats() method to prevent race conditions in concurrent stats updates |
| DataInsightsOpenSearchProcessor.java | Synchronized updateStats() method to prevent race conditions in concurrent stats updates |
| ElasticSearchIndexSink.java | Synchronized updateStats() method to prevent race conditions in concurrent stats updates |
| OpenSearchIndexSink.java | Synchronized updateStats() method to prevent race conditions in concurrent stats updates |
| DataInsightsApp.java | Override stop() method to propagate shutdown signals to active DataAssetsWorkflow instance |
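As a rough illustration of the graceful-stop behavior described in the table, the sketch below combines a stopped flag with `shutdownNow()`. It is a hypothetical shell, not the real `DataAssetsWorkflow`: the class name, the `run(int)` signature, and the one-task-at-a-time loop are assumptions made to keep the example short. Holding the executor in a local variable is one way to avoid racing with a concurrent `stop()` that reads the shared field.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical workflow shell; the real DataAssetsWorkflow is not reproduced here.
class StoppableWorkflowSketch {
  private volatile boolean stopped = false;
  private volatile ExecutorService executor;

  // Processes items until done or until stop() is called; returns items completed.
  int run(int items) {
    ExecutorService local = Executors.newVirtualThreadPerTaskExecutor();
    executor = local; // published so stop() can reach it
    int done = 0;
    try {
      for (int i = 0; i < items && !stopped; i++) {
        // Placeholder unit of work; a real workflow would enrich and index here.
        local.submit(() -> { Thread.sleep(1); return null; }).get();
        done++;
      }
    } catch (Exception e) {
      // Interrupted, cancelled, or rejected after stop(); end the run early.
    } finally {
      local.shutdownNow(); // shut down via the local reference, then clear the field
      executor = null;
    }
    return done;
  }

  // Sets the flag first so the loop exits, then interrupts any in-flight tasks.
  void stop() {
    stopped = true;
    ExecutorService current = executor;
    if (current != null) {
      current.shutdownNow();
    }
  }
}
```

If `stop()` is called before `run`, the loop body never executes; if it is called during `run`, the in-flight task is interrupted and the loop ends on the next check.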
@manerow are you looking at the recent changes to search indexing? It uses Quartz to distribute across OM servers too. That will give you more leverage in making indexing truly distributed.
Co-authored-by: manerow <manerow@users.noreply.github.com>
Code Review ✅ Approved 2 resolved / 2 findings
Well-structured parallelization using virtual threads with proper concurrency control. Both previous findings addressed: executor race condition fixed in finally block, Future.get() timeout intentionally omitted per author's design rationale (stop/shutdownNow provides recovery). No new issues found.
✅ 2 resolved
✅ Bug: Executor null-ed before try-with-resources close
@harshach Thanks for the pointers.
Distributed indexing with Quartz: I've looked at the Virtual threads with a semaphore parallelize that enrichment within a single server for ~100 lines of code and no new config. This isn't a replacement for distributed processing; it's the first step. Distribution would decide which entities each server handles; virtual threads speed up the work within each node. The two layers are complementary, and adapting the Quartz coordination to split entity types across OM instances would be a natural follow-up. This PR is the first step; distributed coordination would be the next.
Incremental indexing: Agreed, already tracked in #25809. The plan is to filter entities by
@manerow if you are planning on doing the distributed job in another PR, that works for me. Even with virtual threads, if it's processing a long enough lookback in days, we will lock those tables for a while. Here it's better to distribute based on the number of days that the user wants to reindex from.
@harshach Sounds good. I'll create a task for the distributed approach with date-range partitioning for backfills and tackle it in a separate PR.



Fixes #25808
This PR parallelizes the `DataAssetsWorkflow` in the Data Insights pipeline using Java 21 virtual threads, reducing wall-clock time by ~2.6x on a dataset of 8,292 entities.
I worked on improving the performance of the Data Insights pipeline because the `DataAssetsWorkflow` was executing sequentially and spending significant time in blocking database calls during entity enrichment. Since enrichment is heavily I/O-bound (multiple DB round-trips per entity), virtual threads allow efficient concurrency without exhausting platform threads or the DB connection pool.
What changed
`DataAssetsWorkflow` now processes entities concurrently using a virtual-thread-per-task executor with a semaphore-based concurrency budget, driven by `cores × 2` and capped at `poolSize / 2`. This scales with machine capacity while keeping half of the DB pool free for REST/API traffic and other jobs.
Added `enrichSingle()` to `DataInsightsEntityEnricherProcessor` so individual entities can be enriched independently on virtual threads.
Enriched results are collected in a `ConcurrentLinkedQueue` and bulk-flushed to the search index after each batch.
Made `updateStats()` methods synchronized to ensure thread-safe stat accumulation across:
- `DataInsightsElasticSearchProcessor`
- `DataInsightsOpenSearchProcessor`
- `DataInsightsEntityEnricherProcessor`
- `ElasticSearchIndexSink`
- `OpenSearchIndexSink`
Added graceful stop support: `DataInsightsApp.stop()` now propagates to the active `DataAssetsWorkflow`, which shuts down its executor and sets a stopped flag.
Why virtual threads instead of reusing `SearchIndexApp`’s producer-consumer model
`SearchIndexExecutor` is optimized for:
Its bottleneck is Elasticsearch I/O, and it uses platform thread pools, blocking queues, adaptive batching, and async bulk sinks.
`DataAssetsWorkflow` differs:
- I/O-bound enrichment per entity: each entity performs 3–5+ blocking DB calls (version history + owner/team resolution).
- 1:N data amplification: one entity can produce 30+ daily snapshot documents, making fixed queue sizing awkward.
- 4-stage pipeline
- Less complexity: virtual threads + semaphore add ~100 LOC with no queue tuning, no adaptive batching, and no new configuration surface.
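To make the 1:N amplification point concrete, here is a small sketch of collecting a variable number of documents per entity into a `ConcurrentLinkedQueue` and draining it once per batch. The names `SnapshotCollectorSketch`, `snapshotsFor`, and `processBatch` are hypothetical, the document strings are placeholders, and the semaphore bound is left out for brevity; it assumes Java 21 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical collector; the document format and method names are illustrative only.
class SnapshotCollectorSketch {

  // One entity fans out into `days` snapshot documents.
  static List<String> snapshotsFor(String entity, int days) {
    List<String> docs = new ArrayList<>();
    for (int day = 0; day < days; day++) {
      docs.add(entity + "@day" + day);
    }
    return docs;
  }

  // Enriches a batch concurrently, then hands everything collected to one bulk write.
  static List<String> processBatch(List<String> batch, int days) {
    // Unbounded and thread-safe, so no queue capacity has to be chosen up front.
    ConcurrentLinkedQueue<String> collected = new ConcurrentLinkedQueue<>();
    try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
      for (String entity : batch) {
        executor.execute(() -> collected.addAll(snapshotsFor(entity, days)));
      }
    } // close() blocks until every submitted task has finished
    return new ArrayList<>(collected); // stand-in for the bulk flush to the search index
  }
}
```

Because the queue is unbounded, a batch of three entities with 30 days each simply yields 90 documents, with no sizing decision needed beforehand.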
Concurrency Budget Design (Brief Rationale)
The budget is intentionally based on CPU cores, not just DB pool size.
Formula: `Math.max(4, Math.min(cores * 2, poolSize / 2))`
Why `cores × 2` is the primary driver:
- Concurrency beyond `cores × 2` does not increase real throughput.
Why `poolSize / 2` is a cap, not the signal:
- JDBI `onDemand` acquires/releases connections per call.
- Connections are typically held for only 1–5ms.
- Pool exhaustion is not the limiting factor in practice.
`poolSize / 2` acts as a safety belt, reserving capacity for REST/API traffic and other jobs.
Example budgets:
Benchmark confirmation:
Equivalent performance confirms that carrier thread pinning (CPU-bound parallelism), not pool size, is the true concurrency limit.
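For readers who want to try the budget formula, the expression from the PR overview can be wrapped in a small helper. The class and method names are hypothetical, and the core counts and pool sizes printed in `main` are made-up inputs chosen to exercise each branch, not configurations measured in this PR.

```java
// Hypothetical helper; only the formula itself comes from the PR overview.
class ConcurrencyBudgetSketch {

  // Floor of 4, driven by cores * 2, capped at half the connection pool.
  static int budget(int cores, int poolSize) {
    return Math.max(4, Math.min(cores * 2, poolSize / 2));
  }

  public static void main(String[] args) {
    // Illustrative inputs only: {cores, poolSize}.
    int[][] samples = {{8, 150}, {2, 100}, {16, 20}, {1, 4}};
    for (int[] sample : samples) {
      System.out.printf("cores=%d poolSize=%d -> budget=%d%n",
          sample[0], sample[1], budget(sample[0], sample[1]));
    }
    // The machine's own core count can be read at runtime.
    int localCores = Runtime.getRuntime().availableProcessors();
    System.out.println("local cores=" + localCores + ", budget with pool of 100="
        + budget(localCores, 100));
  }
}
```

One consequence of the floor is worth noting: with a very small pool (for example a `poolSize` of 4), the minimum of 4 exceeds `poolSize / 2`, so the half-pool reservation only holds once the pool has at least 8 connections.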
Performance Results
Dataset: 8,292 entities (`load-test-data.sh --quick`)
Environment: Clean Docker, identical dataset and config.
Both runs produced identical results with zero failures.
How did you test your changes?
Full A/B test in a clean Docker environment.
Ran both `main` and feature branch on the same dataset (8,292 entities).
Triggered Data Insights pipeline.
Compared:
Verified identical indexed document counts and zero failures.
Verified that the updated concurrency budget (16 threads, down from 75) produces identical results and equivalent performance, confirming that carrier thread pinning on MySQL was the actual concurrency limit.