Skip to content

Commit 3efdae7

Browse files
authored
feat: Add row tracking support (delta-io#1375)
1 parent 9a9f28a commit 3efdae7

File tree

11 files changed

+1011
-473
lines changed

11 files changed

+1011
-473
lines changed

kernel/examples/common/src/lib.rs

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use delta_kernel::{
77
arrow::array::RecordBatch,
88
engine::default::{executor::tokio::TokioBackgroundExecutor, DefaultEngine},
99
scan::Scan,
10-
schema::Schema,
10+
schema::MetadataColumnSpec,
1111
DeltaResult, SnapshotRef,
1212
};
1313

@@ -61,9 +61,18 @@ pub struct ScanArgs {
6161
#[arg(long)]
6262
pub schema_only: bool,
6363

64-
/// Comma separated list of columns to select
65-
#[arg(long, value_delimiter=',', num_args(0..))]
66-
pub columns: Option<Vec<String>>,
64+
/// Comma separated list of columns to select. Must be passed as a single string; leading and
65+
/// trailing spaces for each column name will be trimmed.
66+
#[arg(long)]
67+
pub columns: Option<String>,
68+
69+
/// Include a _metadata.row_index field
70+
#[arg(long)]
71+
pub with_row_index: bool,
72+
73+
/// Include a _metadata.row_id field if row-tracking is enabled
74+
#[arg(long)]
75+
pub with_row_id: bool,
6776
}
6877

6978
pub trait ParseWithExamples<T> {
@@ -180,27 +189,26 @@ pub fn get_scan(snapshot: SnapshotRef, args: &ScanArgs) -> DeltaResult<Option<Sc
180189
return Ok(None);
181190
}
182191

183-
let read_schema_opt = args
184-
.columns
185-
.clone()
186-
.map(|cols| -> DeltaResult<_> {
187-
let table_schema = snapshot.schema();
188-
let selected_fields = cols.iter().map(|col| {
189-
table_schema
190-
.field(col)
191-
.cloned()
192-
.ok_or(delta_kernel::Error::Generic(format!(
193-
"Table has no such column: {col}"
194-
)))
195-
});
196-
Schema::try_from_results(selected_fields).map(Arc::new)
197-
})
198-
.transpose()?;
192+
let mut scan_schema = snapshot.schema();
193+
if let Some(cols) = args.columns.as_ref() {
194+
let cols: Vec<&str> = cols.split(",").map(str::trim).collect();
195+
scan_schema = scan_schema.project_as_struct(&cols)?.into();
196+
}
197+
198+
if args.with_row_index {
199+
scan_schema = scan_schema
200+
.add_metadata_column("_metadata.row_index", MetadataColumnSpec::RowIndex)?
201+
.into();
202+
}
203+
204+
if args.with_row_id {
205+
scan_schema = scan_schema
206+
.add_metadata_column("_metadata.row_id", MetadataColumnSpec::RowId)?
207+
.into();
208+
}
209+
199210
Ok(Some(
200-
snapshot
201-
.scan_builder()
202-
.with_schema_opt(read_schema_opt)
203-
.build()?,
211+
snapshot.scan_builder().with_schema(scan_schema).build()?,
204212
))
205213
}
206214

kernel/src/scan/field_classifiers.rs

Lines changed: 11 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ use crate::table_changes::{
66
};
77
use crate::transforms::FieldTransformSpec;
88

9-
/// Trait for classifying fields during StateInfo construction.
10-
/// Allows different scan types (regular, CDF) to customize field handling.
9+
/// Trait for classifying fields during StateInfo construction. Allows different scan types
10+
/// (regular, CDF) to customize field handling. Note that the default set of field handling occurs
11+
/// in [`StateInfo::try_new`](crate::scan::state_info::StateInfo::try_new). A
12+
/// `TransformFieldClassifier` can be used to override the behavior implemented in that method.
1113
pub(crate) trait TransformFieldClassifier {
1214
/// Classify a field and return its transform spec.
1315
/// Returns None if the field is physical (should be read from parquet).
@@ -16,32 +18,19 @@ pub(crate) trait TransformFieldClassifier {
1618
&self,
1719
field: &StructField,
1820
field_index: usize,
19-
partition_columns: &[String],
2021
last_physical_field: &Option<String>,
2122
) -> Option<FieldTransformSpec>;
2223
}
2324

