6 changes: 5 additions & 1 deletion crates/core/src/delta_datafusion/mod.rs
@@ -70,7 +70,11 @@ use crate::table::state::DeltaTableState;
use crate::table::{Constraint, GeneratedColumn};
use crate::{open_table, open_table_with_storage_options, DeltaTable};

pub use self::session::*;
pub(crate) use self::session::session_state_from_session;
pub use self::session::{
create_session, DeltaParserOptions, DeltaRuntimeEnvBuilder, DeltaSessionConfig,
DeltaSessionContext,
};
pub(crate) use find_files::*;

pub(crate) const PATH_COLUMN: &str = "__delta_rs_path";
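The glob re-export gives way to explicit names, so downstream code now imports the session types directly; a minimal sketch of the new import surface, assuming the crate is consumed as `deltalake_core`:

```rust
// Hypothetical downstream import of the now-explicit re-exports;
// the `deltalake_core` crate name is an assumption.
use deltalake_core::delta_datafusion::{
    DeltaRuntimeEnvBuilder, DeltaSessionConfig, DeltaSessionContext,
};
```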
68 changes: 64 additions & 4 deletions crates/core/src/delta_datafusion/session.rs
@@ -1,7 +1,14 @@
use std::sync::Arc;

use datafusion::{
catalog::Session,
common::{exec_datafusion_err, Result as DataFusionResult},
execution::{SessionState, SessionStateBuilder},
execution::{
disk_manager::DiskManagerBuilder,
memory_pool::FairSpillPool,
runtime_env::{RuntimeEnv, RuntimeEnvBuilder},
SessionState, SessionStateBuilder,
},
prelude::{SessionConfig, SessionContext},
sql::planner::ParserOptions,
};
@@ -67,25 +74,78 @@ impl From<DeltaSessionConfig> for SessionConfig {
}
}

/// A wrapper for Deltafusion's SessionContext to capture sane default table defaults
/// A builder for configuring DataFusion RuntimeEnv with Delta-specific defaults
#[derive(Default)]
pub struct DeltaRuntimeEnvBuilder {
inner: RuntimeEnvBuilder,
}

impl DeltaRuntimeEnvBuilder {
pub fn new() -> Self {
Self {
inner: RuntimeEnvBuilder::new(),
}
}

pub fn with_max_spill_size(mut self, size: usize) -> Self {
let memory_pool = FairSpillPool::new(size);
self.inner = self.inner.with_memory_pool(Arc::new(memory_pool));
self
}

pub fn with_max_temp_directory_size(mut self, size: u64) -> Self {
let disk_manager = DiskManagerBuilder::default().with_max_temp_directory_size(size);
self.inner = self.inner.with_disk_manager_builder(disk_manager);
self
}

pub fn build(self) -> Arc<RuntimeEnv> {
self.inner.build_arc().unwrap()
}
}

/// A wrapper for DataFusion's SessionContext with Delta-specific defaults
///
/// This provides a way of creating DataFusion sessions with consistent
/// Delta Lake configuration (case-sensitive identifiers, Delta planner, etc.)
pub struct DeltaSessionContext {
inner: SessionContext,
}

impl DeltaSessionContext {
/// Create a new DeltaSessionContext with default configuration
pub fn new() -> Self {
let ctx = SessionContext::new_with_config(DeltaSessionConfig::default().into());
let config = DeltaSessionConfig::default().into();
let runtime_env = RuntimeEnvBuilder::new().build_arc().unwrap();
Self::new_with_config_and_runtime(config, runtime_env)
}

/// Create a DeltaSessionContext with a custom RuntimeEnv
pub fn with_runtime_env(runtime_env: Arc<RuntimeEnv>) -> Self {
let config = DeltaSessionConfig::default().into();
Self::new_with_config_and_runtime(config, runtime_env)
}

fn new_with_config_and_runtime(config: SessionConfig, runtime_env: Arc<RuntimeEnv>) -> Self {
let planner = DeltaPlanner::new();
let state = SessionStateBuilder::new_from_existing(ctx.state())
let state = SessionStateBuilder::new()
.with_default_features()
.with_config(config)
.with_runtime_env(runtime_env)
.with_query_planner(planner)
.build();

let inner = SessionContext::new_with_state(state);
Self { inner }
}

pub fn into_inner(self) -> SessionContext {
self.inner
}

pub fn state(&self) -> SessionState {
self.inner.state()
}
}

impl Default for DeltaSessionContext {
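Taken together, the builder and the new `with_runtime_env` constructor compose as follows; a minimal sketch with arbitrary limit values, again assuming the `deltalake_core` crate name:

```rust
use deltalake_core::delta_datafusion::{DeltaRuntimeEnvBuilder, DeltaSessionContext};

// Hypothetical limits: a 4 GiB fair-spill memory pool and a 16 GiB cap
// on the disk manager's temporary spill directory.
let runtime_env = DeltaRuntimeEnvBuilder::new()
    .with_max_spill_size(4 * 1024 * 1024 * 1024)
    .with_max_temp_directory_size(16 * 1024 * 1024 * 1024)
    .build();

// The resulting context keeps the Delta defaults (case-sensitive
// identifiers, Delta query planner) while using the custom runtime.
let ctx = DeltaSessionContext::with_runtime_env(runtime_env);
let state = ctx.state();
```

Note that `build` unwraps the underlying `RuntimeEnvBuilder` result, so a misconfigured runtime panics rather than surfacing an error.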
57 changes: 29 additions & 28 deletions crates/core/src/operations/optimize.rs
@@ -29,9 +29,6 @@ use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use datafusion::catalog::Session;
use datafusion::execution::context::SessionState;
use datafusion::execution::memory_pool::FairSpillPool;
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::SessionStateBuilder;
use delta_kernel::engine::arrow_conversion::TryIntoArrow as _;
use delta_kernel::expressions::Scalar;
use delta_kernel::table_properties::DataSkippingNumIndexedCols;
@@ -51,7 +48,7 @@ use uuid::Uuid;

use super::write::writer::{PartitionWriter, PartitionWriterConfig};
use super::{CustomExecuteHandler, Operation};
use crate::delta_datafusion::DeltaTableProvider;
use crate::delta_datafusion::{DeltaRuntimeEnvBuilder, DeltaSessionContext, DeltaTableProvider};
use crate::errors::{DeltaResult, DeltaTableError};
use crate::kernel::transaction::{CommitBuilder, CommitProperties, DEFAULT_RETRIES, PROTOCOL};
use crate::kernel::{resolve_snapshot, EagerSnapshot};
@@ -215,8 +212,6 @@ pub struct OptimizeBuilder<'a> {
preserve_insertion_order: bool,
/// Maximum number of concurrent tasks (default is number of cpus)
max_concurrent_tasks: usize,
/// Maximum number of bytes allowed in memory before spilling to disk
max_spill_size: usize,
/// Optimize type
optimize_type: OptimizeType,
/// Datafusion session state relevant for executing the input plan
@@ -234,6 +229,33 @@ impl super::Operation for OptimizeBuilder<'_> {
}
}

/// Create a SessionState configured for optimize operations with custom spill settings.
///
/// This is the recommended way to configure memory and disk limits for optimize operations.
/// The created SessionState should be passed to [`OptimizeBuilder`] via [`with_session_state`](OptimizeBuilder::with_session_state).
///
/// # Arguments
/// * `max_spill_size` - Maximum bytes in memory before spilling to disk. If `None`, uses DataFusion's default memory pool.
/// * `max_temp_directory_size` - Maximum disk space for temporary spill files. If `None`, uses DataFusion's default disk manager.
pub fn create_session_state_for_optimize(
max_spill_size: Option<usize>,
max_temp_directory_size: Option<u64>,
) -> SessionState {
if max_spill_size.is_none() && max_temp_directory_size.is_none() {
return DeltaSessionContext::new().state();
}

let mut builder = DeltaRuntimeEnvBuilder::new();
if let Some(spill_size) = max_spill_size {
builder = builder.with_max_spill_size(spill_size);
}
if let Some(directory_size) = max_temp_directory_size {
builder = builder.with_max_temp_directory_size(directory_size);
}

DeltaSessionContext::with_runtime_env(builder.build()).state()
}

impl<'a> OptimizeBuilder<'a> {
/// Create a new [`OptimizeBuilder`]
pub(crate) fn new(log_store: LogStoreRef, snapshot: Option<EagerSnapshot>) -> Self {
@@ -246,7 +268,6 @@ impl<'a> OptimizeBuilder<'a> {
commit_properties: CommitProperties::default(),
preserve_insertion_order: false,
max_concurrent_tasks: num_cpus::get(),
max_spill_size: 20 * 1024 * 1024 * 1024, // 20 GB.
optimize_type: OptimizeType::Compact,
min_commit_interval: None,
session: None,
@@ -296,16 +317,6 @@ impl<'a> OptimizeBuilder<'a> {
self
}

/// Max spill size
#[deprecated(
since = "0.29.0",
note = "Pass in a `SessionState` configured with a `RuntimeEnv` and a `FairSpillPool`"
)]
pub fn with_max_spill_size(mut self, max_spill_size: usize) -> Self {
self.max_spill_size = max_spill_size;
self
}

/// Min commit interval
pub fn with_min_commit_interval(mut self, min_commit_interval: Duration) -> Self {
self.min_commit_interval = Some(min_commit_interval);
@@ -348,17 +359,7 @@ impl<'a> std::future::IntoFuture for OptimizeBuilder<'a> {
let session = this
.session
.and_then(|session| session.as_any().downcast_ref::<SessionState>().cloned())
.unwrap_or_else(|| {
let memory_pool = FairSpillPool::new(this.max_spill_size);
let runtime = RuntimeEnvBuilder::new()
.with_memory_pool(Arc::new(memory_pool))
.build_arc()
.unwrap();
SessionStateBuilder::new()
.with_default_features()
.with_runtime_env(runtime)
.build()
});
.unwrap_or_else(|| create_session_state_for_optimize(None, None));
let plan = create_merge_plan(
&this.log_store,
this.optimize_type,
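The new helper slots into the existing builder API via `with_session_state`; a minimal sketch, assuming a `DeltaOps` handle and illustrative sizes:

```rust
use std::sync::Arc;

use deltalake_core::operations::optimize::create_session_state_for_optimize;
use deltalake_core::{DeltaOps, DeltaResult};

async fn optimize_with_limits(ops: DeltaOps) -> DeltaResult<()> {
    // Spill after 2 GiB of memory and cap temporary spill files at 8 GiB;
    // passing `None` for either keeps DataFusion's default.
    let state = create_session_state_for_optimize(
        Some(2 * 1024 * 1024 * 1024),
        Some(8 * 1024 * 1024 * 1024),
    );

    let (_table, metrics) = ops
        .optimize()
        .with_session_state(Arc::new(state))
        .await?;
    println!("optimize metrics: {metrics:?}");
    Ok(())
}
```

When neither limit is set, the builder falls back to `create_session_state_for_optimize(None, None)`, i.e. a plain `DeltaSessionContext` with DataFusion's default memory pool and disk manager.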
3 changes: 3 additions & 0 deletions python/deltalake/_internal.pyi
@@ -117,6 +117,8 @@ class RawDeltaTable:
partition_filters: PartitionFilterType | None,
target_size: int | None,
max_concurrent_tasks: int | None,
max_spill_size: int | None,
max_temp_directory_size: int | None,
min_commit_interval: int | None,
writer_properties: WriterProperties | None,
commit_properties: CommitProperties | None,
@@ -129,6 +131,7 @@
target_size: int | None,
max_concurrent_tasks: int | None,
max_spill_size: int | None,
max_temp_directory_size: int | None,
min_commit_interval: int | None,
writer_properties: WriterProperties | None,
commit_properties: CommitProperties | None,
13 changes: 11 additions & 2 deletions python/deltalake/table.py
@@ -1961,6 +1961,8 @@ def compact(
partition_filters: FilterConjunctionType | None = None,
target_size: int | None = None,
max_concurrent_tasks: int | None = None,
max_spill_size: int | None = None,
max_temp_directory_size: int | None = None,
min_commit_interval: int | timedelta | None = None,
writer_properties: WriterProperties | None = None,
post_commithook_properties: PostCommitHookProperties | None = None,
@@ -1983,6 +1985,8 @@
max_concurrent_tasks: the maximum number of concurrent tasks to use for
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. If not specified, uses DataFusion's default.
max_temp_directory_size: the maximum disk space for temporary spill files. If not specified, uses DataFusion's default.
min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
want a commit per partition.
@@ -2016,6 +2020,8 @@
self.table._stringify_partition_values(partition_filters),
target_size,
max_concurrent_tasks,
max_spill_size,
max_temp_directory_size,
min_commit_interval,
writer_properties,
commit_properties,
@@ -2030,7 +2036,8 @@ def z_order(
partition_filters: FilterConjunctionType | None = None,
target_size: int | None = None,
max_concurrent_tasks: int | None = None,
max_spill_size: int = 20 * 1024 * 1024 * 1024,
max_spill_size: int | None = None,
max_temp_directory_size: int | None = None,
min_commit_interval: int | timedelta | None = None,
writer_properties: WriterProperties | None = None,
post_commithook_properties: PostCommitHookProperties | None = None,
@@ -2050,7 +2057,8 @@
max_concurrent_tasks: the maximum number of concurrent tasks to use for
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. Defaults to 20GB.
max_spill_size: the maximum number of bytes allowed in memory before spilling to disk. If not specified, uses DataFusion's default.
max_temp_directory_size: the maximum disk space for temporary spill files. If not specified, uses DataFusion's default.
min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is
created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you
want a commit per partition.
@@ -2086,6 +2094,7 @@ def z_order(
target_size,
max_concurrent_tasks,
max_spill_size,
max_temp_directory_size,
min_commit_interval,
writer_properties,
commit_properties,
41 changes: 35 additions & 6 deletions python/src/lib.rs
@@ -34,9 +34,21 @@ use deltalake::lakefs::LakeFSCustomExecuteHandler;
use deltalake::logstore::LogStoreRef;
use deltalake::logstore::{IORuntime, ObjectStoreRef};
use deltalake::operations::convert_to_delta::{ConvertToDeltaBuilder, PartitionStrategy};
use deltalake::operations::optimize::OptimizeType;
use deltalake::operations::update_table_metadata::TableMetadataUpdate;
use deltalake::operations::vacuum::VacuumMode;
use deltalake::operations::delete::DeleteBuilder;
use deltalake::operations::drop_constraints::DropConstraintBuilder;
use deltalake::operations::filesystem_check::FileSystemCheckBuilder;
use deltalake::operations::load_cdf::CdfLoadBuilder;
use deltalake::operations::optimize::{
create_session_state_for_optimize, OptimizeBuilder, OptimizeType,
};
use deltalake::operations::restore::RestoreBuilder;
use deltalake::operations::set_tbl_properties::SetTablePropertiesBuilder;
use deltalake::operations::update::UpdateBuilder;
use deltalake::operations::update_field_metadata::UpdateFieldMetadataBuilder;
use deltalake::operations::update_table_metadata::{
TableMetadataUpdate, UpdateTableMetadataBuilder,
};
use deltalake::operations::vacuum::{VacuumBuilder, VacuumMode};
use deltalake::operations::write::WriteBuilder;
use deltalake::operations::CustomExecuteHandler;
use deltalake::parquet::basic::{Compression, Encoding};
@@ -581,6 +593,8 @@ impl RawDeltaTable {
partition_filters = None,
target_size = None,
max_concurrent_tasks = None,
max_spill_size = None,
max_temp_directory_size = None,
min_commit_interval = None,
writer_properties=None,
commit_properties=None,
@@ -593,6 +607,8 @@
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
target_size: Option<u64>,
max_concurrent_tasks: Option<usize>,
max_spill_size: Option<usize>,
max_temp_directory_size: Option<u64>,
min_commit_interval: Option<u64>,
writer_properties: Option<PyWriterProperties>,
commit_properties: Option<PyCommitProperties>,
@@ -604,6 +620,12 @@
.optimize()
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));

if max_spill_size.is_some() || max_temp_directory_size.is_some() {
let session =
create_session_state_for_optimize(max_spill_size, max_temp_directory_size);
cmd = cmd.with_session_state(Arc::new(session));
}

if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
@@ -646,7 +668,8 @@ impl RawDeltaTable {
partition_filters = None,
target_size = None,
max_concurrent_tasks = None,
max_spill_size = 20 * 1024 * 1024 * 1024,
max_spill_size = None,
max_temp_directory_size = None,
min_commit_interval = None,
writer_properties=None,
commit_properties=None,
@@ -659,7 +682,8 @@
partition_filters: Option<Vec<(PyBackedStr, PyBackedStr, PartitionFilterValue)>>,
target_size: Option<u64>,
max_concurrent_tasks: Option<usize>,
max_spill_size: usize,
max_spill_size: Option<usize>,
max_temp_directory_size: Option<u64>,
min_commit_interval: Option<u64>,
writer_properties: Option<PyWriterProperties>,
commit_properties: Option<PyCommitProperties>,
@@ -670,9 +694,14 @@
let mut cmd = DeltaOps(table.clone())
.optimize()
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
.with_max_spill_size(max_spill_size)
.with_type(OptimizeType::ZOrder(z_order_columns));

if max_spill_size.is_some() || max_temp_directory_size.is_some() {
let session =
create_session_state_for_optimize(max_spill_size, max_temp_directory_size);
cmd = cmd.with_session_state(Arc::new(session));
}

if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}