Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions src/DeltaLake/Bridge/Interop/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,43 @@ internal unsafe partial struct ProtocolResponse
public DeltaTableError* error;
}

/// <summary>
/// Blittable options block handed to the native <c>table_optimize</c> export.
/// The field order and types mirror the <c>OptimizeOptions</c> struct declared in
/// delta-lake-bridge.h and the <c>#[repr(C)]</c> struct in table.rs, so fields must not be reordered.
/// Every <c>has_*</c> byte tells the native side whether the value that follows it was supplied.
/// </summary>
internal unsafe partial struct OptimizeOptions
{
/// <summary>Non-zero when <see cref="max_concurrent_tasks"/> carries a caller-supplied value.</summary>
[NativeTypeName("bool")]
public byte has_max_concurrent_tasks;
/// <summary>Requested maximum number of concurrent tasks.</summary>
[NativeTypeName("uint32_t")]
public uint max_concurrent_tasks;

/// <summary>Non-zero when <see cref="max_spill_size"/> carries a caller-supplied value.</summary>
[NativeTypeName("bool")]
public byte has_max_spill_size;
/// <summary>Requested maximum spill size.</summary>
[NativeTypeName("uint64_t")]
public ulong max_spill_size;

/// <summary>Non-zero when <see cref="min_commit_interval"/> carries a caller-supplied value.</summary>
[NativeTypeName("bool")]
public byte has_min_commit_interval;
/// <summary>Minimum commit interval expressed in .NET ticks (100 ns units); the native side converts it to nanoseconds.</summary>
[NativeTypeName("uint64_t")]
public ulong min_commit_interval;

/// <summary>Non-zero when <see cref="preserve_insertion_order"/> carries a caller-supplied value.</summary>
[NativeTypeName("bool")]
public byte has_preserve_insertion_order;
/// <summary>Boolean (as a byte) forwarded to the optimize builder's preserve-insertion-order setting.</summary>
[NativeTypeName("bool")]
public byte preserve_insertion_order;

/// <summary>Non-zero when <see cref="target_size"/> carries a caller-supplied value.</summary>
[NativeTypeName("bool")]
public byte has_target_size;
/// <summary>Requested target size.</summary>
[NativeTypeName("uint64_t")]
public ulong target_size;

/// <summary>Pointer to the first of <see cref="zorder_columns_count"/> column-name byte arrays.</summary>
[NativeTypeName("const struct ByteArrayRef *")]
public ByteArrayRef* zorder_columns;

/// <summary>Number of entries reachable through <see cref="zorder_columns"/>.</summary>
[NativeTypeName("uintptr_t")]
public UIntPtr zorder_columns_count;

/// <summary>Numeric optimize type; the native side treats 2 as Z-order and every other value as compaction.</summary>
[NativeTypeName("uint32_t")]
public uint optimize_type;
}