24-
/// Regular scan field classifier for standard Delta table scans.
25-
/// Handles partition columns as metadata-derived fields.
26-
pub(crate) struct ScanTransformFieldClassifier;
27-
impl TransformFieldClassifier for ScanTransformFieldClassifier {
25+
// Empty classifier, always returns None
26+
impl TransformFieldClassifier for () {
2827
fn classify_field(
2928
&self,
30-
field: &StructField,
31-
field_index: usize,
32-
partition_columns: &[String],
33-
last_physical_field: &Option<String>,
29+
_: &StructField,
30+
_: usize,
31+
_: &Option<String>,
3432
) -> Option<FieldTransformSpec> {
35-
if partition_columns.contains(field.name()) {
36-
// Partition column: needs transform to inject metadata
37-
Some(FieldTransformSpec::MetadataDerivedColumn {
38-
field_index,
39-
insert_after: last_physical_field.clone(),
40-
})
41-
} else {
42-
// Regular physical field - no transform needed
43-
None
44-
}
33+
None
4534
}
4635
}
4736

@@ -53,7 +42,6 @@ impl TransformFieldClassifier for CdfTransformFieldClassifier {
5342
&self,
5443
field: &StructField,
5544
field_index: usize,
56-
partition_columns: &[String],
5745
last_physical_field: &Option<String>,
5846
) -> Option<FieldTransformSpec> {
5947
match field.name().as_str() {
@@ -70,13 +58,7 @@ impl TransformFieldClassifier for CdfTransformFieldClassifier {
7058
insert_after: last_physical_field.clone(),
7159
})
7260
}
73-
// Defer to default classifier for partition columns and physical fields
74-
_ => ScanTransformFieldClassifier.classify_field(
75-
field,
76-
field_index,
77-
partition_columns,
78-
last_physical_field,
79-
),
61+
_ => None,
8062
}
8163
}
8264
}

kernel/src/scan/log_replay.rs

Lines changed: 112 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ use std::collections::{HashMap, HashSet};
33
use std::sync::{Arc, LazyLock};
44

55
use super::data_skipping::DataSkippingFilter;
6-
use super::{PhysicalPredicate, ScanMetadata, StateInfo};
6+
use super::state_info::StateInfo;
7+
use super::{PhysicalPredicate, ScanMetadata};
78
use crate::actions::deletion_vector::DeletionVectorDescriptor;
89
use crate::actions::get_log_add_schema;
910
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
@@ -105,8 +106,9 @@ impl AddRemoveDedupVisitor<'_> {
105106
const ADD_PATH_INDEX: usize = 0; // Position of "add.path" in getters
106107
const ADD_PARTITION_VALUES_INDEX: usize = 1; // Position of "add.partitionValues" in getters
107108
const ADD_DV_START_INDEX: usize = 2; // Start position of add deletion vector columns
108-
const REMOVE_PATH_INDEX: usize = 5; // Position of "remove.path" in getters
109-
const REMOVE_DV_START_INDEX: usize = 6; // Start position of remove deletion vector columns
109+
const BASE_ROW_ID_INDEX: usize = 5; // Position of add.baseRowId in getters
110+
const REMOVE_PATH_INDEX: usize = 6; // Position of "remove.path" in getters
111+
const REMOVE_DV_START_INDEX: usize = 7; // Start position of remove deletion vector columns
110112

