Skip to content

Commit 5f45d9d

Browse files
feat!: add remove_files API (#1353)
This PR adds the ability to remove files in a Transaction. To accomplish this, we re-use the FilteredEngineData returned in the scan plan to remove files. In order not to drop fields, some additional metadata is now passed through via the "fileConstantValues" struct and transformed back. This does not add FFI bindings yet. IMPORTANT: One edge case this introduces is the potential to drop stats if a table only has `parsed_stats` available in checkpoints, as kernel does not currently read them. We should discuss whether we are OK to merge given this constraint. Testing: Added unit tests to ensure serialized fields are round-tripped and that scan planning ignores the removed files. BREAKING_CHANGE: The schema for scan rows is changed, but this should mostly be backwards compatible. --------- Co-authored-by: Zach Schuermann <[email protected]>
1 parent 75a31db commit 5f45d9d

File tree

9 files changed

+830
-17
lines changed

9 files changed

+830
-17
lines changed

kernel/src/actions/mod.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,13 @@ static LOG_ADD_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
9797
)]))
9898
});
9999

100+
static LOG_REMOVE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
101+
Arc::new(StructType::new_unchecked([StructField::nullable(
102+
REMOVE_NAME,
103+
Remove::to_schema(),
104+
)]))
105+
});
106+
100107
static LOG_COMMIT_INFO_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
101108
Arc::new(StructType::new_unchecked([StructField::nullable(
102109
COMMIT_INFO_NAME,
@@ -137,6 +144,10 @@ pub(crate) fn get_log_add_schema() -> &'static SchemaRef {
137144
&LOG_ADD_SCHEMA
138145
}
139146

147+
pub(crate) fn get_log_remove_schema() -> &'static SchemaRef {
148+
&LOG_REMOVE_SCHEMA
149+
}
150+
140151
pub(crate) fn get_log_commit_info_schema() -> &'static SchemaRef {
141152
&LOG_COMMIT_INFO_SCHEMA
142153
}

kernel/src/engine/arrow_expression/evaluate_expression.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,9 +225,9 @@ pub fn evaluate_expression(
225225
(Struct(fields), Some(DataType::Struct(output_schema))) => {
226226
evaluate_struct_expression(fields, batch, output_schema)
227227
}
228-
(Struct(_), _) => Err(Error::generic(
229-
"Data type is required to evaluate struct expressions",
230-
)),
228+
(Struct(_), dt) => Err(Error::Generic(format!(
229+
"Struct expression expects a DataType::Struct result, but got {dt:?}"
230+
))),
231231
(Transform(transform), Some(DataType::Struct(output_schema))) => {
232232
evaluate_transform_expression(transform, batch, output_schema)
233233
}

kernel/src/scan/log_replay.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,11 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
272272
}
273273
}
274274

275+
pub(crate) static FILE_CONSTANT_VALUES_NAME: &str = "fileConstantValues";
276+
pub(crate) static BASE_ROW_ID_NAME: &str = "baseRowId";
277+
pub(crate) static DEFAULT_ROW_COMMIT_VERSION_NAME: &str = "defaultRowCommitVersion";
278+
pub(crate) static TAGS_NAME: &str = "tags";
279+
275280
// NB: If you update this schema, ensure you update the comment describing it in the doc comment
276281
// for `scan_row_schema` in scan/mod.rs! You'll also need to update ScanFileVisitor as the
277282
// indexes will be off, and [`get_add_transform_expr`] below to match it.
@@ -280,15 +285,24 @@ pub(crate) static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(||
280285
let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
281286
let file_constant_values = StructType::new_unchecked([
282287
StructField::nullable("partitionValues", partition_values),
283-
StructField::nullable("baseRowId", DataType::LONG),
288+
StructField::nullable(BASE_ROW_ID_NAME, DataType::LONG),
289+
StructField::nullable(DEFAULT_ROW_COMMIT_VERSION_NAME, DataType::LONG),
290+
StructField::nullable(
291+
"tags",
292+
MapType::new(
293+
DataType::STRING,
294+
DataType::STRING,
295+
/*valueContainsNull*/ true,
296+
),
297+
),
284298
]);
285299
Arc::new(StructType::new_unchecked([
286300
StructField::nullable("path", DataType::STRING),
287301
StructField::nullable("size", DataType::LONG),
288302
StructField::nullable("modificationTime", DataType::LONG),
289303
StructField::nullable("stats", DataType::STRING),
290304
StructField::nullable("deletionVector", DeletionVectorDescriptor::to_schema()),
291-
StructField::nullable("fileConstantValues", file_constant_values),
305+
StructField::nullable(FILE_CONSTANT_VALUES_NAME, file_constant_values),
292306
]))
293307
});
294308

