Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/DeltaLake/Bridge/Interop/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ internal unsafe partial struct VacuumOptions
[NativeTypeName("bool")]
public byte enforce_retention_duration;

[NativeTypeName("uint32_t")]
public uint vacuum_mode;

[NativeTypeName("struct Map *")]
public Map* custom_metadata;
}
Expand Down
1 change: 1 addition & 0 deletions src/DeltaLake/Bridge/Table.cs
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,7 @@ internal virtual async Task VacuumAsync(DeltaLake.Table.VacuumOptions options, I
dry_run = BoolAsByte(options.DryRun),
retention_hours = new UIntPtr(options.RetentionHours ?? 0),
enforce_retention_duration = BoolAsByte(options.RetentionHours != null),
vacuum_mode = (uint)options.VacuumMode,
custom_metadata = options.CustomMetadata != null ? scope.Dictionary(_runtime, options.CustomMetadata!) : null,
};

Expand Down
1 change: 1 addition & 0 deletions src/DeltaLake/Bridge/include/delta-lake-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,7 @@ typedef struct VacuumOptions {
bool dry_run;
uint64_t retention_hours;
bool enforce_retention_duration;
uint32_t vacuum_mode;
struct Map *custom_metadata;
} VacuumOptions;

Expand Down
14 changes: 12 additions & 2 deletions src/DeltaLake/Bridge/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use deltalake::{
kernel::{transaction::CommitProperties, StructType},
operations::{
constraints::ConstraintBuilder, delete::DeleteBuilder, merge::MergeBuilder,
update::UpdateBuilder, vacuum::VacuumBuilder,
update::UpdateBuilder, vacuum::{VacuumBuilder, VacuumMode},
optimize::OptimizeBuilder,
write::WriteBuilder,
},
Expand Down Expand Up @@ -263,6 +263,7 @@ pub struct VacuumOptions {
dry_run: bool,
retention_hours: u64,
enforce_retention_duration: bool,
vacuum_mode: u32,
custom_metadata: *mut Map,
}

Expand Down Expand Up @@ -1401,18 +1402,24 @@ pub extern "C" fn table_vacuum(
options: NonNull<VacuumOptions>,
callback: GenericErrorCallback,
) {
let (dry_run, retention_hours, enforce_retention_duration, custom_metadata) = unsafe {
let (dry_run, retention_hours, enforce_retention_duration, vacuum_mode, custom_metadata) = unsafe {
let options = options.as_ref();
let retention_hours = if options.retention_hours > 0 {
Some(options.retention_hours)
} else {
None
};
let custom_metadata = Map::into_hash_map(options.custom_metadata);
let vacuum_mode = match options.vacuum_mode {
0 => VacuumMode::Lite,
1 => VacuumMode::Full,
_ => VacuumMode::Full,
};
(
options.dry_run,
retention_hours,
options.enforce_retention_duration,
vacuum_mode,
custom_metadata,
)
};
Expand All @@ -1428,6 +1435,7 @@ pub extern "C" fn table_vacuum(
dry_run,
retention_hours,
enforce_retention_duration,
vacuum_mode,
custom_metadata,
)
.await
Expand Down Expand Up @@ -1495,6 +1503,7 @@ async fn vacuum(
dry_run: bool,
retention_hours: Option<u64>,
enforce_retention_duration: bool,
vacuum_mode: VacuumMode,
custom_metadata: Option<HashMap<String, String>>,
) -> Result<Vec<String>, deltalake::DeltaTableError> {
if table.state.is_none() {
Expand All @@ -1503,6 +1512,7 @@ async fn vacuum(

let mut cmd = VacuumBuilder::new(table.log_store(), table.state.clone().unwrap())
.with_enforce_retention_duration(enforce_retention_duration)
.with_mode(vacuum_mode)
.with_dry_run(dry_run);

if let Some(retention_period) = retention_hours {
Expand Down
22 changes: 22 additions & 0 deletions src/DeltaLake/Table/VacuumOptions.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Collections.Generic;

namespace DeltaLake.Table
{
/// <summary>
Expand All @@ -20,5 +21,26 @@ public class VacuumOptions
/// Custom metadata to add to the operations
/// </summary>
public Dictionary<string, string>? CustomMetadata { get; init; } = new Dictionary<string, string>();

/// <summary>
/// The vacuum mode to use.
/// </summary>
public VacuumMode VacuumMode { get; init; } = VacuumMode.Lite;
}

/// <summary>
/// The vacuum mode to use.
/// </summary>
/// <remarks>
/// The numeric value of each member is part of the native interop contract: the value is cast to
/// <c>uint</c> and passed to the bridge as <c>vacuum_mode</c>, where <c>0</c> selects lite mode and
/// <c>1</c> selects full mode. The values are therefore assigned explicitly; do not reorder or
/// renumber members without updating the native bridge as well.
/// </remarks>
public enum VacuumMode
{
    /// <summary>
    /// Run the operation in lite mode (only remove files which are referenced in the `_delta_log` associated with `remove` action)
    /// </summary>
    Lite = 0,

    /// <summary>
    /// Run the operation in full mode (remove _all_ data files no longer actively referenced in the `_delta_log` table)
    /// </summary>
    Full = 1,
}
}
47 changes: 47 additions & 0 deletions tests/DeltaLake.Tests/Table/VacuumTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using Apache.Arrow;
using DeltaLake.Table;

namespace DeltaLake.Tests.Table;

/// <summary>
/// Tests that vacuuming a freshly created table leaves its row count unchanged,
/// both with default options and with an explicitly selected vacuum mode.
/// </summary>
public sealed class VacuumTests
{
    /// <summary>
    /// Vacuums with <see cref="VacuumMode.Full"/> selected explicitly.
    /// </summary>
    [Fact]
    public async Task VacuumTestWithCustomOptions()
    {
        var fullModeOptions = new VacuumOptions
        {
            VacuumMode = VacuumMode.Full
        };

        await BaseVacuumTest(fullModeOptions);
    }

    /// <summary>
    /// Vacuums with a default-constructed <see cref="VacuumOptions"/>.
    /// </summary>
    [Fact]
    public async Task VacuumTestWithDefaultOptions()
    {
        var defaultOptions = new VacuumOptions();

        await BaseVacuumTest(defaultOptions);
    }

    /// <summary>
    /// Sets up an in-memory table, vacuums it with <paramref name="options"/>, and asserts that
    /// <c>SELECT COUNT(*)</c> still reports 10,000.
    /// </summary>
    /// <param name="options">The vacuum options passed to <c>VacuumAsync</c>.</param>
    private async Task BaseVacuumTest(VacuumOptions options)
    {
        // The token source cancels itself after 30,000 ms so a hung native call cannot stall the run.
        using var timeout = new CancellationTokenSource(30_000);

        // NOTE(review): the second argument is presumably the number of rows to generate; it
        // matches the count asserted below. Confirm against TableHelpers.SetupTable.
        var setup = await TableHelpers.SetupTable($"memory:///{Guid.NewGuid():N}", 10_000);
        using var table = setup.table;

        await table.VacuumAsync(options, timeout.Token);

        long rowCount = 0;
        await foreach (var batch in table.QueryAsync(new("SELECT COUNT(*) FROM deltatable"), timeout.Token))
        {
            // The count column is accepted as either a 32-bit or a 64-bit integer array;
            // any other column type leaves the running value untouched.
            rowCount = batch.Column(0) switch
            {
                Int32Array ints => ints.GetValue(0)!.Value,
                Int64Array longs => longs.GetValue(0)!.Value,
                _ => rowCount,
            };
        }

        Assert.Equal(10_000, rowCount);
    }
}
Loading