111113
fn new(
112114
seen: &mut HashSet<FileActionKey>,
@@ -195,10 +197,19 @@ impl AddRemoveDedupVisitor<'_> {
195197
if self.deduplicator.check_and_record_seen(file_key) || !is_add {
196198
return Ok(false);
197199
}
200+
let base_row_id: Option<i64> =
201+
getters[Self::BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?;
198202
let transform = self
199203
.transform_spec
200204
.as_ref()
201-
.map(|transform| get_transform_expr(transform, partition_values, &self.physical_schema))
205+
.map(|transform| {
206+
get_transform_expr(
207+
transform,
208+
partition_values,
209+
&self.physical_schema,
210+
base_row_id,
211+
)
212+
})
202213
.transpose()?;
203214
if transform.is_some() {
204215
// fill in any needed `None`s for previous rows
@@ -215,13 +226,15 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
215226
static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
216227
const STRING: DataType = DataType::STRING;
217228
const INTEGER: DataType = DataType::INTEGER;
229+
const LONG: DataType = DataType::LONG;
218230
let ss_map: DataType = MapType::new(STRING, STRING, true).into();
219231
let types_and_names = vec![
220232
(STRING, column_name!("add.path")),
221233
(ss_map, column_name!("add.partitionValues")),
222234
(STRING, column_name!("add.deletionVector.storageType")),
223235
(STRING, column_name!("add.deletionVector.pathOrInlineDv")),
224236
(INTEGER, column_name!("add.deletionVector.offset")),
237+
(LONG, column_name!("add.baseRowId")),
225238
(STRING, column_name!("remove.path")),
226239
(STRING, column_name!("remove.deletionVector.storageType")),
227240
(STRING, column_name!("remove.deletionVector.pathOrInlineDv")),
@@ -236,13 +249,13 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
236249
} else {
237250
// All checkpoint actions are already reconciled and Remove actions in checkpoint files
238251
// only serve as tombstones for vacuum jobs. So we only need to examine the adds here.
239-
(&names[..5], &types[..5])
252+
(&names[..6], &types[..6])
240253
}
241254
}
242255

243256
fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
244257
let is_log_batch = self.deduplicator.is_log_batch();
245-
let expected_getters = if is_log_batch { 9 } else { 5 };
258+
let expected_getters = if is_log_batch { 10 } else { 6 };
246259
require!(
247260
getters.len() == expected_getters,
248261
Error::InternalError(format!(
@@ -266,8 +279,10 @@ impl RowVisitor for AddRemoveDedupVisitor<'_> {
266279
pub(crate) static SCAN_ROW_SCHEMA: LazyLock<Arc<StructType>> = LazyLock::new(|| {
267280
// Note that fields projected out of a nullable struct must be nullable
268281
let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
269-
let file_constant_values =
270-
StructType::new_unchecked([StructField::nullable("partitionValues", partition_values)]);
282+
let file_constant_values = StructType::new_unchecked([
283+
StructField::nullable("partitionValues", partition_values),
284+
StructField::nullable("baseRowId", DataType::LONG),
285+
]);
271286
Arc::new(StructType::new_unchecked([
272287
StructField::nullable("path", DataType::STRING),
273288
StructField::nullable("size", DataType::LONG),
@@ -290,9 +305,10 @@ fn get_add_transform_expr() -> ExpressionRef {
290305
column_expr_ref!("add.modificationTime"),
291306
column_expr_ref!("add.stats"),
292307
column_expr_ref!("add.deletionVector"),
293-
Arc::new(Expression::Struct(vec![column_expr_ref!(
294-
"add.partitionValues"
295-
)])),
308+
Arc::new(Expression::Struct(vec![
309+
column_expr_ref!("add.partitionValues"),
310+
column_expr_ref!("add.baseRowId"),
311+
])),
296312
]))
297313
});
298314
EXPR.clone()
@@ -311,6 +327,7 @@ pub(crate) fn get_scan_metadata_transform_expr() -> ExpressionRef {
311327
column_expr_ref!("modificationTime"),
312328
column_expr_ref!("stats"),
313329
column_expr_ref!("deletionVector"),
330+
column_expr_ref!("fileConstantValues.baseRowId"),
314331
],
315332
))]))
316333
});
@@ -377,15 +394,19 @@ mod tests {
377394
use std::{collections::HashMap, sync::Arc};
378395

379396
use crate::actions::get_log_schema;
380-
use crate::expressions::Scalar;
397+
use crate::expressions::{BinaryExpressionOp, Scalar, VariadicExpressionOp};
381398
use crate::log_replay::ActionsBatch;
382399
use crate::scan::state::{DvInfo, Stats};
400+
use crate::scan::state_info::tests::{
401+
assert_transform_spec, get_simple_state_info, get_state_info,
402+
};
403+
use crate::scan::state_info::StateInfo;
383404
use crate::scan::test_utils::{
384-
add_batch_simple, add_batch_with_partition_col, add_batch_with_remove,
385-
run_with_validate_callback,
405+
add_batch_for_row_id, add_batch_simple, add_batch_with_partition_col,
406+
add_batch_with_remove, run_with_validate_callback,
386407
};
387-
use crate::scan::{PhysicalPredicate, StateInfo};
388-
use crate::table_features::ColumnMappingMode;
408+
use crate::scan::PhysicalPredicate;
409+
use crate::schema::MetadataColumnSpec;
389410
use crate::Expression as Expr;
390411
use crate::{
391412
engine::sync::SyncEngine,
@@ -473,15 +494,8 @@ mod tests {
473494
StructField::new("value", DataType::INTEGER, true),
474495
StructField::new("date", DataType::DATE, true),
475496
]));
476-
let partition_cols = ["date".to_string()];
477-
let state_info = StateInfo::try_new(
478-
schema.clone(),
479-
&partition_cols,
480-
ColumnMappingMode::None,
481-
None,
482-
crate::scan::field_classifiers::ScanTransformFieldClassifier,
483-
)
484-
.unwrap();
497+
let partition_cols = vec!["date".to_string()];
498+
let state_info = get_simple_state_info(schema, partition_cols).unwrap();
485499
let batch = vec![add_batch_with_partition_col()];
486500
let iter = scan_action_iter(
487501
&SyncEngine::new(),
@@ -525,4 +539,77 @@ mod tests {
525539
validate_transform(transforms[3].as_ref(), 17510);
526540
}
527541
}
542+
543+
#[test]
544+
fn test_row_id_transform() {
545+
let schema: SchemaRef = Arc::new(StructType::new_unchecked([StructField::new(
546+
"value",
547+
DataType::INTEGER,
548+
true,
549+
)]));
550+
let state_info = get_state_info(
551+
schema.clone(),
552+
vec![],
553+
None,
554+
[
555+
("delta.enableRowTracking", "true"),
556+
(
557+
"delta.rowTracking.materializedRowIdColumnName",
558+
"row_id_col",
559+
),
560+
]
561+
.iter()
562+
.map(|(k, v)| (k.to_string(), v.to_string()))
563+
.collect(),
564+
vec![("row_id", MetadataColumnSpec::RowId)],
565+
)
566+
.unwrap();
567+
568+
let transform_spec = state_info.transform_spec.as_ref().unwrap();
569+
assert_transform_spec(
570+
transform_spec,
571+
false,
572+
"row_id_col",
573+
"row_indexes_for_row_id_0",
574+
);
575+
576+
let batch = vec![add_batch_for_row_id(get_log_schema().clone())];
577+
let iter = scan_action_iter(
578+
&SyncEngine::new(),
579+
batch
580+
.into_iter()
581+
.map(|batch| Ok(ActionsBatch::new(batch as _, true))),
582+
Arc::new(state_info),
583+
);
584+
585+
for res in iter {
586+
let scan_metadata = res.unwrap();
587+
let transforms = scan_metadata.scan_file_transforms;
588+
assert_eq!(transforms.len(), 1, "Should have 1 transform");
589+
if let Some(Expr::Transform(transform_expr)) = transforms[0].as_ref().map(Arc::as_ref) {
590+
assert!(transform_expr.input_path.is_none());
591+
let row_id_transform = transform_expr
592+
.field_transforms
593+
.get("row_id_col")
594+
.expect("Should have row_id_col transform");
595+
assert!(row_id_transform.is_replace);
596+
assert_eq!(row_id_transform.exprs.len(), 1);
597+
let expr = &row_id_transform.exprs[0];
598+
let expected_expr = Arc::new(Expr::variadic(
599+
VariadicExpressionOp::Coalesce,
600+
vec![
601+
Expr::column(["row_id_col"]),
602+
Expr::binary(
603+
BinaryExpressionOp::Plus,
604+
Expr::literal(42i64),
605+
Expr::column(["row_indexes_for_row_id_0"]),
606+
],
607+
));
608+
assert_eq!(expr, &expected_expr);
609+
assert_eq!(expr, &expeceted_expr);
610+
} else {
611+
panic!("Should have been a transform expression");
612+
}
613+
}
614+
}
528615
}

0 commit comments

Comments
 (0)