
Conversation

abhiaagarwal
Contributor

@abhiaagarwal abhiaagarwal commented Oct 16, 2025

Description

This came from taking a closer look at the PartitionWriter code and realizing it could be made more efficient by deferring the parquet writing to a background task and removing the async locking associated with AsyncShareableBuffer.

Instead, the underlying writer is switched to a ParquetObjectWriter, which internally uses a BufWriter that keeps bytes in memory until a capacity threshold is reached, at which point it starts a multipart upload. Additionally, once a writer has enough bytes in progress or about to be flushed, its write is finished on a background tokio task instead of yielding, so new batches can immediately go to the next file.
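
To make the mechanism concrete, here is a minimal sketch of the streaming path (illustrative only; the function name, the 5 MiB capacity, and the omitted WriterProperties are assumptions rather than the PR's actual code):

use std::sync::Arc;

use arrow_array::RecordBatch;
use object_store::{buffered::BufWriter, path::Path, ObjectStore};
use parquet::arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter};

/// Stream a batch to object storage without materializing the whole file.
async fn write_batch_streaming(
    store: Arc<dyn ObjectStore>,
    path: Path,
    batch: &RecordBatch,
) -> parquet::errors::Result<()> {
    // BufWriter keeps bytes in memory; once the capacity threshold is crossed
    // it switches to a multipart upload instead of buffering the entire file.
    let buf_writer = BufWriter::with_capacity(store, path, 5 * 1024 * 1024);
    let object_writer = ParquetObjectWriter::from_buf_writer(buf_writer);
    let mut writer = AsyncArrowWriter::try_new(object_writer, batch.schema(), None)?;
    writer.write(batch).await?;
    // close() flushes the remaining row groups and completes the upload.
    writer.close().await?;
    Ok(())
}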

Related Issue(s)

Probably closes #3578, as the writer still keeps a small buffer internally through ParquetObjectWriter but adaptively flushes via a multipart upload, using BufWriter for streaming writes. I'm also hoping this solves the issue I faced in #3855, though I'll aim to test this in production.

Also closes #3675 (superseded).

Documentation

ParquetObjectWriter: https://docs.rs/parquet/56.2.0/parquet/arrow/async_writer/struct.ParquetObjectWriter.html
BufWriter: https://docs.rs/object_store/0.12.4/object_store/buffered/struct.BufWriter.html

@github-actions github-actions bot added the binding/rust Issues for the Rust crate label Oct 16, 2025
@abhiaagarwal abhiaagarwal changed the title In-flight PartitionWriter feat: in-flight PartitionWriter Oct 16, 2025
@abhiaagarwal abhiaagarwal marked this pull request as draft October 16, 2025 01:48

ACTION NEEDED

delta-rs follows the Conventional Commits specification for release automation.

The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.


codecov bot commented Oct 16, 2025

Codecov Report

❌ Patch coverage is 81.37255% with 19 lines in your changes missing coverage. Please review.
✅ Project coverage is 73.78%. Comparing base (e6d9a2a) to head (464e2ba).
⚠️ Report is 1 commits behind head on main.

Files with missing lines Patch % Lines
crates/core/src/operations/write/writer.rs 81.18% 10 Missing and 9 partials ⚠️
Additional details and impacted files
@@           Coverage Diff           @@
##             main    #3857   +/-   ##
=======================================
  Coverage   73.78%   73.78%           
=======================================
  Files         152      151    -1     
  Lines       39138    39139    +1     
  Branches    39138    39139    +1     
=======================================
+ Hits        28877    28880    +3     
+ Misses       8987     8981    -6     
- Partials     1274     1278    +4     

☔ View full report in Codecov by Sentry.


@rtyler rtyler self-assigned this Oct 16, 2025
Comment on lines 88 to 96
    let upload_future = multi_part_upload.put_part(PutPayload::from(part));

    // wait until one spot frees up before spawning new task
    if tasks.len() >= max_concurrent_tasks {
        tasks.join_next().await;
    }
    tasks.spawn(upload_future);
    offset = end;
}
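
For context, a hedged sketch of how a loop like the one above is typically drained afterwards (an assumed continuation, not necessarily this PR's exact code):

// after the loop: await the remaining in-flight part uploads before
// completing the multipart upload
while let Some(joined) = tasks.join_next().await {
    joined.expect("upload task panicked")?;
}
multi_part_upload.complete().await?;
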
Member


I have talked about similar approaches with @ion-elgreco in the past. My belief, based on what I have seen, is that parallelizing I/O operations this way doesn't buy much, since the upload of one part/file typically already goes at line speed; splitting it into parts just gives each part line speed / num parts.

My hunch is that if we're able to parallelize the parquet encoding then we'd see some good improvements.

Do you have some measurements to share to help inform the discussion?


So this code was copied from the original implementation; I don't have a strong opinion on whether it helps or not. I know ParquetObjectWriter does something similar internally.

The bigger benefit of this PR is that rather than yielding while the first parquet file is written to the store before starting the second one, we can kick off the first file's write job in the background and immediately start writing to the second one.

As for benchmarks, I'm writing a Python-based one. But I might just give this PR a whirl in my dev environment anyway, out of my own curiosity, with AWS :)
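
A minimal sketch of that idea (illustrative only, using assumed names and a JoinSet; not the PR's actual implementation):

use parquet::arrow::{async_writer::ParquetObjectWriter, AsyncArrowWriter};
use parquet::format::FileMetaData;
use tokio::task::JoinSet;

// Finish a full file on a background task; the partition writer can start
// filling the next file immediately and only joins these tasks on its own
// close().
fn finish_in_background(
    in_flight: &mut JoinSet<parquet::errors::Result<FileMetaData>>,
    writer: AsyncArrowWriter<ParquetObjectWriter>,
) {
    // close() consumes the writer, flushes remaining row groups, and
    // completes the multipart upload, all off the hot path.
    in_flight.spawn(async move { writer.close().await });
}

// Draining on the final close would then look roughly like:
//     while let Some(joined) = in_flight.join_next().await { joined??; }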

@abhiaagarwal abhiaagarwal force-pushed the abhi/async-write-buffer branch 2 times, most recently from 2ec9c98 to c771b97 Compare October 18, 2025 15:32
@github-actions github-actions bot added the binding/python Issues for the Python package label Oct 18, 2025
@abhiaagarwal
Contributor Author

------------------------------------------------------------------------------------------------- benchmark 'optimize': 8 tests --------------------------------------------------------------------------------------------------
Name (time in ms)                                          Min                   Max                  Mean              StdDev                Median                 IQR            Outliers     OPS            Rounds  Iterations
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_benchmark_optimize[5] (0001_writer-)             697.8591 (1.0)        783.6447 (1.0)        730.9472 (1.0)       33.9840 (1.39)       715.5890 (1.0)       45.2321 (2.69)          1;0  1.3681 (1.0)           5           1
test_benchmark_optimize[5] (0002_main)                733.2482 (1.05)       826.2599 (1.05)       766.5344 (1.05)      41.3734 (1.69)       739.8363 (1.03)      63.7684 (3.79)          1;0  1.3046 (0.95)          5           1
test_benchmark_optimize[1] (0001_writer-)           2,296.7281 (3.29)     2,818.7333 (3.60)     2,646.8827 (3.62)     202.7652 (8.27)     2,713.1137 (3.79)     173.5503 (10.31)         1;1  0.3778 (0.28)          5           1
test_benchmark_optimize[1] (0002_main)              2,379.6139 (3.41)     2,511.3386 (3.20)     2,425.0909 (3.32)      50.5637 (2.06)     2,412.8268 (3.37)      46.4805 (2.76)          1;0  0.4124 (0.30)          5           1
test_benchmark_optimize_minio[5] (0001_writer-)     2,901.4343 (4.16)     3,096.3666 (3.95)     2,996.6734 (4.10)      89.5425 (3.65)     2,983.5820 (4.17)     168.9837 (10.04)         2;0  0.3337 (0.24)          5           1
test_benchmark_optimize_minio[5] (0002_main)        2,945.3638 (4.22)     3,008.1905 (3.84)     2,965.4372 (4.06)      24.5123 (1.0)      2,958.0412 (4.13)      16.8391 (1.0)           1;1  0.3372 (0.25)          5           1
test_benchmark_optimize_minio[1] (0002_main)        5,607.0593 (8.03)     5,673.8537 (7.24)     5,631.1967 (7.70)      25.2358 (1.03)     5,625.3738 (7.86)      22.5687 (1.34)          1;0  0.1776 (0.13)          5           1
test_benchmark_optimize_minio[1] (0001_writer-)     5,739.4105 (8.22)     5,972.5491 (7.62)     5,805.5987 (7.94)      96.2346 (3.93)     5,780.0473 (8.08)      96.6722 (5.74)          1;0  0.1722 (0.13)          5           1
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

----------------------------------------------------------------------------------------------- benchmark 'write': 4 tests ----------------------------------------------------------------------------------------------
Name (time in ms)                                  Min                   Max                  Mean             StdDev                Median                 IQR            Outliers     OPS            Rounds  Iterations
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
test_benchmark_write (0001_writer-)           266.1552 (1.0)        347.1750 (1.03)       289.3167 (1.0)      34.6414 (3.54)       269.6400 (1.0)       41.8909 (2.53)          1;0  3.4564 (1.0)           5           1
test_benchmark_write (0002_main)              315.7212 (1.19)       337.5128 (1.0)        323.9466 (1.12)      9.7868 (1.0)        319.3020 (1.18)      16.5525 (1.0)           1;0  3.0869 (0.89)          5           1
test_benchmark_write_minio (0001_writer-)     914.5869 (3.44)       999.5264 (2.96)       948.1451 (3.28)     33.1406 (3.39)       944.4835 (3.50)      45.2663 (2.73)          2;0  1.0547 (0.31)          5           1
test_benchmark_write_minio (0002_main)        973.5743 (3.66)     1,105.6136 (3.28)     1,035.1443 (3.58)     60.9761 (6.23)     1,026.6726 (3.81)     115.4668 (6.98)          2;0  0.9660 (0.28)          5           1
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Roughly a 5-10% increase in write performance.

@abhiaagarwal abhiaagarwal force-pushed the abhi/async-write-buffer branch from 52b7d3d to 826025c Compare October 18, 2025 22:43
@abhiaagarwal abhiaagarwal marked this pull request as ready for review October 18, 2025 22:43
@abhiaagarwal abhiaagarwal changed the title feat: in-flight PartitionWriter feat: in-flight, streaming PartitionWriter Oct 18, 2025
@ion-elgreco ion-elgreco self-assigned this Oct 19, 2025
async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
    match self {
        LazyArrowWriter::Initialized(path, object_store, config) => {
            let writer = ParquetObjectWriter::from_buf_writer(
Collaborator


Nice, I didn't even know the parquet crate had this streaming write functionality

Contributor Author


Me neither, it's pretty new and I only found it after starting this PR

@ion-elgreco
Collaborator

@abhiaagarwal we might still suffer from smaller files: we create a Writer for each DataFusion partition output and then call close at the end of each inner plan's execution. Depending on how DataFusion partitioned the data, each output might still be below the target file size even though combined they are above it.

ion-elgreco previously approved these changes Oct 19, 2025
Collaborator

@ion-elgreco ion-elgreco left a comment


Great stuff @abhiaagarwal ^^, this should definitely also improve memory pressure

@abhiaagarwal
Contributor Author

@abhiaagarwal we might still suffer from smaller files: we create a Writer for each DataFusion partition output and then call close at the end of each inner plan's execution. Depending on how DataFusion partitioned the data, each output might still be below the target file size even though combined they are above it.

Yeah, my suspicion is that target_file_size is used both when the DataFusion plan is created and again when it executes; the better approach would be to use it only for planning, not for the writing, in the compact/z-order plan.

@ion-elgreco
Collaborator

@abhiaagarwal we might still suffer from smaller files: we create a Writer for each DataFusion partition output and then call close at the end of each inner plan's execution. Depending on how DataFusion partitioned the data, each output might still be below the target file size even though combined they are above it.

Yeah, my suspicion is that target_file_size is used both when the DataFusion plan is created and again when it executes; the better approach would be to use it only for planning, not for the writing, in the compact/z-order plan.

I think we can solve it if we have a single global writer here:

for i in 0..plan.properties().output_partitioning().partition_count() {
    let inner_plan = plan.clone();
    let inner_schema = schema.clone();
    let config = WriterConfig::new(
        inner_schema.clone(),
        partition_columns.clone(),
        writer_properties.clone(),
        target_file_size,
        write_batch_size,
        writer_stats_config.num_indexed_cols,
        writer_stats_config.stats_columns.clone(),
    );
    let mut writer = DeltaWriter::new(object_store.clone(), config);
    let checker_stream = checker.clone();
    let scan_start = std::time::Instant::now();
    let mut stream = inner_plan.execute(i, session.task_ctx())?;
    let handle: tokio::task::JoinHandle<
        DeltaResult<(Vec<Action>, WriteExecutionPlanMetrics)>,
    > = tokio::task::spawn(async move {
        let mut write_time_ms = 0;
        while let Some(maybe_batch) = stream.next().await {
            let batch = maybe_batch?;
            let write_start = std::time::Instant::now();
            checker_stream.check_batch(&batch).await?;
            writer.write(&batch).await?;
Instead of a new writer for each inner plan.
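
For illustration, a minimal self-contained sketch of one possible shape of that idea (the SharedWriter type and the sizes here are hypothetical stand-ins, not delta-rs types):

use std::sync::Arc;

use tokio::{sync::Mutex, task::JoinSet};

// Hypothetical stand-in for a writer that rolls files at target_file_size.
struct SharedWriter {
    bytes_written: usize,
    target_file_size: usize,
}

impl SharedWriter {
    fn write(&mut self, batch_bytes: usize) {
        self.bytes_written += batch_bytes;
        if self.bytes_written >= self.target_file_size {
            // roll over to the next file; the combined output of all
            // partitions decides when this happens
            self.bytes_written = 0;
        }
    }
}

#[tokio::main]
async fn main() {
    let writer = Arc::new(Mutex::new(SharedWriter {
        bytes_written: 0,
        target_file_size: 128 * 1024 * 1024,
    }));
    let mut tasks = JoinSet::new();

    // one task per DataFusion output partition, all feeding the same writer
    for _partition in 0..4 {
        let writer = Arc::clone(&writer);
        tasks.spawn(async move {
            // stand-in for `while let Some(batch) = stream.next().await`
            for batch_bytes in [8usize << 20, 16usize << 20] {
                writer.lock().await.write(batch_bytes);
            }
        });
    }
    while tasks.join_next().await.is_some() {}
}

The point is that file rollover is decided against the combined bytes from every partition, so small per-partition outputs no longer produce small files.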

@ion-elgreco ion-elgreco enabled auto-merge (squash) October 19, 2025 14:24
@ion-elgreco ion-elgreco merged commit 3583056 into delta-io:main Oct 19, 2025
27 of 29 checks passed
ion-elgreco pushed a commit to ion-elgreco/delta-rs that referenced this pull request Oct 19, 2025

Labels

binding/python Issues for the Python package binding/rust Issues for the Rust crate


Development

Successfully merging this pull request may close these issues.

Refactor delta writer to not use buffer

4 participants