diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index 566190b3dc..ab6ee14f74 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -70,7 +70,11 @@ use crate::table::state::DeltaTableState;
 use crate::table::{Constraint, GeneratedColumn};
 use crate::{open_table, open_table_with_storage_options, DeltaTable};
 
-pub use self::session::*;
+pub(crate) use self::session::session_state_from_session;
+pub use self::session::{
+    create_session, DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig,
+    DeltaSessionContext,
+};
 pub(crate) use find_files::*;
 
 pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
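Note on the narrowed export: `session_state_from_session` drops to `pub(crate)`, so only the five names listed above remain importable downstream. A minimal consumer sketch, assuming the `deltalake` facade paths that `python/src/query.rs` below also uses:

```rust
use deltalake::datafusion::prelude::SessionConfig;
use deltalake::delta_datafusion::{DeltaSessionConfig, DeltaSessionContext};

fn main() {
    // DeltaSessionConfig still converts into a plain DataFusion SessionConfig.
    let _config: SessionConfig = DeltaSessionConfig::default().into();
    // DeltaSessionContext (extended in session.rs below) unwraps to a
    // regular SessionContext via into_inner().
    let _ctx = DeltaSessionContext::new().into_inner();
}
```
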
diff --git a/crates/core/src/delta_datafusion/session.rs b/crates/core/src/delta_datafusion/session.rs
index 0b6f08453e..db9ace4d43 100644
--- a/crates/core/src/delta_datafusion/session.rs
+++ b/crates/core/src/delta_datafusion/session.rs
@@ -1,7 +1,14 @@
+use std::sync::Arc;
+
 use datafusion::{
     catalog::Session,
     common::{exec_datafusion_err, Result as DataFusionResult},
-    execution::{SessionState, SessionStateBuilder},
+    execution::{
+        disk_manager::DiskManagerBuilder,
+        memory_pool::FairSpillPool,
+        runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
+        SessionState, SessionStateBuilder,
+    },
     prelude::{SessionConfig, SessionContext},
     sql::planner::ParserOptions,
 };
@@ -67,18 +74,67 @@ impl From<DeltaSessionConfig> for SessionConfig {
     }
 }
 
-/// A wrapper for Deltafusion's SessionContext to capture sane default table defaults
+/// A builder for configuring DataFusion RuntimeEnv with Delta-specific defaults
+#[derive(Default)]
+pub struct DeltaRuntimeEnvBuilder {
+    inner: RuntimeEnvBuilder,
+}
+
+impl DeltaRuntimeEnvBuilder {
+    pub fn new() -> Self {
+        Self {
+            inner: RuntimeEnvBuilder::new(),
+        }
+    }
+
+    pub fn with_max_spill_size(mut self, size: usize) -> Self {
+        let memory_pool = FairSpillPool::new(size);
+        self.inner = self.inner.with_memory_pool(Arc::new(memory_pool));
+        self
+    }
+
+    pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
+        let disk_manager = DiskManagerBuilder::default().with_max_temp_directory_size(size);
+        self.inner = self.inner.with_disk_manager_builder(disk_manager);
+        self
+    }
+
+    pub fn build(self) -> Arc<RuntimeEnv> {
+        self.inner.build_arc().unwrap()
+    }
+}
+
+/// A wrapper for DataFusion's SessionContext with Delta-specific defaults
+///
+/// This provides a way of creating DataFusion sessions with consistent
+/// Delta Lake configuration (case-sensitive identifiers, Delta planner, etc.)
 pub struct DeltaSessionContext {
     inner: SessionContext,
 }
 
 impl DeltaSessionContext {
+    /// Create a new DeltaSessionContext with default configuration
     pub fn new() -> Self {
-        let ctx = SessionContext::new_with_config(DeltaSessionConfig::default().into());
+        let config = DeltaSessionConfig::default().into();
+        let runtime_env = RuntimeEnvBuilder::new().build_arc().unwrap();
+        Self::new_with_config_and_runtime(config, runtime_env)
+    }
+
+    /// Create a DeltaSessionContext with a custom RuntimeEnv
+    pub fn with_runtime_env(runtime_env: Arc<RuntimeEnv>) -> Self {
+        let config = DeltaSessionConfig::default().into();
+        Self::new_with_config_and_runtime(config, runtime_env)
+    }
+
+    fn new_with_config_and_runtime(config: SessionConfig, runtime_env: Arc<RuntimeEnv>) -> Self {
         let planner = DeltaPlanner::new();
-        let state = SessionStateBuilder::new_from_existing(ctx.state())
+        let state = SessionStateBuilder::new()
+            .with_default_features()
+            .with_config(config)
+            .with_runtime_env(runtime_env)
             .with_query_planner(planner)
             .build();
+        let inner = SessionContext::new_with_state(state);
         Self { inner }
     }
 
@@ -86,6 +142,10 @@ impl DeltaSessionContext {
     pub fn into_inner(self) -> SessionContext {
         self.inner
     }
+
+    pub fn state(&self) -> SessionState {
+        self.inner.state()
+    }
 }
 
 impl Default for DeltaSessionContext {
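Taken together, the new builder and constructors compose like this. A minimal sketch, assuming the `deltalake` facade re-exports `datafusion` (as `python/src/query.rs` below relies on); the byte limits are arbitrary illustration values:

```rust
use deltalake::datafusion::execution::SessionState;
use deltalake::delta_datafusion::{DeltaRuntimeEnvBuilder, DeltaSessionContext};

fn session_with_spill_limits() -> SessionState {
    // Cap the FairSpillPool at 2 GiB of memory and the disk manager's
    // spill temp directory at 16 GiB before building the RuntimeEnv.
    let runtime_env = DeltaRuntimeEnvBuilder::new()
        .with_max_spill_size(2 * 1024 * 1024 * 1024)
        .with_max_temp_directory_size(16 * 1024 * 1024 * 1024)
        .build();

    // The context keeps the Delta defaults (case-sensitive identifiers,
    // DeltaPlanner) while running on the custom RuntimeEnv.
    DeltaSessionContext::with_runtime_env(runtime_env).state()
}
```
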
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index c57bd55905..5b028464e4 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -29,9 +29,6 @@ use arrow::array::RecordBatch;
 use arrow::datatypes::SchemaRef;
 use datafusion::catalog::Session;
 use datafusion::execution::context::SessionState;
-use datafusion::execution::memory_pool::FairSpillPool;
-use datafusion::execution::runtime_env::RuntimeEnvBuilder;
-use datafusion::execution::SessionStateBuilder;
 use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
 use delta_kernel::expressions::Scalar;
 use delta_kernel::table_properties::DataSkippingNumIndexedCols;
@@ -51,7 +48,7 @@ use uuid::Uuid;
 
 use super::write::writer::{PartitionWriter, PartitionWriterConfig};
 use super::{CustomExecuteHandler, Operation};
-use crate::delta_datafusion::DeltaTableProvider;
+use crate::delta_datafusion::{DeltaRuntimeEnvBuilder, DeltaSessionContext, DeltaTableProvider};
 use crate::errors::{DeltaResult, DeltaTableError};
 use crate::kernel::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES, PROTOCOL};
 use crate::kernel::{resolve_snapshot, EagerSnapshot};
@@ -215,8 +212,6 @@ pub struct OptimizeBuilder<'a> {
     preserve_insertion_order: bool,
     /// Maximum number of concurrent tasks (default is number of cpus)
     max_concurrent_tasks: usize,
-    /// Maximum number of bytes allowed in memory before spilling to disk
-    max_spill_size: usize,
     /// Optimize type
     optimize_type: OptimizeType,
     /// Datafusion session state relevant for executing the input plan
@@ -234,6 +229,33 @@ impl super::Operation for OptimizeBuilder<'_> {
     }
 }
 
+/// Create a SessionState configured for optimize operations with custom spill settings.
+///
+/// This is the recommended way to configure memory and disk limits for optimize operations.
+/// The created SessionState should be passed to [`OptimizeBuilder`] via [`with_session_state`](OptimizeBuilder::with_session_state).
+///
+/// # Arguments
+/// * `max_spill_size` - Maximum bytes in memory before spilling to disk. If `None`, uses DataFusion's default memory pool.
+/// * `max_temp_directory_size` - Maximum disk space for temporary spill files. If `None`, uses DataFusion's default disk manager.
+pub fn create_session_state_for_optimize(
+    max_spill_size: Option<usize>,
+    max_temp_directory_size: Option<u64>,
+) -> SessionState {
+    if max_spill_size.is_none() && max_temp_directory_size.is_none() {
+        return DeltaSessionContext::new().state();
+    }
+
+    let mut builder = DeltaRuntimeEnvBuilder::new();
+    if let Some(spill_size) = max_spill_size {
+        builder = builder.with_max_spill_size(spill_size);
+    }
+    if let Some(directory_size) = max_temp_directory_size {
+        builder = builder.with_max_temp_directory_size(directory_size);
+    }
+
+    DeltaSessionContext::with_runtime_env(builder.build()).state()
+}
+
 impl<'a> OptimizeBuilder<'a> {
     /// Create a new [`OptimizeBuilder`]
     pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
@@ -246,7 +268,6 @@ impl<'a> OptimizeBuilder<'a> {
             commit_properties: CommitProperties::default(),
             preserve_insertion_order: false,
             max_concurrent_tasks: num_cpus::get(),
-            max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
             optimize_type: OptimizeType::Compact,
             min_commit_interval: None,
             session: None,
@@ -296,16 +317,6 @@ impl<'a> OptimizeBuilder<'a> {
         self
     }
 
-    /// Max spill size
-    #[deprecated(
-        since = "0.29.0",
-        note = "Pass in a `SessionState` configured with a `RuntimeEnv` and a `FairSpillPool`"
-    )]
-    pub fn with_max_spill_size(mut self, max_spill_size: usize) -> Self {
-        self.max_spill_size = max_spill_size;
-        self
-    }
-
     /// Min commit interval
     pub fn with_min_commit_interval(mut self, min_commit_interval: Duration) -> Self {
         self.min_commit_interval = Some(min_commit_interval);
@@ -348,17 +359,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
             let session = this
                 .session
                 .and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
-                .unwrap_or_else(|| {
-                    let memory_pool = FairSpillPool::new(this.max_spill_size);
-                    let runtime = RuntimeEnvBuilder::new()
-                        .with_memory_pool(Arc::new(memory_pool))
-                        .build_arc()
-                        .unwrap();
-                    SessionStateBuilder::new()
-                        .with_default_features()
-                        .with_runtime_env(runtime)
-                        .build()
-                });
+                .unwrap_or_else(|| create_session_state_for_optimize(None, None));
             let plan = create_merge_plan(
                 &this.log_store,
                 this.optimize_type,
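For Rust callers, this function is the replacement for the removed `with_max_spill_size`. A sketch, assuming `OptimizeBuilder`'s `IntoFuture` resolves to `(DeltaTable, Metrics)`; `table` is an already opened table and the limits are illustrative:

```rust
use std::sync::Arc;

use deltalake::errors::DeltaResult;
use deltalake::operations::optimize::create_session_state_for_optimize;
use deltalake::{DeltaOps, DeltaTable};

async fn compact_with_limits(table: DeltaTable) -> DeltaResult<DeltaTable> {
    // Spill past 4 GiB of memory, and allow at most 32 GiB of spill files
    // on disk; pass (None, None) to keep DataFusion's defaults instead.
    let state = create_session_state_for_optimize(
        Some(4 * 1024 * 1024 * 1024),
        Some(32 * 1024 * 1024 * 1024),
    );

    let (table, _metrics) = DeltaOps(table)
        .optimize()
        .with_session_state(Arc::new(state))
        .await?;
    Ok(table)
}
```
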
diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi
index a2aad56a98..3e2f61dc95 100644
--- a/python/deltalake/_internal.pyi
+++ b/python/deltalake/_internal.pyi
@@ -117,6 +117,8 @@ class RawDeltaTable:
         partition_filters: PartitionFilterType | None,
         target_size: int | None,
         max_concurrent_tasks: int | None,
+        max_spill_size: int | None,
+        max_temp_directory_size: int | None,
         min_commit_interval: int | None,
         writer_properties: WriterProperties | None,
         commit_properties: CommitProperties | None,
@@ -129,6 +131,7 @@
         target_size: int | None,
         max_concurrent_tasks: int | None,
         max_spill_size: int | None,
+        max_temp_directory_size: int | None,
         min_commit_interval: int | None,
         writer_properties: WriterProperties | None,
         commit_properties: CommitProperties | None,
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 6f1e722fc1..8a5bd4833b 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -1961,6 +1961,8 @@ def compact(
         partition_filters: FilterConjunctionType | None = None,
         target_size: int | None = None,
         max_concurrent_tasks: int | None = None,
+        max_spill_size: int | None = None,
+        max_temp_directory_size: int | None = None,
         min_commit_interval: int | timedelta | None = None,
         writer_properties: WriterProperties | None = None,
         post_commithook_properties: PostCommitHookProperties | None = None,
@@ -1983,6 +1985,8 @@
             max_concurrent_tasks: the maximum number of concurrent tasks to use for
                 file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
                 faster, but will also use more memory.
+            max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. If not specified, uses DataFusion's default.
+            max_temp_directory_size: the maximum disk space for temporary spill files. If not specified, uses DataFusion's default.
             min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
                 created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
                 want a commit per partition.
@@ -2016,6 +2020,8 @@
             self.table._stringify_partition_values(partition_filters),
             target_size,
             max_concurrent_tasks,
+            max_spill_size,
+            max_temp_directory_size,
             min_commit_interval,
             writer_properties,
             commit_properties,
@@ -2030,7 +2036,8 @@ def z_order(
         partition_filters: FilterConjunctionType | None = None,
         target_size: int | None = None,
         max_concurrent_tasks: int | None = None,
-        max_spill_size: int = 20 * 1024 * 1024 * 1024,
+        max_spill_size: int | None = None,
+        max_temp_directory_size: int | None = None,
         min_commit_interval: int | timedelta | None = None,
         writer_properties: WriterProperties | None = None,
         post_commithook_properties: PostCommitHookProperties | None = None,
@@ -2050,7 +2057,8 @@
             max_concurrent_tasks: the maximum number of concurrent tasks to use for
                 file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
                 faster, but will also use more memory.
-            max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. Defaults to 20GB.
+            max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. If not specified, uses DataFusion's default.
+            max_temp_directory_size: the maximum disk space for temporary spill files. If not specified, uses DataFusion's default.
             min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
                 created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
                 want a commit per partition.
@@ -2086,6 +2094,7 @@ def z_order(
             target_size,
             max_concurrent_tasks,
             max_spill_size,
+            max_temp_directory_size,
             min_commit_interval,
             writer_properties,
             commit_properties,
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 10c40a51be..8724a64b29 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -34,9 +34,21 @@ use deltalake::lakefs::LakeFSCustomExecuteHandler;
 use deltalake::logstore::LogStoreRef;
 use deltalake::logstore::{IORuntime, ObjectStoreRef};
 use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy};
-use deltalake::operations::optimize::OptimizeType;
-use deltalake::operations::update_table_metadata::TableMetadataUpdate;
-use deltalake::operations::vacuum::VacuumMode;
+use deltalake::operations::delete::DeleteBuilder;
+use deltalake::operations::drop_constraints::DropConstraintBuilder;
+use deltalake::operations::filesystem_check::FileSystemCheckBuilder;
+use deltalake::operations::load_cdf::CdfLoadBuilder;
+use deltalake::operations::optimize::{
+    create_session_state_for_optimize, OptimizeBuilder, OptimizeType,
+};
+use deltalake::operations::restore::RestoreBuilder;
+use deltalake::operations::set_tbl_properties::SetTablePropertiesBuilder;
+use deltalake::operations::update::UpdateBuilder;
+use deltalake::operations::update_field_metadata::UpdateFieldMetadataBuilder;
+use deltalake::operations::update_table_metadata::{
+    TableMetadataUpdate, UpdateTableMetadataBuilder,
+};
+use deltalake::operations::vacuum::{VacuumBuilder, VacuumMode};
 use deltalake::operations::write::WriteBuilder;
 use deltalake::operations::CustomExecuteHandler;
 use deltalake::parquet::basic::{Compression, Encoding};
@@ -581,6 +593,8 @@ impl RawDeltaTable {
         partition_filters = None,
         target_size = None,
         max_concurrent_tasks = None,
+        max_spill_size = None,
+        max_temp_directory_size = None,
         min_commit_interval = None,
         writer_properties=None,
         commit_properties=None,
@@ -593,6 +607,8 @@
         partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
+        max_spill_size: Option<usize>,
+        max_temp_directory_size: Option<u64>,
         min_commit_interval: Option<u64>,
         writer_properties: Option<PyWriterProperties>,
         commit_properties: Option<PyCommitProperties>,
@@ -604,6 +620,12 @@
             .optimize()
             .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
 
+        if max_spill_size.is_some() || max_temp_directory_size.is_some() {
+            let session =
+                create_session_state_for_optimize(max_spill_size, max_temp_directory_size);
+            cmd = cmd.with_session_state(Arc::new(session));
+        }
+
         if let Some(size) = target_size {
             cmd = cmd.with_target_size(size);
         }
@@ -646,7 +668,8 @@
         partition_filters = None,
         target_size = None,
         max_concurrent_tasks = None,
-        max_spill_size = 20 * 1024 * 1024 * 1024,
+        max_spill_size = None,
+        max_temp_directory_size = None,
         min_commit_interval = None,
         writer_properties=None,
         commit_properties=None,
@@ -659,7 +682,8 @@
         partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
-        max_spill_size: usize,
+        max_spill_size: Option<usize>,
+        max_temp_directory_size: Option<u64>,
         min_commit_interval: Option<u64>,
         writer_properties: Option<PyWriterProperties>,
         commit_properties: Option<PyCommitProperties>,
@@ -670,9 +694,14 @@
         let mut cmd = DeltaOps(table.clone())
             .optimize()
             .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
-            .with_max_spill_size(max_spill_size)
             .with_type(OptimizeType::ZOrder(z_order_columns));
 
+        if max_spill_size.is_some() || max_temp_directory_size.is_some() {
+            let session =
+                create_session_state_for_optimize(max_spill_size, max_temp_directory_size);
+            cmd = cmd.with_session_state(Arc::new(session));
+        }
+
         if let Some(size) = target_size {
             cmd = cmd.with_target_size(size);
         }
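The query.rs hunk below is the first in-repo consumer of the new wrapper: instead of assembling a `SessionContext` from a bare `DeltaSessionConfig`, it unwraps a `DeltaSessionContext`, which also installs the Delta query planner that the old config-only construction lacked. Roughly (the helper name is hypothetical):

```rust
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaSessionContext;

// Hypothetical helper mirroring PyQueryBuilder::new() in the hunk below.
fn query_context() -> SessionContext {
    // One call replaces the old config-then-context construction and
    // carries the Delta runtime and planner defaults along.
    DeltaSessionContext::new().into_inner()
}
```
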
diff --git a/python/src/query.rs b/python/src/query.rs
index 6827369ef1..6a48f4f2ef 100644
--- a/python/src/query.rs
+++ b/python/src/query.rs
@@ -3,7 +3,7 @@ use std::sync::Arc;
 use deltalake::{
     datafusion::prelude::SessionContext,
     delta_datafusion::{
-        DataFusionMixins, DeltaScanConfigBuilder, DeltaSessionConfig, DeltaTableProvider,
+        DataFusionMixins, DeltaScanConfigBuilder, DeltaSessionContext, DeltaTableProvider,
     },
 };
 use pyo3::prelude::*;
@@ -25,8 +25,8 @@ pub(crate) struct PyQueryBuilder {
 impl PyQueryBuilder {
     #[new]
     pub fn new() -> Self {
-        let config = DeltaSessionConfig::default().into();
-        let ctx = SessionContext::new_with_config(config);
+        let delta_ctx = DeltaSessionContext::new();
+        let ctx = delta_ctx.into_inner();
 
         PyQueryBuilder { ctx }
     }
diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py
index bfe8842b34..0f99de834e 100644
--- a/python/tests/test_optimize.py
+++ b/python/tests/test_optimize.py
@@ -266,3 +266,49 @@ def test_optimize_schema_evolved_3185(tmp_path):
     assert dt.version() == 2
     last_action = dt.history(1)[0]
     assert last_action["operation"] == "OPTIMIZE"
+
+
+def test_compact_with_spill_parameters(
+    tmp_path: pathlib.Path,
+    sample_table: Table,
+):
+    write_deltalake(tmp_path, sample_table, mode="append")
+    write_deltalake(tmp_path, sample_table, mode="append")
+    write_deltalake(tmp_path, sample_table, mode="append")
+
+    dt = DeltaTable(tmp_path)
+    old_version = dt.version()
+    old_num_files = len(dt.file_uris())
+
+    dt.optimize.compact(
+        max_spill_size=100 * 1024 * 1024 * 1024,  # 100 GB
+        max_temp_directory_size=500 * 1024 * 1024 * 1024,  # 500 GB
+    )
+
+    last_action = dt.history(1)[0]
+    assert last_action["operation"] == "OPTIMIZE"
+    assert dt.version() == old_version + 1
+    assert len(dt.file_uris()) <= old_num_files
+
+
+def test_z_order_with_spill_parameters(
+    tmp_path: pathlib.Path,
+    sample_table: Table,
+):
+    write_deltalake(tmp_path, sample_table, mode="append")
+    write_deltalake(tmp_path, sample_table, mode="append")
+    write_deltalake(tmp_path, sample_table, mode="append")
+
+    dt = DeltaTable(tmp_path)
+    old_version = dt.version()
+
+    dt.optimize.z_order(
+        columns=["sold", "price"],
+        max_spill_size=100 * 1024 * 1024 * 1024,  # 100 GB
+        max_temp_directory_size=500 * 1024 * 1024 * 1024,  # 500 GB
+    )
+
+    last_action = dt.history(1)[0]
+    assert last_action["operation"] == "OPTIMIZE"
+    assert dt.version() == old_version + 1
+    assert len(dt.file_uris()) == 1