internal unsafe partial struct VacuumOptions
{
[NativeTypeName("bool")]
Expand Down Expand Up @@ -424,6 +461,9 @@ internal static unsafe partial class Methods
/// <summary>P/Invoke binding for the <c>table_checkpoint</c> export of the <c>delta_rs_bridge</c> native library (cdecl).</summary>
[DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void table_checkpoint([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("const struct CancellationToken *")] CancellationToken* cancellation_token, [NativeTypeName("TableEmptyCallback")] IntPtr callback);

/// <summary>
/// P/Invoke binding for the <c>table_optimize</c> export of the <c>delta_rs_bridge</c> native library (cdecl).
/// The native side copies the values it needs out of <paramref name="options"/> and reports the outcome
/// through <paramref name="callback"/>: a non-null second argument signals an error.
/// </summary>
[DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void table_optimize([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("struct OptimizeOptions * _Nonnull")] OptimizeOptions* options, [NativeTypeName("GenericErrorCallback")] IntPtr callback);

/// <summary>P/Invoke binding for the <c>table_vacuum</c> export of the <c>delta_rs_bridge</c> native library (cdecl).</summary>
[DllImport("delta_rs_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void table_vacuum([NativeTypeName("struct Runtime * _Nonnull")] Runtime* runtime, [NativeTypeName("struct RawDeltaTable * _Nonnull")] RawDeltaTable* table, [NativeTypeName("struct VacuumOptions * _Nonnull")] VacuumOptions* options, [NativeTypeName("GenericErrorCallback")] IntPtr callback);

Expand Down
56 changes: 56 additions & 0 deletions src/DeltaLake/Bridge/Table.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading.Tasks;
using Apache.Arrow;
Expand Down Expand Up @@ -614,6 +615,61 @@ internal virtual async Task RestoreAsync(RestoreOptions options, ICancellationTo
}
}

/// <summary>
/// Starts the native optimize operation for this table and completes once the native callback fires.
/// </summary>
/// <param name="options">
/// Optimize settings. Nullable values that are unset are flagged as "not provided" for the native side.
/// </param>
/// <param name="cancellationToken">
/// Inspected when the native callback arrives; if cancellation has been requested by then the
/// returned task is canceled instead of completed.
/// </param>
/// <returns>
/// A task that completes when the native operation reports success, faults with the
/// converted native error, or is canceled.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException">
/// <c>options.MinCommitInterval</c> is negative and therefore cannot be represented as an unsigned tick count.
/// </exception>
internal virtual async Task OptimizeAsync(DeltaLake.Table.OptimizeOptions options, ICancellationToken cancellationToken)
{
    // A negative interval would wrap to an enormous unsigned tick count when cast below.
    var minCommitIntervalTicks = options.MinCommitInterval.GetValueOrDefault().Ticks;
    if (minCommitIntervalTicks < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(options), "MinCommitInterval must not be negative.");
    }

    // Continuations must not run inline on the native callback thread: the awaiting
    // continuation leaves the using-block and disposes the scope that owns the callback.
    var tsc = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    using (var scope = new Scope())
    {
        unsafe
        {
            // Materialize once so the pointer and the element count always describe the same array.
            var zorderColumns = options.ZOrderColumns?.Select(x => scope.ByteArray(x)).ToArray()
                ?? System.Array.Empty<Interop.ByteArrayRef>();

            var interopOptions = new Interop.OptimizeOptions
            {
                has_max_concurrent_tasks = BoolAsByte(options.MaxConcurrentTasks.HasValue),
                max_concurrent_tasks = options.MaxConcurrentTasks.GetValueOrDefault(),

                has_max_spill_size = BoolAsByte(options.MaxSpillSize.HasValue),
                max_spill_size = options.MaxSpillSize.GetValueOrDefault(),

                has_min_commit_interval = BoolAsByte(options.MinCommitInterval.HasValue),
                min_commit_interval = (ulong)minCommitIntervalTicks,

                has_preserve_insertion_order = BoolAsByte(options.PreserveInsertionOrder.HasValue),
                preserve_insertion_order = BoolAsByte(options.PreserveInsertionOrder.GetValueOrDefault()),

                has_target_size = BoolAsByte(options.TargetSize.HasValue),
                target_size = options.TargetSize.GetValueOrDefault(),

                zorder_columns = scope.ArrayPointer(zorderColumns),
                zorder_columns_count = (nuint)zorderColumns.Length,

                optimize_type = (uint)options.OptimizeType,
            };

            Methods.table_optimize(
                _runtime.Ptr,
                _ptr,
                scope.Pointer(interopOptions),
                scope.FunctionPointer<GenericErrorCallback>((_success, fail) =>
                {
                    if (cancellationToken.IsCancellationRequested)
                    {
                        tsc.TrySetCanceled(cancellationToken);
                    }
                    else if (fail != null)
                    {
                        tsc.TrySetException(DeltaRuntimeException.FromDeltaTableError(_runtime.Ptr, fail));
                    }
                    else
                    {
                        tsc.TrySetResult(true);
                    }
                }));
        }

        // Awaited inside the using-block so the scope (options memory and callback) outlives the native call.
        await tsc.Task.ConfigureAwait(false);
    }
}

internal virtual async Task VacuumAsync(DeltaLake.Table.VacuumOptions options, ICancellationToken cancellationToken)
{
var tsc = new TaskCompletionSource<bool>();
Expand Down
21 changes: 21 additions & 0 deletions src/DeltaLake/Bridge/include/delta-lake-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,22 @@ typedef struct ProtocolResponse {
const struct DeltaTableError *error;
} ProtocolResponse;

// Options block passed to table_optimize. The layout must stay in sync with the
// #[repr(C)] OptimizeOptions struct in table.rs and the C# Interop.OptimizeOptions struct.
// Each has_* flag tells the callee whether the value that follows it was supplied.
typedef struct OptimizeOptions {
bool has_max_concurrent_tasks;
uint32_t max_concurrent_tasks;
bool has_max_spill_size;
uint64_t max_spill_size;
bool has_min_commit_interval;
// Expressed in .NET ticks (100 ns units); the Rust side converts to nanoseconds.
uint64_t min_commit_interval;
bool has_preserve_insertion_order;
bool preserve_insertion_order;
bool has_target_size;
uint64_t target_size;
// Array of zorder_columns_count column-name byte arrays.
const struct ByteArrayRef *zorder_columns;
uintptr_t zorder_columns_count;
// 2 selects Z-order; every other value is treated as compaction by the Rust side.
uint32_t optimize_type;
} OptimizeOptions;

typedef struct VacuumOptions {
bool dry_run;
uint64_t retention_hours;
Expand Down Expand Up @@ -379,6 +395,11 @@ void table_checkpoint(struct Runtime *_Nonnull runtime,
const struct CancellationToken *cancellation_token,
TableEmptyCallback callback);

// Runs an optimize operation on `table`. The outcome is delivered through `callback`:
// a non-null second argument carries the error.
void table_optimize(struct Runtime *_Nonnull runtime,
struct RawDeltaTable *_Nonnull table,
struct OptimizeOptions *_Nonnull options,
GenericErrorCallback callback);

void table_vacuum(struct Runtime *_Nonnull runtime,
struct RawDeltaTable *_Nonnull table,
struct VacuumOptions *_Nonnull options,
Expand Down
129 changes: 129 additions & 0 deletions src/DeltaLake/Bridge/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use deltalake::{
operations::{
constraints::ConstraintBuilder, delete::DeleteBuilder, merge::MergeBuilder,
transaction::CommitProperties, update::UpdateBuilder, vacuum::VacuumBuilder,
optimize::OptimizeBuilder,
write::WriteBuilder,
},
protocol::SaveMode,
Expand Down Expand Up @@ -234,6 +235,29 @@ pub struct MetadataOrError {
error: *const DeltaTableError,
}

/// FFI options block received by `table_optimize`.
///
/// The layout must stay in sync with the `OptimizeOptions` struct in
/// delta-lake-bridge.h and the C# `Interop.OptimizeOptions` struct.
/// Each `has_*` flag says whether the value that follows it was supplied;
/// unset values are turned into `None` by `table_optimize`.
#[repr(C)]
pub struct OptimizeOptions {
has_max_concurrent_tasks: bool,
max_concurrent_tasks: u32,

has_max_spill_size: bool,
max_spill_size: u64,

has_min_commit_interval: bool,
/// Expressed in .NET ticks (100 ns units).
min_commit_interval: u64,

has_preserve_insertion_order: bool,
preserve_insertion_order: bool,

has_target_size: bool,
target_size: u64,

/// Pointer to `zorder_columns_count` column-name byte arrays.
zorder_columns: *const ByteArrayRef,
zorder_columns_count: usize,

/// 2 selects Z-order; every other value is treated as compaction.
optimize_type: u32,
}

#[repr(C)]
pub struct VacuumOptions {
dry_run: bool,
Expand Down Expand Up @@ -1307,6 +1331,69 @@ pub extern "C" fn table_checkpoint(
);
}

#[no_mangle]
pub extern "C" fn table_optimize(
mut runtime: NonNull<Runtime>,
mut table: NonNull<RawDeltaTable>,
options: NonNull<OptimizeOptions>,
callback: GenericErrorCallback,
) {
let (
max_concurrent_tasks,
max_spill_size,
min_commit_interval,
preserve_insertion_order,
target_size,
zorder_columns,
optimize_type,
) = unsafe {
let options = options.as_ref();
let zorder_columns = std::slice::from_raw_parts(options.zorder_columns, options.zorder_columns_count);
(
if options.has_max_concurrent_tasks { Some(options.max_concurrent_tasks) } else { None },
if options.has_max_spill_size { Some(options.max_spill_size) } else { None },
if options.has_min_commit_interval { Some(options.min_commit_interval) } else { None },
if options.has_preserve_insertion_order { Some(options.preserve_insertion_order) } else { None },
if options.has_target_size { Some(options.target_size) } else { None },
zorder_columns
.iter()
.map(|b| b.to_owned_string())
.collect::<Vec<String>>(),
options.optimize_type,
)
};
run_async_with_cancellation!(
runtime,
table,
None::<&CancellationToken>,
rt,
tbl,
{
match optimize(
&mut tbl.table,
max_concurrent_tasks,
max_spill_size,
min_commit_interval,
preserve_insertion_order,
target_size,
zorder_columns,
optimize_type,
).await {
Ok(num_files_removed) => {
unsafe {
callback(num_files_removed as usize as *const c_void, std::ptr::null());
}
}
Err(err) => {
let error = DeltaTableError::from_error(rt, err);
unsafe { callback(std::ptr::null_mut(), Box::into_raw(Box::new(error))) }
}
}
},
{ callback(std::ptr::null(), std::ptr::null()) }
);
}

#[no_mangle]
pub extern "C" fn table_vacuum(
mut runtime: NonNull<Runtime>,
Expand Down Expand Up @@ -1361,6 +1448,48 @@ pub extern "C" fn table_vacuum(
);
}

/// Builds and runs a delta-rs optimize command against `table`.
///
/// Every `Option` argument is applied to the builder only when it is `Some`.
/// `min_commit_interval` is expressed in .NET ticks (100 ns units).
/// `optimize_type == 2` selects Z-order using `zorder_columns`; every other
/// value selects compaction.
///
/// On success the table's state is replaced with the state returned by the
/// command and the number of removed files is returned.
///
/// # Errors
/// Returns `DeltaTableError::NoMetadata` when the table has no loaded state,
/// or whatever error the optimize command itself produces.
async fn optimize(
    table: &mut deltalake::DeltaTable,
    max_concurrent_tasks: Option<u32>,
    max_spill_size: Option<u64>,
    min_commit_interval: Option<u64>,
    preserve_insertion_order: Option<bool>,
    target_size: Option<u64>,
    zorder_columns : Vec<String>,
    optimize_type: u32,
) -> Result<u64, deltalake::DeltaTableError> {
    // .NET ticks are 100 ns units, i.e. ten million ticks per second.
    const TICKS_PER_SECOND: u64 = 10_000_000;
    const NANOS_PER_TICK: u64 = 100;

    let snapshot = table
        .state
        .clone()
        .ok_or(deltalake::DeltaTableError::NoMetadata)?;

    let mut cmd = OptimizeBuilder::new(table.log_store(), snapshot);
    if let Some(tasks) = max_concurrent_tasks {
        cmd = cmd.with_max_concurrent_tasks(tasks as usize);
    }
    if let Some(spill) = max_spill_size {
        // Clamp instead of truncating on targets where usize is narrower than u64.
        cmd = cmd.with_max_spill_size(usize::try_from(spill).unwrap_or(usize::MAX));
    }
    if let Some(interval_ticks) = min_commit_interval {
        // Split into whole seconds and a sub-second remainder so that large tick
        // counts cannot overflow the way `ticks * 100` nanoseconds would.
        let secs = interval_ticks / TICKS_PER_SECOND;
        let subsec_nanos = ((interval_ticks % TICKS_PER_SECOND) * NANOS_PER_TICK) as u32;
        cmd = cmd.with_min_commit_interval(std::time::Duration::new(secs, subsec_nanos));
    }
    if let Some(preserve) = preserve_insertion_order {
        cmd = cmd.with_preserve_insertion_order(preserve);
    }
    if let Some(target) = target_size {
        // Clamp instead of wrapping to a negative size for values above i64::MAX.
        cmd = cmd.with_target_size(i64::try_from(target).unwrap_or(i64::MAX));
    }

    let opt_type = match optimize_type {
        2 => deltalake::operations::optimize::OptimizeType::ZOrder(zorder_columns),
        _ => deltalake::operations::optimize::OptimizeType::Compact,
    };
    cmd = cmd.with_type(opt_type);

    let (result, metrics) = cmd.await?;
    table.state = result.state;
    Ok(metrics.num_files_removed)
}

async fn vacuum(
table: &mut deltalake::DeltaTable,
dry_run: bool,
Expand Down
8 changes: 8 additions & 0 deletions src/DeltaLake/Interfaces/ITable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,14 @@ public interface ITable : IDisposable
/// <returns>A <see cref="Task"/>representing applying the constraint operation.</returns>
Task CheckpointAsync(CancellationToken cancellationToken);

/// <summary>
/// Optimize an existing table.
/// </summary>
/// <param name="options">Options for the operation.</param>
/// <param name="cancellationToken">A <see cref="System.Threading.CancellationToken">cancellation token</see>.</param>
/// <returns>A <see cref="Task"/> representing the optimize operation.</returns>
Task OptimizeAsync(OptimizeOptions options, CancellationToken cancellationToken);

/// <summary>
/// Vacuum an existing table
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/DeltaLake/Table/DeltaTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,12 @@ public async Task CheckpointAsync(CancellationToken cancellationToken)
await table.CheckpointAsync(cancellationToken).ConfigureAwait(false);
}

/// <inheritdoc/>
public async Task OptimizeAsync(OptimizeOptions options, CancellationToken cancellationToken)
    // Forwards straight to the underlying bridge table; no extra state is touched here.
    => await table.OptimizeAsync(options, cancellationToken).ConfigureAwait(false);

/// <inheritdoc/>
public async Task VacuumAsync(VacuumOptions options, CancellationToken cancellationToken)
{
Expand Down
Loading
Loading