diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index 4242764566..20947f0c9c 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -552,6 +552,7 @@ impl MergePlan {
                 Some(task_parameters.writer_properties.clone()),
                 Some(task_parameters.input_parameters.target_size as usize),
                 None,
+                None,
             )?;
             let mut writer = PartitionWriter::try_with_config(
                 object_store,
diff --git a/crates/core/src/operations/write/async_utils.rs b/crates/core/src/operations/write/async_utils.rs
deleted file mode 100644
index 0d35deee9d..0000000000
--- a/crates/core/src/operations/write/async_utils.rs
+++ /dev/null
@@ -1,85 +0,0 @@
-//! Async Sharable Buffer for async writer
-//!
-
-use std::sync::Arc;
-
-use futures::TryFuture;
-
-use std::pin::Pin;
-use std::task::{Context, Poll};
-use tokio::io::AsyncWrite;
-use tokio::sync::RwLock as TokioRwLock;
-
-/// An in-memory buffer that allows for shared ownership and interior mutability.
-/// The underlying buffer is wrapped in an `Arc` and `RwLock`, so cloning the instance
-/// allows multiple owners to have access to the same underlying buffer.
-#[derive(Debug, Default, Clone)]
-pub struct AsyncShareableBuffer {
-    buffer: Arc<TokioRwLock<Vec<u8>>>,
-}
-
-impl AsyncShareableBuffer {
-    /// Consumes this instance and returns the underlying buffer.
-    /// Returns `None` if there are other references to the instance.
-    pub async fn into_inner(self) -> Option<Vec<u8>> {
-        Arc::try_unwrap(self.buffer)
-            .ok()
-            .map(|lock| lock.into_inner())
-    }
-
-    /// Returns a clone of the underlying buffer as a `Vec<u8>`.
-    #[allow(dead_code)]
-    pub async fn to_vec(&self) -> Vec<u8> {
-        let inner = self.buffer.read().await;
-        inner.clone()
-    }
-
-    /// Returns the number of bytes in the underlying buffer.
-    pub async fn len(&self) -> usize {
-        let inner = self.buffer.read().await;
-        inner.len()
-    }
-
-    /// Returns `true` if the underlying buffer is empty.
-    #[allow(dead_code)]
-    pub async fn is_empty(&self) -> bool {
-        let inner = self.buffer.read().await;
-        inner.is_empty()
-    }
-
-    /// Creates a new instance with the buffer initialized from the provided bytes.
-    #[allow(dead_code)]
-    pub fn from_bytes(bytes: &[u8]) -> Self {
-        Self {
-            buffer: Arc::new(TokioRwLock::new(bytes.to_vec())),
-        }
-    }
-}
-
-impl AsyncWrite for AsyncShareableBuffer {
-    fn poll_write(
-        self: Pin<&mut Self>,
-        cx: &mut Context<'_>,
-        buf: &[u8],
-    ) -> Poll<std::io::Result<usize>> {
-        let this = self.clone();
-        let buf = buf.to_vec();
-
-        let fut = async move {
-            let mut buffer = this.buffer.write().await;
-            buffer.extend_from_slice(&buf);
-            Ok(buf.len())
-        };
-
-        tokio::pin!(fut);
-        fut.try_poll(cx)
-    }
-
-    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
-        Poll::Ready(Ok(()))
-    }
-
-    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<std::io::Result<()>> {
-        Poll::Ready(Ok(()))
-    }
-}
diff --git a/crates/core/src/operations/write/mod.rs b/crates/core/src/operations/write/mod.rs
index c27354ba2d..41facb1d5b 100644
--- a/crates/core/src/operations/write/mod.rs
+++ b/crates/core/src/operations/write/mod.rs
@@ -70,7 +70,6 @@ use crate::logstore::LogStoreRef;
 use crate::protocol::{DeltaOperation, SaveMode};
 use crate::DeltaTable;
 
-pub(crate) mod async_utils;
 pub mod configs;
 pub(crate) mod execution;
 pub(crate) mod generated_columns;
diff --git a/crates/core/src/operations/write/writer.rs b/crates/core/src/operations/write/writer.rs
index aa5720d122..eb9098e666 100644
--- a/crates/core/src/operations/write/writer.rs
+++ b/crates/core/src/operations/write/writer.rs
@@ -5,19 +5,19 @@ use std::sync::OnceLock;
 
 use arrow_array::RecordBatch;
 use arrow_schema::{ArrowError, SchemaRef as ArrowSchemaRef};
-use bytes::Bytes;
 use delta_kernel::expressions::Scalar;
 use delta_kernel::table_properties::DataSkippingNumIndexedCols;
 use futures::{StreamExt, TryStreamExt};
 use indexmap::IndexMap;
-use object_store::{path::Path, ObjectStore};
+use object_store::buffered::BufWriter;
+use object_store::path::Path;
+use parquet::arrow::async_writer::ParquetObjectWriter;
 use parquet::arrow::AsyncArrowWriter;
 use parquet::basic::Compression;
 use parquet::file::properties::WriterProperties;
 use tokio::task::JoinSet;
 use tracing::*;
 
-use super::async_utils::AsyncShareableBuffer;
 use crate::crate_version;
 use crate::errors::{DeltaResult, DeltaTableError};
@@ -29,10 +29,13 @@ use crate::writer::utils::{
     arrow_schema_without_partitions, next_data_path, record_batch_without_partitions,
 };
 
+use parquet::format::FileMetaData;
+
 // TODO databricks often suggests a file size of 100mb, should we set this default?
 const DEFAULT_TARGET_FILE_SIZE: usize = 104_857_600;
 const DEFAULT_WRITE_BATCH_SIZE: usize = 1024;
 const DEFAULT_UPLOAD_PART_SIZE: usize = 1024 * 1024 * 5;
+const DEFAULT_MAX_CONCURRENCY_TASKS: usize = 10;
 
 fn upload_part_size() -> usize {
     static UPLOAD_SIZE: OnceLock<usize> = OnceLock::new();
@@ -57,6 +60,31 @@ fn upload_part_size() -> usize {
     })
 }
 
+fn get_max_concurrency_tasks() -> usize {
+    static MAX_CONCURRENCY_TASKS: OnceLock<usize> = OnceLock::new();
+    *MAX_CONCURRENCY_TASKS.get_or_init(|| {
+        std::env::var("DELTARS_MAX_CONCURRENCY_TASKS")
+            .ok()
+            .and_then(|s| s.parse::<usize>().ok())
+            .unwrap_or(DEFAULT_MAX_CONCURRENCY_TASKS)
+    })
+}
+
+/// Upload a parquet file to object store and return metadata for creating an Add action
+#[instrument(skip(arrow_writer), fields(rows = 0, size = 0))]
+async fn upload_parquet_file(
+    mut arrow_writer: AsyncArrowWriter<ParquetObjectWriter>,
+    path: Path,
+) -> DeltaResult<(Path, usize, FileMetaData)> {
+    let metadata = arrow_writer.finish().await?;
+    let file_size = arrow_writer.bytes_written();
+    Span::current().record("rows", metadata.num_rows);
+    Span::current().record("size", file_size);
+    debug!("multipart upload completed successfully");
+
+    Ok((path, file_size, metadata))
+}
+
 #[derive(thiserror::Error, Debug)]
 enum WriteError {
     #[error("Unexpected Arrow schema: got: {schema}, expected: {expected_schema}")]
@@ -212,6 +240,7 @@ impl DeltaWriter {
             Some(self.config.writer_properties.clone()),
             Some(self.config.target_file_size),
             Some(self.config.write_batch_size),
+            None,
         )?;
         let mut writer = PartitionWriter::try_with_config(
             self.object_store.clone(),
@@ -262,7 +291,7 @@ impl DeltaWriter {
 }
 
 /// Write configuration for partition writers
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub struct PartitionWriterConfig {
     /// Schema of the data written to disk
     file_schema: ArrowSchemaRef,
@@ -277,6 +306,8 @@ pub struct PartitionWriterConfig {
     /// Row chunks passed to parquet writer. This and the internal parquet writer settings
     /// determine how fine granular we can track / control the size of resulting files.
     write_batch_size: usize,
+    /// Concurrency level for writing to object store
+    max_concurrency_tasks: usize,
 }
 
 impl PartitionWriterConfig {
@@ -287,6 +318,7 @@ impl PartitionWriterConfig {
         writer_properties: Option<WriterProperties>,
         target_file_size: Option<usize>,
         write_batch_size: Option<usize>,
+        max_concurrency_tasks: Option<usize>,
     ) -> DeltaResult<Self> {
         let part_path = partition_values.hive_partition_path();
         let prefix = Path::parse(part_path)?;
@@ -305,10 +337,54 @@ impl PartitionWriterConfig {
             writer_properties,
             target_file_size,
             write_batch_size,
+            max_concurrency_tasks: max_concurrency_tasks.unwrap_or_else(get_max_concurrency_tasks),
         })
     }
 }
 
+enum LazyArrowWriter {
+    Initialized(Path, ObjectStoreRef, PartitionWriterConfig),
+    Writing(Path, AsyncArrowWriter<ParquetObjectWriter>),
+}
+
+impl LazyArrowWriter {
+    async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
+        match self {
+            LazyArrowWriter::Initialized(path, object_store, config) => {
+                let writer = ParquetObjectWriter::from_buf_writer(
+                    BufWriter::with_capacity(
+                        object_store.clone(),
+                        path.clone(),
+                        upload_part_size(),
+                    )
+                    .with_max_concurrency(config.max_concurrency_tasks),
+                );
+                let mut arrow_writer = AsyncArrowWriter::try_new(
+                    writer,
+                    config.file_schema.clone(),
+                    Some(config.writer_properties.clone()),
+                )?;
+                arrow_writer.write(batch).await?;
+                *self = LazyArrowWriter::Writing(path.clone(), arrow_writer);
+            }
+            LazyArrowWriter::Writing(_, arrow_writer) => {
+                arrow_writer.write(batch).await?;
+            }
+        }
+
+        Ok(())
+    }
+
+    fn estimated_size(&self) -> usize {
+        match self {
+            LazyArrowWriter::Initialized(_, _, _) => 0,
+            LazyArrowWriter::Writing(_, arrow_writer) => {
+                arrow_writer.bytes_written() + arrow_writer.in_progress_size()
+            }
+        }
+    }
+}
+
 /// Partition writer implementation
 /// This writer takes in table data as RecordBatches and writes it out to partitioned parquet files.
 /// It buffers data in memory until it reaches a certain size, then writes it out to optimize file sizes.
@@ -317,14 +393,13 @@ pub struct PartitionWriter {
     object_store: ObjectStoreRef,
     writer_id: uuid::Uuid,
     config: PartitionWriterConfig,
-    buffer: AsyncShareableBuffer,
-    arrow_writer: AsyncArrowWriter<AsyncShareableBuffer>,
+    writer: LazyArrowWriter,
     part_counter: usize,
-    files_written: Vec<Add>,
     /// Num index cols to collect stats for
     num_indexed_cols: DataSkippingNumIndexedCols,
     /// Stats columns, specific columns to collect stats from, takes precedence over num_indexed_cols
     stats_columns: Option<Vec<String>>,
+    in_flight_writers: JoinSet<DeltaResult<(Path, usize, FileMetaData)>>,
 }
 
 impl PartitionWriter {
@@ -335,26 +410,31 @@ impl PartitionWriter {
         num_indexed_cols: DataSkippingNumIndexedCols,
         stats_columns: Option<Vec<String>>,
     ) -> DeltaResult<Self> {
-        let buffer = AsyncShareableBuffer::default();
-        let arrow_writer = AsyncArrowWriter::try_new(
-            buffer.clone(),
-            config.file_schema.clone(),
-            Some(config.writer_properties.clone()),
-        )?;
+        let writer_id = uuid::Uuid::new_v4();
+        let first_path = next_data_path(&config.prefix, 0, &writer_id, &config.writer_properties);
+        let writer = Self::create_writer(object_store.clone(), first_path.clone(), &config)?;
 
         Ok(Self {
             object_store,
-            writer_id: uuid::Uuid::new_v4(),
+            writer_id,
             config,
-            buffer,
-            arrow_writer,
+            writer,
             part_counter: 0,
-            files_written: Vec::new(),
             num_indexed_cols,
             stats_columns,
+            in_flight_writers: JoinSet::new(),
        })
     }
 
+    fn create_writer(
+        object_store: ObjectStoreRef,
+        path: Path,
+        config: &PartitionWriterConfig,
+    ) -> DeltaResult<LazyArrowWriter> {
+        let state = LazyArrowWriter::Initialized(path, object_store.clone(), config.clone());
+        Ok(state)
+    }
+
     fn next_data_path(&mut self) -> Path {
         self.part_counter += 1;
@@ -366,97 +446,15 @@ impl PartitionWriter {
         )
     }
 
-    fn reset_writer(
-        &mut self,
-    ) -> DeltaResult<(AsyncArrowWriter<AsyncShareableBuffer>, AsyncShareableBuffer)> {
-        let new_buffer = AsyncShareableBuffer::default();
-        let arrow_writer = AsyncArrowWriter::try_new(
-            new_buffer.clone(),
-            self.config.file_schema.clone(),
-            Some(self.config.writer_properties.clone()),
-        )?;
-        Ok((
-            std::mem::replace(&mut self.arrow_writer, arrow_writer),
-            std::mem::replace(&mut self.buffer, new_buffer),
-        ))
-    }
+    async fn reset_writer(&mut self) -> DeltaResult<()> {
+        let next_path = self.next_data_path();
+        let new_writer = Self::create_writer(self.object_store.clone(), next_path, &self.config)?;
+        let state = std::mem::replace(&mut self.writer, new_writer);
 
-    async fn write_batch(&mut self, batch: &RecordBatch) -> DeltaResult<()> {
-        Ok(self.arrow_writer.write(batch).await?)
-    }
-
-    #[instrument(skip(self), fields(rows = 0, size = 0, path = field::Empty))]
-    async fn flush_arrow_writer(&mut self) -> DeltaResult<()> {
-        // replace counter / buffers and close the current writer
-        let (writer, buffer) = self.reset_writer()?;
-        let metadata = writer.close().await?;
-        // don't write empty file
-        if metadata.num_rows == 0 {
-            return Ok(());
-        }
-
-        let mut buffer = match buffer.into_inner().await {
-            Some(buffer) => Bytes::from(buffer),
-            None => return Ok(()), // Nothing to write
-        };
-
-        // collect metadata
-        let path = self.next_data_path();
-        let file_size = buffer.len() as i64;
-
-        Span::current().record("rows", metadata.num_rows);
-        Span::current().record("size", file_size);
-        Span::current().record("path", path.as_ref());
-
-        // write file to object store
-        let mut multi_part_upload = self.object_store.put_multipart(&path).await?;
-        let part_size = upload_part_size();
-        let mut tasks = JoinSet::new();
-        let max_concurrent_tasks = 10; // TODO: make configurable
-
-        let mut part_count = 0;
-        while buffer.len() > part_size {
-            let part = buffer.split_to(part_size);
-            let upload_future = multi_part_upload.put_part(part.into());
-
-            // wait until one spot frees up before spawning new task
-            if tasks.len() >= max_concurrent_tasks {
-                tasks.join_next().await;
-            }
-            tasks.spawn(upload_future);
-            part_count += 1;
-        }
-
-        if !buffer.is_empty() {
-            let upload_future = multi_part_upload.put_part(buffer.into());
-            tasks.spawn(upload_future);
-            part_count += 1;
+        if let LazyArrowWriter::Writing(path, arrow_writer) = state {
+            self.in_flight_writers
+                .spawn(upload_parquet_file(arrow_writer, path));
         }
-
-        debug!(parts = part_count, path = %path, "uploading multipart file");
-
-        // wait for all remaining tasks to complete
-        while let Some(result) = tasks.join_next().await {
-            result.map_err(|e| DeltaTableError::generic(e.to_string()))??;
-        }
-
-        multi_part_upload.complete().await?;
-        debug!(path = %path, "multipart upload completed successfully");
-
-        self.files_written.push(
-            create_add(
-                &self.config.partition_values,
-                path.to_string(),
-                file_size,
-                &metadata,
-                self.num_indexed_cols,
-                &self.stats_columns,
-            )
-            .map_err(|err| WriteError::CreateAdd {
-                source: Box::new(err),
-            })?,
-        );
-
         Ok(())
     }
 
@@ -477,12 +475,14 @@
         let max_offset = batch.num_rows();
         for offset in (0..max_offset).step_by(self.config.write_batch_size) {
             let length = usize::min(self.config.write_batch_size, max_offset - offset);
-            self.write_batch(&batch.slice(offset, length)).await?;
+            self.writer
+                .write_batch(&batch.slice(offset, length))
+                .await?;
             // flush currently buffered data to disk once we meet or exceed the target file size.
-            let estimated_size = self.buffer.len().await + self.arrow_writer.in_progress_size();
+            let estimated_size = self.writer.estimated_size();
             if estimated_size >= self.config.target_file_size {
-                debug!("Writing file with estimated size {estimated_size:?} to disk.");
-                self.flush_arrow_writer().await?;
+                debug!("Writing file with estimated size {estimated_size:?} in background.");
+                self.reset_writer().await?;
             }
         }
 
@@ -490,9 +490,47 @@
     }
 
     /// Close the writer and get the new [Add] actions.
+    ///
+    /// This will flush any remaining data and collect all Add actions from background tasks.
     pub async fn close(mut self) -> DeltaResult<Vec<Add>> {
-        self.flush_arrow_writer().await?;
-        Ok(self.files_written)
+        if let LazyArrowWriter::Writing(path, arrow_writer) = self.writer {
+            self.in_flight_writers
+                .spawn(upload_parquet_file(arrow_writer, path));
+        }
+
+        let mut results = Vec::new();
+        while let Some(result) = self.in_flight_writers.join_next().await {
+            match result {
+                Ok(Ok(data)) => results.push(data),
+                Ok(Err(e)) => {
+                    return Err(e);
+                }
+                Err(e) => {
+                    return Err(DeltaTableError::GenericError {
+                        source: Box::new(e),
+                    })
+                }
+            }
+        }
+
+        let adds = results
+            .into_iter()
+            .map(|(path, file_size, metadata)| {
+                create_add(
+                    &self.config.partition_values,
+                    path.to_string(),
+                    file_size as i64,
+                    &metadata,
+                    self.num_indexed_cols,
+                    &self.stats_columns,
+                )
+                .map_err(|err| WriteError::CreateAdd {
+                    source: Box::new(err),
+                })
+            })
+            .collect::<Result<Vec<_>, _>>()?;
+
+        Ok(adds)
     }
 }
 
@@ -539,6 +577,7 @@ mod tests {
             writer_properties,
             target_file_size,
             write_batch_size,
+            None,
         )
         .unwrap();
         PartitionWriter::try_with_config(
@@ -656,7 +695,7 @@
         writer.write(&batch).await.unwrap();
         let adds = writer.close().await.unwrap();
 
-        assert!(adds.len() == 1);
+        assert_eq!(adds.len(), 1);
     }
 
     #[tokio::test]
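The per-file upload concurrency used by `BufWriter::with_max_concurrency` now comes from the `DELTARS_MAX_CONCURRENCY_TASKS` environment variable (falling back to `DEFAULT_MAX_CONCURRENCY_TASKS = 10`), and it is read once per process through a `OnceLock`. Below is a minimal sketch, not part of the patch, of how that knob might be exercised from the Python bindings; the value `"4"` and the local path are illustrative assumptions.

```python
import os

from arro3.core import Array, DataType, Table
from deltalake import write_deltalake

# The writer reads this once per process (OnceLock), so it must be set before the
# first write performed by the process; "4" is an arbitrary illustrative value.
os.environ["DELTARS_MAX_CONCURRENCY_TASKS"] = "4"

tbl = Table.from_pydict({"x": Array(range(1_000), type=DataType.int64())})
# The local path is hypothetical; the same knob applies to object-store writes.
write_deltalake("/tmp/concurrency-demo", tbl, mode="overwrite")
```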
diff --git a/crates/test/.gitignore b/crates/test/.gitignore
index a403c2926d..bf1094b63b 100644
--- a/crates/test/.gitignore
+++ b/crates/test/.gitignore
@@ -1,12 +1,4 @@
 target/
 /.idea/
 *.bat
-tests/data/checkpoints_tombstones/expired/
-tests/data/checkpoints_tombstones/metadata_broken/
-tests/data/checkpoints_tombstones/metadata_false/
-tests/data/checkpoints_tombstones/metadata_true/
-tests/data/checkpoints_with_expired_logs/
-tests/data/read_null_partitions_from_checkpoint/
-tests/data/action_reconciliation/
-tests/data/simple_table_with_no_checkpoint/
-tests/data/simple_table_with_no_checkpoint_2/
+tests/data/
\ No newline at end of file
diff --git a/python/pyproject.toml b/python/pyproject.toml
index 6dd0f61baf..83b1a057d1 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -76,6 +76,7 @@ markers = [
    "integration: marks tests as integration tests (deselect with '-m \"not integration\"')",
    "s3: marks tests as integration tests with S3 (deselect with '-m \"not s3\"')",
    "azure: marks tests as integration tests with Azure Blob Store",
+    "benchmark: marks tests as benchmark tests (deselect with '-m \"not benchmark\"')",
    "pandas: marks tests that require pandas",
    "polars: marks tests that require polars",
    "lakefs: marks tests that require lakefs",
@@ -102,6 +103,7 @@ dev = [
    "mypy==1.10.1",
    "ruff>=0.11.2,<0.11.12",
    "types-deprecated>=1.2.15.20250304",
+    "testcontainers[minio]>=4.13.0",
 ]
 opentelemetry = ["opentelemetry-sdk>=1.20.0", "opentelemetry-api>=1.20.0"]
 polars = ["polars==1.32"]
diff --git a/python/tests/conftest.py b/python/tests/conftest.py
index 6828b83ba3..133a42f945 100644
--- a/python/tests/conftest.py
+++ b/python/tests/conftest.py
@@ -1,3 +1,5 @@
+from __future__ import annotations
+
 import os
 import pathlib
 import subprocess
@@ -5,7 +7,7 @@
 from datetime import date, datetime, timedelta, timezone
 from decimal import Decimal
 from time import sleep
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Generator
 
 import pytest
 from arro3.core import Array, DataType, Field, Schema, Table
@@ -15,6 +17,7 @@
 
 if TYPE_CHECKING:
     import pyarrow as pa
+    from minio import Minio
 
 
 def wait_till_host_is_available(host: str, timeout_sec: int = 0.5):
@@ -277,6 +280,44 @@ def sample_table_with_spaces_numbers() -> Table:
     )
 
 
+@pytest.fixture(scope="session")
+def minio_container() -> Generator[tuple[dict, Minio], None, None]:
+    """Start a MinIO test container for S3-compatible object storage."""
+    from testcontainers.minio import MinioContainer
+
+    container = MinioContainer(
+        image="minio/minio:latest",
+        access_key="minioadmin",
+        secret_key="minioadmin",
+    )
+    container.start()
+
+    client = container.get_client()
+
+    container_config = container.get_config()
+
+    config = {
+        "AWS_REGION": "us-east-1",
+        "AWS_ACCESS_KEY_ID": container_config["access_key"],
+        "AWS_SECRET_ACCESS_KEY": container_config["secret_key"],
+        "AWS_ENDPOINT_URL": "http://" + container_config["endpoint"],
+        "AWS_ALLOW_HTTP": "TRUE",
+        "AWS_S3_ALLOW_UNSAFE_RENAME": "true",
+    }
+
+    yield (config, client)
+
+    container.stop()
+
+
+@pytest.fixture()
+def minio_s3_env(monkeypatch, minio_container):
+    """Set up environment variables for MinIO S3-compatible storage."""
+    for key, value in minio_container[0].items():
+        monkeypatch.setenv(key, value)
+    return minio_container
+
+
 @pytest.fixture()
 def writer_properties():
     return WriterProperties(compression="GZIP", compression_level=0)
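The new `minio_container` fixture yields a `(storage_options, minio_client)` tuple that tests consume directly, as the benchmarks below do. A minimal sketch of a test using it, not part of the patch; the bucket and table names are illustrative:

```python
import uuid

from arro3.core import Array, DataType, Table
from deltalake import DeltaTable, write_deltalake


def test_minio_write_sketch(minio_container):
    # The fixture yields (storage_options, minio_client); bucket name is illustrative.
    storage_options, minio = minio_container
    bucket = f"delta-sketch-{uuid.uuid4()}"
    minio.make_bucket(bucket)

    tbl = Table.from_pydict({"x": Array(range(10), type=DataType.int64())})
    uri = f"s3://{bucket}/tbl"
    write_deltalake(uri, tbl, storage_options=storage_options)

    dt = DeltaTable(uri, storage_options=storage_options)
    assert dt.version() == 0
```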
diff --git a/python/tests/test_benchmark.py b/python/tests/test_benchmark.py
index 866aedf747..b59fcb6ab7 100644
--- a/python/tests/test_benchmark.py
+++ b/python/tests/test_benchmark.py
@@ -1,10 +1,18 @@
+from __future__ import annotations
+
 import os
+import uuid
+from pathlib import Path
+from typing import TYPE_CHECKING
 
 import pytest
 from arro3.core import Array, ChunkedArray, DataType, Table
 from numpy.random import standard_normal
 
-from deltalake import DeltaTable, QueryBuilder, write_deltalake
+from deltalake import DeltaTable, write_deltalake
+
+if TYPE_CHECKING:
+    from minio import Minio
 
 # NOTE: make sure to run these in release mode with
 # MATURIN_EXTRA_ARGS=--release make develop
@@ -17,28 +25,57 @@ def sample_table() -> Table:
     max_size_bytes = 128 * 1024 * 1024
     ncols = 20
     nrows = max_size_bytes // 20 // 8
-    tab = Table.from_pydict({f"x{i}": standard_normal(nrows) for i in range(ncols)})
-    # Add index column for sorting
-    tab = tab.append_column(
-        "i", ChunkedArray(Array(range(nrows), type=DataType.int64()))
-    )
-    return tab
+    rows = {f"x{i}": standard_normal(nrows) for i in range(ncols)}
+    rows["i"] = Array(range(nrows), type=DataType.int64())
+    return Table.from_pydict(rows)
 
 
 @pytest.mark.benchmark(group="write")
-def test_benchmark_write(benchmark, sample_table, tmp_path):
-    benchmark(write_deltalake, str(tmp_path), sample_table, mode="overwrite")
+def test_benchmark_write(benchmark, sample_table: Table, tmp_path: Path):
+    def setup():
+        table_path = tmp_path / str(uuid.uuid4())
+        table_path.mkdir()
+        return (table_path,), dict()
 
-    dt = DeltaTable(str(tmp_path))
-    assert (
-        QueryBuilder().register("tbl", dt).execute("select * from tbl order by id")
-        == sample_table
-    )
+    def func(table_path: str) -> None:
+        write_deltalake(table_path, sample_table)
+
+    benchmark.pedantic(func, setup=setup, rounds=5)
+
+    # TODO: figure out why this assert is failing
+    # dt = DeltaTable(str(tmp_path))
+    # table = (
+    #     QueryBuilder()
+    #     .register("tbl", dt)
+    #     .execute("select * from tbl order by i asc")
+    #     .read_all()
+    # )
+    # assert table == sample_table
+
+
+@pytest.mark.benchmark(group="write")
+def test_benchmark_write_minio(
+    benchmark, sample_table: Table, minio_container: tuple[dict, Minio]
+):
+    import uuid
+
+    bucket_name = f"delta-bench-{uuid.uuid4()}"
+    storage_options, minio = minio_container
+    minio.make_bucket(bucket_name)
+
+    def setup():
+        table_path = f"s3://{bucket_name}/{uuid.uuid4()}"
+        return (table_path,), dict()
+
+    def func(table_path: str) -> None:
+        write_deltalake(table_path, sample_table, storage_options=storage_options)
+
+    benchmark.pedantic(func, setup=setup, rounds=5)
 
 
 @pytest.mark.pyarrow
 @pytest.mark.benchmark(group="read")
-def test_benchmark_read(benchmark, sample_table, tmp_path):
+def test_benchmark_read(benchmark, sample_table: Table, tmp_path: Path):
     import pyarrow as pa
 
     write_deltalake(str(tmp_path), sample_table)
@@ -50,7 +87,7 @@
 
 @pytest.mark.pyarrow
 @pytest.mark.benchmark(group="read")
-def test_benchmark_read_pyarrow(benchmark, sample_table, tmp_path):
+def test_benchmark_read_pyarrow(benchmark, sample_table: Table, tmp_path: Path):
     import pyarrow as pa
     import pyarrow.fs as pa_fs
 
@@ -64,7 +101,9 @@
 
 @pytest.mark.benchmark(group="optimize")
 @pytest.mark.parametrize("max_tasks", [1, 5])
-def test_benchmark_optimize(benchmark, sample_table, tmp_path, max_tasks):
+def test_benchmark_optimize(
+    benchmark, sample_table: Table, tmp_path: Path, max_tasks: int
+):
     # Create 2 partitions, each partition with 10 files.
     # Each file is about 100MB, so the total size is 2GB.
     files_per_part = 10
@@ -74,14 +113,11 @@
     for part in parts:
         tab = sample_table.slice(0, nrows)
         tab = tab.append_column(
-            "part", ChunkedArray(Array([part] * nrows), DataType.int64())
+            "part", ChunkedArray(Array([part] * nrows, type=DataType.utf8()))
         )
         for _ in range(files_per_part):
            write_deltalake(tmp_path, tab, mode="append", partition_by=["part"])
 
-    dt = DeltaTable(tmp_path)
-    dt = DeltaTable(tmp_path)
     dt = DeltaTable(tmp_path)
 
     assert len(dt.files()) == files_per_part * len(parts)
@@ -115,3 +151,64 @@ def func(dt, max_concurrent_tasks):
     assert results["numFilesRemoved"] == 50
     assert results["numFilesAdded"] == 5
     assert results["partitionsOptimized"] == 5
+
+
+@pytest.mark.benchmark(group="optimize", warmup=False)
+@pytest.mark.parametrize("max_tasks", [1, 5])
+def test_benchmark_optimize_minio(
+    benchmark, sample_table: Table, minio_container: tuple[dict, Minio], max_tasks: int
+):
+    bucket_name = f"delta-bench-{uuid.uuid4()}"
+    table_path = f"s3://{bucket_name}/optimize-test"
+
+    storage_options, minio = minio_container
+    minio.make_bucket(bucket_name)
+
+    # Create 5 partitions, each partition with 10 files.
+    # Each file is about 100MB, so the total size is 2GB.
+    files_per_part = 10
+    parts = ["a", "b", "c", "d", "e"]
+
+    nrows = int(sample_table.num_rows / files_per_part)
+    for part in parts:
+        tab = sample_table.slice(0, nrows)
+        tab = tab.append_column(
+            "part", ChunkedArray(Array([part] * nrows, type=DataType.utf8()))
+        )
+        for _ in range(files_per_part):
+            write_deltalake(
+                table_path,
+                tab,
+                mode="append",
+                partition_by=["part"],
+                storage_options=storage_options,
+            )
+
+    dt = DeltaTable(table_path, storage_options=storage_options)
+
+    assert len(dt.files()) == files_per_part * len(parts)
+    initial_version = dt.version()
+
+    def setup():
+        # Instead of recreating the table for each benchmark run, we just delete
+        # the optimize log file
+        optimize_version = initial_version + 1
+        try:
+            minio.remove_object(
+                bucket_name, f"optimize-test/_delta_log/{optimize_version:020}.json"
+            )
+        except Exception:
+            pass
+        dt = DeltaTable(table_path, storage_options=storage_options)
+        return (dt,), dict(max_concurrent_tasks=max_tasks)
+
+    def func(dt, max_concurrent_tasks):
+        return dt.optimize.compact(
+            max_concurrent_tasks=max_concurrent_tasks, target_size=1024 * 1024 * 1024
+        )
+
+    results = benchmark.pedantic(func, setup=setup, rounds=5)
+
+    assert results["numFilesRemoved"] == 50
+    assert results["numFilesAdded"] == 5
+    assert results["partitionsOptimized"] == 5
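With the `benchmark` marker registered in `pyproject.toml`, the benchmark suite can be selected or skipped when running locally. A sketch of a programmatic invocation, not part of the patch; the `--benchmark-only` flag comes from pytest-benchmark and is an assumption here, as is building the wheel in release mode first (per the note at the top of `test_benchmark.py`):

```python
# Run only the benchmark-marked tests; assumes the extension was built with
# MATURIN_EXTRA_ARGS=--release make develop beforehand.
import pytest

raise SystemExit(
    pytest.main(
        ["python/tests/test_benchmark.py", "-m", "benchmark", "--benchmark-only"]
    )
)
```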