@@ -307,6 +321,8 @@ fn get_add_transform_expr() -> ExpressionRef {
307321
Arc::new(Expression::Struct(vec![
308322
column_expr_ref!("add.partitionValues"),
309323
column_expr_ref!("add.baseRowId"),
324+
column_expr_ref!("add.defaultRowCommitVersion"),
325+
column_expr_ref!("add.tags"),
310326
])),
311327
]))
312328
});
@@ -325,8 +341,10 @@ pub(crate) fn get_scan_metadata_transform_expr() -> ExpressionRef {
325341
column_expr_ref!("size"),
326342
column_expr_ref!("modificationTime"),
327343
column_expr_ref!("stats"),
344+
column_expr_ref!("fileConstantValues.tags"),
328345
column_expr_ref!("deletionVector"),
329346
column_expr_ref!("fileConstantValues.baseRowId"),
347+
column_expr_ref!("fileConstantValues.defaultRowCommitVersion"),
330348
],
331349
))]))
332350
});

kernel/src/scan/mod.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, EmptyColumnResol
2121
use crate::listed_log_files::ListedLogFiles;
2222
use crate::log_replay::{ActionsBatch, HasSelectionVector};
2323
use crate::log_segment::LogSegment;
24+
use crate::scan::log_replay::BASE_ROW_ID_NAME;
2425
use crate::scan::state::{DvInfo, Stats};
2526
use crate::scan::state_info::StateInfo;
2627
use crate::schema::{
@@ -450,6 +451,8 @@ impl Scan {
450451
_existing_predicate: Option<PredicateRef>,
451452
) -> DeltaResult<Box<dyn Iterator<Item = DeltaResult<ScanMetadata>>>> {
452453
static RESTORED_ADD_SCHEMA: LazyLock<DataType> = LazyLock::new(|| {
454+
use crate::scan::log_replay::DEFAULT_ROW_COMMIT_VERSION_NAME;
455+
453456
let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
454457
DataType::struct_type_unchecked(vec![StructField::nullable(
455458
"add",
@@ -459,8 +462,13 @@ impl Scan {
459462
StructField::not_null("size", DataType::LONG),
460463
StructField::nullable("modificationTime", DataType::LONG),
461464
StructField::nullable("stats", DataType::STRING),
465+
StructField::nullable(
466+
"tags",
467+
MapType::new(DataType::STRING, DataType::STRING, true),
468+
),
462469
StructField::nullable("deletionVector", DeletionVectorDescriptor::to_schema()),
463-
StructField::nullable("baseRowId", DataType::LONG),
470+
StructField::nullable(BASE_ROW_ID_NAME, DataType::LONG),
471+
StructField::nullable(DEFAULT_ROW_COMMIT_VERSION_NAME, DataType::LONG),
464472
]),
465473
)])
466474
});
@@ -691,7 +699,9 @@ impl Scan {
691699
/// },
692700
/// fileConstantValues: {
693701
/// partitionValues: map<string, string>,
694-
/// baseRowId: long
702+
/// tags: map<string, string>,
703+
/// baseRowId: long,
704+
/// defaultRowCommitVersion: long,
695705
/// }
696706
/// }
697707
/// ```

kernel/src/scan/state.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,7 @@ impl<T> RowVisitor for ScanFileVisitor<'_, T> {
179179
}
180180
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
181181
require!(
182-
getters.len() == 11,
182+
getters.len() == 13,
183183
Error::InternalError(format!(
184184
"Wrong number of ScanFileVisitor getters: {}",
185185
getters.len()

kernel/src/transaction/mod.rs

Lines changed: 154 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::borrow::Cow;
12
use std::collections::HashSet;
23
use std::iter;
34
use std::ops::Deref;
@@ -8,8 +9,8 @@ use url::Url;
89
use crate::actions::deletion_vector::DeletionVectorPath;
910
use crate::actions::{
1011
as_log_add_schema, domain_metadata::scan_domain_metadatas, get_log_commit_info_schema,
11-
get_log_domain_metadata_schema, get_log_txn_schema, CommitInfo, DomainMetadata, SetTransaction,
12-
INTERNAL_DOMAIN_PREFIX,
12+
get_log_domain_metadata_schema, get_log_remove_schema, get_log_txn_schema, CommitInfo,
13+
DomainMetadata, SetTransaction, INTERNAL_DOMAIN_PREFIX,
1314
};
1415
#[cfg(feature = "catalog-managed")]
1516
use crate::committer::FileSystemCommitter;
@@ -19,12 +20,16 @@ use crate::error::Error;
1920
use crate::expressions::{ArrayData, Transform, UnaryExpressionOp::ToJson};
2021
use crate::path::LogRoot;
2122
use crate::row_tracking::{RowTrackingDomainMetadata, RowTrackingVisitor};
23+
use crate::scan::log_replay::{
24+
BASE_ROW_ID_NAME, DEFAULT_ROW_COMMIT_VERSION_NAME, FILE_CONSTANT_VALUES_NAME, TAGS_NAME,
25+
};
26+
use crate::scan::scan_row_schema;
2227
use crate::schema::{ArrayType, MapType, SchemaRef, StructField, StructType};
2328
use crate::snapshot::SnapshotRef;
2429
use crate::utils::current_time_ms;
2530
use crate::{
2631
DataType, DeltaResult, Engine, EngineData, Expression, ExpressionRef, IntoEngineData,
27-
RowVisitor, Version,
32+
RowVisitor, SchemaTransform, Version,
2833
};
2934
use delta_kernel_derive::internal_api;
3035

@@ -127,6 +132,7 @@ pub struct Transaction {
127132
operation: Option<String>,
128133
engine_info: Option<String>,
129134
add_files_metadata: Vec<Box<dyn EngineData>>,
135+
remove_files_metadata: Vec<FilteredEngineData>,
130136
// NB: hashmap would require either duplicating the appid or splitting SetTransaction
131137
// key/payload. HashSet requires Borrow<&str> with matching Eq, Ord, and Hash. Plus,
132138
// HashSet::insert drops the to-be-inserted value without returning the existing one, which
@@ -181,6 +187,7 @@ impl Transaction {
181187
operation: None,
182188
engine_info: None,
183189
add_files_metadata: vec![],
190+
remove_files_metadata: vec![],
184191
set_transactions: vec![],
185192
commit_timestamp,
186193
domain_metadata_additions: vec![],
@@ -260,14 +267,17 @@ impl Transaction {
260267
let domain_metadata_actions =
261268
self.generate_domain_metadata_actions(engine, row_tracking_domain_metadata)?;
262269

263-
// Step 5: Chain all our actions to be handed off to the Committer
270+
// Step 5: Generate remove actions
271+
let remove_actions = self.generate_remove_actions(engine)?;
272+
264273
let actions = iter::once(commit_info_action)
265274
.chain(add_actions)
266275
.chain(set_transaction_actions)
267276
.chain(domain_metadata_actions);
268-
// Convert EngineData to FilteredEngineData with all rows selected
277+
269278
let filtered_actions = actions
270-
.map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected));
279+
.map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected))
280+
.chain(remove_actions);
271281

272282
// Step 6: Commit via the committer
273283
#[cfg(feature = "catalog-managed")]
@@ -693,6 +703,144 @@ impl Transaction {
693703
error,
694704
}
695705
}
706+
/// Remove files from the table in this transaction. This API generally enables the engine to
707+
/// delete data (at file-level granularity) from the table. Note that this API can be called
708+
/// multiple times to remove multiple batches.
709+
///
710+
/// The expected schema for `remove_metadata` is given by [`scan_row_schema`]. It is expected
711+
/// this will be the result of passing [`FilteredEngineData`] returned from a scan
712+
/// with the selection vector modified to select rows for removal (selected rows in the selection vector are the ones to be removed).
713+
///
714+
/// # Example
715+
///
716+
/// ```no_run
717+
/// # use std::sync::Arc;
718+
/// # use delta_kernel::Engine;
719+
/// # use delta_kernel::snapshot::Snapshot;
720+
/// # #[cfg(feature = "catalog-managed")]
721+
/// # use delta_kernel::committer::FileSystemCommitter;
722+
/// # fn example(engine: Arc<dyn Engine>, table_url: url::Url) -> delta_kernel::DeltaResult<()> {
723+
/// # #[cfg(feature = "catalog-managed")]
724+
/// # {
725+
/// // Create a snapshot and transaction
726+
/// let snapshot = Snapshot::builder_for(table_url).build(engine.as_ref())?;
727+
/// let mut txn = snapshot.clone().transaction(Box::new(FileSystemCommitter::new()))?;
728+
///
729+
/// // Get file metadata from a scan
730+
/// let scan = snapshot.scan_builder().build()?;
731+
/// let scan_metadata = scan.scan_metadata(engine.as_ref())?;
732+
///
733+
/// // Remove specific files based on scan metadata
734+
/// for metadata in scan_metadata {
735+
/// let metadata = metadata?;
736+
/// // In practice, you would modify the selection vector to choose which files to remove
737+
/// let files_to_remove = metadata.scan_files;
738+
/// txn.remove_files(files_to_remove);
739+
/// }
740+
///
741+
/// // Commit the transaction
742+
/// txn.commit(engine.as_ref())?;
743+
/// # }
744+
/// # Ok(())
745+
/// # }
746+
/// ```
747+
pub fn remove_files(&mut self, remove_metadata: FilteredEngineData) {
748+
self.remove_files_metadata.push(remove_metadata);
749+
}
750+
751+
fn generate_remove_actions<'a>(
752+
&'a self,
753+
engine: &dyn Engine,
754+
) -> DeltaResult<impl Iterator<Item = DeltaResult<FilteredEngineData>> + Send + 'a> {
755+
// This is a workaround due to the fact that expression evaluation happens
756+
// on the whole EngineData instead of accounting for filtered rows, which can lead to null values in
757+
// required fields.
758+
// TODO: Move this to a common place (dedupe from data_skipping.rs) or remove when evaluations work
759+
// on FilteredEngineData directly.
760+
struct NullableStatsTransform;
761+
impl<'a> SchemaTransform<'a> for NullableStatsTransform {
762+
fn transform_struct_field(
763+
&mut self,
764+
field: &'a StructField,
765+
) -> Option<Cow<'a, StructField>> {
766+
use Cow::*;
767+
let field = match self.transform(&field.data_type)? {
768+
Borrowed(_) if field.is_nullable() => Borrowed(field),
769+
data_type => Owned(StructField {
770+
name: field.name.clone(),
771+
data_type: data_type.into_owned(),
772+
nullable: true,
773+
metadata: field.metadata.clone(),
774+
}),
775+
};
776+
Some(field)
777+
}
778+
}
779+
780+
let input_schema = scan_row_schema();
781+
let target_schema = NullableStatsTransform
782+
.transform_struct(get_log_remove_schema())
783+
.ok_or_else(|| Error::generic("Failed to transform remove schema"))?
784+
.into_owned();
785+
let evaluation_handler = engine.evaluation_handler();
786+
787+
// Create the transform expression once, since it only contains literals and column references
788+
let transform = Expression::transform(
789+
Transform::new_top_level()
790+
// deletionTimestamp
791+
.with_inserted_field(
792+
Some("path"),
793+
Expression::literal(self.commit_timestamp).into(),
794+
)
795+
// dataChange
796+
.with_inserted_field(Some("path"), Expression::literal(self.data_change).into())
797+
.with_inserted_field(
798+
// extended_file_metadata
799+
Some("path"),
800+
Expression::literal(true).into(),
801+
)
802+
.with_inserted_field(
803+
Some("path"),
804+
Expression::column([FILE_CONSTANT_VALUES_NAME, "partitionValues"]).into(),
805+
)
806+
// tags
807+
.with_inserted_field(
808+
Some("stats"),
809+
Expression::column([FILE_CONSTANT_VALUES_NAME, TAGS_NAME]).into(),
810+
)
811+
.with_inserted_field(
812+
Some("deletionVector"),
813+
Expression::column([FILE_CONSTANT_VALUES_NAME, BASE_ROW_ID_NAME]).into(),
814+
)
815+
.with_inserted_field(
816+
Some("deletionVector"),
817+
Expression::column([
818+
FILE_CONSTANT_VALUES_NAME,
819+
DEFAULT_ROW_COMMIT_VERSION_NAME,
820+
])
821+
.into(),
822+
)
823+
.with_dropped_field(FILE_CONSTANT_VALUES_NAME)
824+
.with_dropped_field("modificationTime"),
825+
);
826+
let expr = Arc::new(Expression::struct_from([transform]));
827+
let file_action_eval = Arc::new(evaluation_handler.new_expression_evaluator(
828+
input_schema.clone(),
829+
expr.clone(),
830+
target_schema.clone().into(),
831+
)?);
832+
833+
Ok(self
834+
.remove_files_metadata
835+
.iter()
836+
.map(move |file_metadata_batch| {
837+
let updated_engine_data = file_action_eval.evaluate(file_metadata_batch.data())?;
838+
FilteredEngineData::try_new(
839+
updated_engine_data,
840+
file_metadata_batch.selection_vector().to_vec(),
841+
)
842+
}))
843+
}
696844
}
697845

698846
/// WriteContext is data derived from a [`Transaction`] that can be provided to writers in order to

0 commit comments

Comments
 (0)