perf: parallel partition writers via per-stream JoinSet #4193
rtyler merged 8 commits into delta-io:main
Conversation
Codecov Report
❌ Patch coverage is
Additional details and impacted files:

```
@@           Coverage Diff            @@
##             main    #4193      +/- ##
==========================================
+ Coverage   76.74%   76.81%   +0.07%
==========================================
  Files         166      166
  Lines       47830    48209     +379
==========================================
+ Hits        36706    37031     +325
- Misses       9287     9320      +33
- Partials     1837     1858      +21
```
Force-pushed 84312ec to 739d8e5
ion-elgreco
left a comment
This is great! But I have some concerns/theories, which I would like to see validated :)
- Before, we had one writer, but we pushed multiple partition streams into a single channel. Those partition streams were not defined by the partition columns, so from a Delta partition-columns perspective the data could arrive unsorted. This means we open multiple partition writers inside the Delta writer. Even though we stream to disk, several writers are open at once, so in theory memory could also grow with the number of partitions. Could you check what sorting the DataFusion plan by the partition columns does, while still keeping the old behavior of having one writer?
- Now we have a writer task per actual Delta partition stream, but repartitioning in DataFusion should also come at a cost, I assume. I can believe their repartitioning is less memory-intensive than what we do in our writer, but I'd still like to see this in a benchmark. So can you rerun the benchmarks, but also with a memory profile graph and peak RSS?
- Since DataFusion drives the partition stream according to our Delta Lake partition columns, we might be able to increase performance by opening a PartitionWriter directly instead of a DeltaWriter. Can you also test this?
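The first point can be illustrated with a dependency-free sketch (plain tuples stand in for record batches; `writers_needed_unsorted` and `runs_sorted` are hypothetical illustrations, not delta-rs code): unsorted input forces one open writer per distinct partition value, while input sorted by the partition columns turns each partition into one contiguous run that can be written and closed before the next begins.

```rust
use std::collections::BTreeMap;

// Old behavior with unsorted input: bucket rows by partition value; every
// bucket corresponds to a PartitionWriter held open until the stream ends.
fn writers_needed_unsorted(rows: &[(&str, i64)]) -> usize {
    let mut buckets: BTreeMap<&str, Vec<i64>> = BTreeMap::new();
    for (part, v) in rows {
        buckets.entry(*part).or_default().push(*v);
    }
    buckets.len()
}

// With sorted input, each partition value is one contiguous run, so only
// one writer needs to be alive at any time: (value, run length) pairs.
fn runs_sorted<'a>(rows: &[(&'a str, i64)]) -> Vec<(&'a str, usize)> {
    let mut runs: Vec<(&'a str, usize)> = Vec::new();
    for (part, _) in rows {
        match runs.last_mut() {
            Some((p, n)) if *p == *part => *n += 1,
            _ => runs.push((*part, 1)),
        }
    }
    runs
}

fn main() {
    let unsorted = [("b", 1), ("a", 2), ("b", 3), ("c", 4), ("a", 5)];
    let mut sorted = unsorted;
    sorted.sort_by(|x, y| x.0.cmp(y.0));

    // Unsorted: one open writer per distinct partition value.
    println!("open writers, unsorted input: {}", writers_needed_unsorted(&unsorted));
    // Sorted: one run per value, processed strictly one at a time.
    println!("runs on sorted input: {:?}", runs_sorted(&sorted));
}
```

This is only the memory argument, not a claim about total throughput, which is what the requested benchmark would measure.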
ion-elgreco
left a comment
Thanks for the benchmarks! Just curious: in your examples, are the partitions evenly distributed? Can you have a look at larger evenly distributed and skewed distributions?
Curious what happens there, because for skewed source data it might make sense to keep the old behavior.
Looking at our DeltaWriter, I think we can actually just call DeltaWriter::write_partition and manually pass the correct partition values inside the writer task. I believe this will yield the same performance while avoiding divide_by_partition_values!
I ran some additional benchmarks covering both when the data is evenly distributed and when it's not. Results:
DataFusion's repartitioning is very efficient when the data is evenly spread. When it's not, memory usage increases, which might come from DataFusion's repartition buffers.
Hmm, but what are the chances of a hash collision? Hey @alamb! In terms of repartitioning in DataFusion, how likely is it that a partition stream contains parts from another stream?
The docs and code imply that this doesn't happen, and that each partition stream has exclusively data from that partition. I posted the question on Discord as well for extra verification, so let's see. If that's true, we can use write_partition directly.
I think there is no strong guarantee using […]. And if we have 8 cores on a machine, then we have […]. In any case, it is worth checking if we require […].
@ion-elgreco - I did some research into this and here's my understanding. Hash repartitioning gives bucket exclusivity, not partition-value exclusivity. All rows for a given partition key land in the same stream, but a stream can contain multiple partition values when distinct keys > N (a normal bucket collision from % N). I don't think we can peek at the first row and assume the whole stream is one partition; we still need divide_by_partition_values. The value here is that no two streams can write the same partition key concurrently. Let's see what responses the DF Discord gets too.
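A quick way to see this bucket-vs-value distinction (std's `DefaultHasher` stands in for DataFusion's actual hash function, and `bucket` is a hypothetical helper, not delta-rs or DataFusion API): with more distinct keys than output streams, `hash(key) % N` must place several keys on the same stream by the pigeonhole principle, yet every row for a given key always lands on the same stream.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

// Assign a partition key to one of `n_streams` output streams.
fn bucket(key: &str, n_streams: u64) -> u64 {
    let mut h = DefaultHasher::new();
    key.hash(&mut h);
    h.finish() % n_streams
}

fn main() {
    let n_streams = 4u64;
    // 10 distinct date partitions, only 4 output streams.
    let keys: Vec<String> = (1..=10).map(|d| format!("2024-01-{d:02}")).collect();

    let mut per_stream: HashMap<u64, Vec<&str>> = HashMap::new();
    for k in &keys {
        per_stream.entry(bucket(k, n_streams)).or_default().push(k);
    }

    // Pigeonhole: at least one stream must carry >= 3 distinct partition
    // values, so a stream cannot be assumed to hold a single partition.
    let max_keys = per_stream.values().map(|v| v.len()).max().unwrap();
    println!("busiest stream carries {max_keys} distinct partition values");

    // Bucket exclusivity: the same key always hashes to the same stream,
    // so two streams never write the same partition concurrently.
    assert_eq!(bucket("2024-01-01", n_streams), bucket("2024-01-01", n_streams));
}
```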
```rust
        )?)
    }
};
writer.write(&part.record_batch).await?;
```
As best as I can tell this is also a potential serialization point. Is there a reason why the JoinSet approach wasn't applied here as well? 🤔
The parallelism is set at the stream level (one task per stream via JoinSet), while within the stream, writes are ordered (sequential) to keep each partition writer's state consistent. If we wanted parallelism here, it would require synchronization (like a lock) that would hurt performance. And the gain would be minimal because, after hash repartitioning, each batch typically contains just a small number of partition fragments, so the loop is very short. I can add a comment if you want!
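The concurrency shape being described can be sketched with std threads (the PR itself uses tokio's `JoinSet`; `write_streams` and the tuple "batches" are hypothetical stand-ins for streams of record batches): one worker per stream runs in parallel, while writes within a stream stay strictly sequential, so no lock is needed around any single partition writer.

```rust
use std::thread;

// One worker per stream (parallel across streams); each worker consumes
// its own stream in order (sequential within the stream), so a partition
// writer's state is only ever touched by one task.
fn write_streams(streams: Vec<Vec<(&'static str, i64)>>) -> Vec<(&'static str, i64)> {
    let handles: Vec<_> = streams
        .into_iter()
        .map(|stream| {
            thread::spawn(move || {
                let mut written = Vec::new();
                for batch in stream {
                    // Stand-in for `writer.write(&batch).await`: ordered
                    // within the stream, no synchronization required.
                    written.push(batch);
                }
                written
            })
        })
        .collect();

    // Fan-in: merge each worker's results (analogous to merging add actions).
    let mut all = Vec::new();
    for h in handles {
        all.extend(h.join().expect("writer task panicked"));
    }
    all
}

fn main() {
    // Three repartitioned streams holding disjoint partition keys.
    let streams = vec![
        vec![("a", 1), ("a", 2)],
        vec![("b", 3)],
        vec![("c", 4), ("c", 5)],
    ];
    println!("{} rows written", write_streams(streams).len());
}
```

The design trade-off is visible here: parallelizing inside a worker's loop would need shared-state locking, while the per-stream split gets parallelism for free from disjoint ownership.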
```rust
// Keep the previous single-writer fan-in path for unpartitioned tables.
if partition_columns.is_empty() {
```
These functions are quite lengthy; I don't see a compelling reason to keep this divergent branch around, even if the section below is only going to end up with a JoinSet containing one task.
Was there severely different performance that would justify the pretty big deviation between these two code paths?
You're right; I split the paths intentionally between partitioned and unpartitioned. That said, we can refactor the duplicated code into helpers!
```rust
// Keep the previous single-writer fan-in path for unpartitioned tables.
if partition_columns.is_empty() {
```
Same concern about fairly redundant but different branches between writers with partitions and those without
```rust
while let Some(mut normal_batch) = normal_stream.try_next().await? {
    let mut idx: Option<usize> = None;
    for (i_field, field) in normal_batch.schema_ref().fields().iter().enumerate() {
        if field.name() == CDC_COLUMN_NAME {
            idx = Some(i_field);
            break;
        }
    }
    normal_batch.remove_column(idx.ok_or(DeltaTableError::generic(
        "idx of _change_type col not found. This shouldn't have happened.",
    ))?);

    txn.send(normal_batch).await.map_err(|_| {
        DeltaTableError::Generic("normal writer closed unexpectedly".to_string())
    })?;
}
```
I'm not a DataFusion wizard. 🪄 Is there a strong reason why this couldn't/shouldn't be operations on the normal_df that get handled by DataFusion in a way that adds some potential performance or parallelism gains?
I don't have a great mental model of how the normal_df plan would be executed by execute_stream, but I would assume it does something (🤞) in parallel, which would mean there's another potential serialization point here where the stream gets collapsed to a single task only to fill up txn one by one 🤔
I wrapped up this code (with JoinSet) to ensure that we follow the same path as before (it already existed).
We have two options if we want to improve this:
- Don't rely on DataFusion and use Arrow compute: replace the per-batch DataFusion plan with a direct arrow::compute::filter_record_batch using a boolean mask on the _change_type column.
- Rely more on DataFusion plan restructuring: instead of the upstream UNION ALL that merges normal + CDC rows with a _change_type tag, keep them as two separate execution plans from the start to avoid downstream splitting.
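The mechanics of the first option can be sketched without Arrow (in the real code this would be `arrow::compute::filter_record_batch` on a RecordBatch with a BooleanArray mask; the tuples, the `"insert"` tag choice, and `split_by_change_type` are hypothetical stand-ins): build one boolean mask from the `_change_type` column, then filter with the mask for the normal stream and with its complement for the CDC stream, with no per-batch DataFusion plan involved.

```rust
// Split rows into (normal, cdc) payloads using a boolean mask over the
// `_change_type` tag. `"insert"` marks "normal" rows purely for this
// illustration; real CDC tags include several change types.
fn split_by_change_type(rows: &[(&str, i64)]) -> (Vec<i64>, Vec<i64>) {
    // One mask, computed once per batch.
    let mask: Vec<bool> = rows.iter().map(|(ct, _)| *ct == "insert").collect();

    // Mask selects the normal rows...
    let normal: Vec<i64> = rows
        .iter()
        .zip(&mask)
        .filter(|(_, keep)| **keep)
        .map(|((_, v), _)| *v)
        .collect();

    // ...and its complement selects the CDC rows.
    let cdc: Vec<i64> = rows
        .iter()
        .zip(&mask)
        .filter(|(_, keep)| !**keep)
        .map(|((_, v), _)| *v)
        .collect();

    (normal, cdc)
}

fn main() {
    let rows = [("insert", 1), ("update_preimage", 2), ("insert", 3)];
    let (normal, cdc) = split_by_change_type(&rows);
    println!("normal rows: {normal:?}, cdc rows: {cdc:?}");
}
```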
If that's the case, and num_partitions is always set to the default available CPU cores, then you should see roughly the same performance without hash repartitioning, right? Do you mind running the same benchmarks without the repartitioning step?
Force-pushed 02e5046 to 776f696
- Replace the single writer (N partition streams > mpsc channel > 1 writer) with per-partition-stream concurrent writers using JoinSet
- Hash repartitioning by partition columns ensures each stream writes to disjoint Delta partitions, avoiding duplicate small files
- Unpartitioned tables coalesce to a single stream, preserving file count
- Abort remaining tasks on error via JoinSet::abort_all()

Signed-off-by: Florian Valeye <florian.valeye@gmail.com>
Force-pushed 776f696 to 2689355
Signed-off-by: Ethan Urbanski <ethan@urbanskitech.com>
Force-pushed 9611956 to 9d5b2d3
ion said in Slack that this was cool once the refactoring was done.
To the moon!!


Summary
Introducing parallelized partitioned writes in the DataFusion write path:
spawn one writer task per partition stream (via JoinSet), then merge the produced add actions. Tried to keep behavior stable while improving partitioned writes.
Benchmark
Local benchmark on tables to see the improvements
Partitioned writes are faster in these runs.
Unpartitioned path is functionally unchanged.
Notes
- On error, remaining tasks are aborted (JoinSet::abort_all()), but files could already have been written (they will be vacuumed later)