Skip to content

Commit cfd3e26

Browse files
aleksandarskrbicnicklanzachschuermann
authored
feat: pass ColumnMappingMode to physical_name (#1403)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-incubator/delta-kernel-rs/blob/main/CONTRIBUTING.md 2. Run `cargo t --all-features --all-targets` to get started testing, and run `cargo fmt`. 3. Ensure you have added or run the appropriate tests for your PR. 4. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 5. Be sure to keep the PR description updated to reflect all changes. --> Fixes #1120 <!-- PR title formatting: This project uses conventional commits: https://www.conventionalcommits.org/ Each PR corresponds to a commit on the `main` branch, with the title of the PR (typically) being used for the commit message on main. In order to ensure proper formatting in the CHANGELOG please ensure your PR title adheres to the conventional commit specification. Examples: - new feature PR: "feat: new API for snapshot.update()" - bugfix PR: "fix: correctly apply DV in read-table example" --> ## What changes are proposed in this pull request? This PR makes StructField::physical_name() aware of ColumnMappingMode by adding it as a required parameter. Previously, physical_name() would always return the physical name from metadata if present, regardless of the column mapping mode. This was incorrect behavior - when column mapping mode is None, the logical name should be used even if physical name metadata exists in the field. The key changes are: 1. Updated StructField::physical_name() to take column_mapping_mode: ColumnMappingMode parameter 2. When mode is None, always return the logical name 3. When mode is Id or Name, return the physical name from metadata (or logical name as fallback) 4. Added column_mapping_mode field to StateInfo struct to avoid threading it through many function calls 5. Updated all call sites throughout the codebase to pass the column mapping mode 6. Refactored AddRemoveDedupVisitor to accept Arc<StateInfo> instead of 8 individual parameters, following feedback to avoid excessive parameter passing <!-- Uncomment this section if there are any changes affecting public APIs: ### This PR affects the following public APIs If there are breaking changes, please ensure the `breaking-changes` label gets added by CI, and describe why the changes are needed. Note that _new_ public APIs are not considered breaking. --> --------- Co-authored-by: Nick Lanham <[email protected]> Co-authored-by: Zach Schuermann <[email protected]>
1 parent 4ea0ef6 commit cfd3e26

File tree

7 files changed

+86
-42
lines changed

7 files changed

+86
-42
lines changed

kernel/src/scan/log_replay.rs

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ use crate::kernel_predicates::{DefaultKernelPredicateEvaluator, KernelPredicateE
1313
use crate::log_replay::{ActionsBatch, FileActionDeduplicator, FileActionKey, LogReplayProcessor};
1414
use crate::scan::Scalar;
1515
use crate::schema::ToSchema as _;
16-
use crate::schema::{ColumnNamesAndTypes, DataType, MapType, SchemaRef, StructField, StructType};
17-
use crate::transforms::{get_transform_expr, parse_partition_values, TransformSpec};
16+
use crate::schema::{ColumnNamesAndTypes, DataType, MapType, StructField, StructType};
17+
use crate::transforms::{get_transform_expr, parse_partition_values};
1818
use crate::utils::require;
1919
use crate::{DeltaResult, Engine, Error, ExpressionEvaluator};
2020

@@ -93,9 +93,7 @@ impl ScanLogReplayProcessor {
9393
struct AddRemoveDedupVisitor<'seen> {
9494
deduplicator: FileActionDeduplicator<'seen>,
9595
selection_vector: Vec<bool>,
96-
logical_schema: SchemaRef,
97-
physical_schema: SchemaRef,
98-
transform_spec: Option<Arc<TransformSpec>>,
96+
state_info: Arc<StateInfo>,
9997
partition_filter: Option<PredicateRef>,
10098
row_transform_exprs: Vec<Option<ExpressionRef>>,
10199
}
@@ -113,9 +111,7 @@ impl AddRemoveDedupVisitor<'_> {
113111
fn new(
114112
seen: &mut HashSet<FileActionKey>,
115113
selection_vector: Vec<bool>,
116-
logical_schema: SchemaRef,
117-
physical_schema: SchemaRef,
118-
transform_spec: Option<Arc<TransformSpec>>,
114+
state_info: Arc<StateInfo>,
119115
partition_filter: Option<PredicateRef>,
120116
is_log_batch: bool,
121117
) -> AddRemoveDedupVisitor<'_> {
@@ -129,9 +125,7 @@ impl AddRemoveDedupVisitor<'_> {
129125
Self::REMOVE_DV_START_INDEX,
130126
),
131127
selection_vector,
132-
logical_schema,
133-
physical_schema,
134-
transform_spec,
128+
state_info,
135129
partition_filter,
136130
row_transform_exprs: Vec::new(),
137131
}
@@ -179,12 +173,16 @@ impl AddRemoveDedupVisitor<'_> {
179173
// WARNING: It's not safe to partition-prune removes (just like it's not safe to data skip
180174
// removes), because they are needed to suppress earlier incompatible adds we might
181175
// encounter if the table's schema was replaced after the most recent checkpoint.
182-
let partition_values = match &self.transform_spec {
176+
let partition_values = match &self.state_info.transform_spec {
183177
Some(transform) if is_add => {
184178
let partition_values =
185179
getters[Self::ADD_PARTITION_VALUES_INDEX].get(i, "add.partitionValues")?;
186-
let partition_values =
187-
parse_partition_values(&self.logical_schema, transform, &partition_values)?;
180+
let partition_values = parse_partition_values(
181+
&self.state_info.logical_schema,
182+
transform,
183+
&partition_values,
184+
self.state_info.column_mapping_mode,
185+
)?;
188186
if self.is_file_partition_pruned(&partition_values) {
189187
return Ok(false);
190188
}
@@ -200,13 +198,14 @@ impl AddRemoveDedupVisitor<'_> {
200198
let base_row_id: Option<i64> =
201199
getters[Self::BASE_ROW_ID_INDEX].get_opt(i, "add.baseRowId")?;
202200
let transform = self
201+
.state_info
203202
.transform_spec
204203
.as_ref()
205204
.map(|transform| {
206205
get_transform_expr(
207206
transform,
208207
partition_values,
209-
&self.physical_schema,
208+
&self.state_info.physical_schema,
210209
base_row_id,
211210
)
212211
})
@@ -351,9 +350,7 @@ impl LogReplayProcessor for ScanLogReplayProcessor {
351350
let mut visitor = AddRemoveDedupVisitor::new(
352351
&mut self.seen_file_keys,
353352
selection_vector,
354-
self.state_info.logical_schema.clone(),
355-
self.state_info.physical_schema.clone(),
356-
self.state_info.transform_spec.clone(),
353+
self.state_info.clone(),
357354
self.partition_filter.clone(),
358355
is_log_batch,
359356
);
@@ -407,6 +404,7 @@ mod tests {
407404
};
408405
use crate::scan::PhysicalPredicate;
409406
use crate::schema::MetadataColumnSpec;
407+
use crate::table_features::ColumnMappingMode;
410408
use crate::Expression as Expr;
411409
use crate::{
412410
engine::sync::SyncEngine,
@@ -471,6 +469,7 @@ mod tests {
471469
physical_schema: logical_schema.clone(),
472470
physical_predicate: PhysicalPredicate::None,
473471
transform_spec: None,
472+
column_mapping_mode: ColumnMappingMode::None,
474473
});
475474
let iter = scan_action_iter(
476475
&SyncEngine::new(),

kernel/src/scan/mod.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::schema::{
2727
ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, SchemaTransform, StructField,
2828
ToSchema as _,
2929
};
30+
use crate::table_features::ColumnMappingMode;
3031
use crate::{DeltaResult, Engine, EngineData, Error, FileMeta, SnapshotRef, Version};
3132

3233
use self::log_replay::scan_action_iter;
@@ -151,6 +152,7 @@ impl PhysicalPredicate {
151152
pub(crate) fn try_new(
152153
predicate: &Predicate,
153154
logical_schema: &Schema,
155+
column_mapping_mode: ColumnMappingMode,
154156
) -> DeltaResult<PhysicalPredicate> {
155157
if can_statically_skip_all_files(predicate) {
156158
return Ok(PhysicalPredicate::StaticSkipAll);
@@ -160,6 +162,7 @@ impl PhysicalPredicate {
160162
column_mappings: HashMap::new(),
161163
logical_path: vec![],
162164
physical_path: vec![],
165+
column_mapping_mode,
163166
};
164167
let schema_opt = get_referenced_fields.transform_struct(logical_schema);
165168
let mut unresolved = get_referenced_fields.unresolved_references.into_iter();
@@ -210,6 +213,7 @@ struct GetReferencedFields<'a> {
210213
column_mappings: HashMap<ColumnName, ColumnName>,
211214
logical_path: Vec<String>,
212215
physical_path: Vec<String>,
216+
column_mapping_mode: ColumnMappingMode,
213217
}
214218
impl<'a> SchemaTransform<'a> for GetReferencedFields<'a> {
215219
// Capture the path mapping for this leaf field
@@ -235,7 +239,7 @@ impl<'a> SchemaTransform<'a> for GetReferencedFields<'a> {
235239
}
236240

237241
fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
238-
let physical_name = field.physical_name();
242+
let physical_name = field.physical_name(self.column_mapping_mode);
239243
self.logical_path.push(field.name.clone());
240244
self.physical_path.push(physical_name.to_string());
241245
let field = self.recurse_into_struct_field(field);
@@ -729,6 +733,7 @@ pub(crate) mod test_utils {
729733

730734
use super::state::ScanCallback;
731735
use super::PhysicalPredicate;
736+
use crate::table_features::ColumnMappingMode;
732737
use crate::transforms::TransformSpec;
733738

734739
// Generates a batch of sidecar actions with the given paths.
@@ -843,6 +848,7 @@ pub(crate) mod test_utils {
843848
physical_schema: logical_schema,
844849
physical_predicate: PhysicalPredicate::None,
845850
transform_spec,
851+
column_mapping_mode: ColumnMappingMode::None,
846852
});
847853
let iter = scan_action_iter(
848854
&SyncEngine::new(),
@@ -1053,7 +1059,9 @@ mod tests {
10531059
];
10541060

10551061
for (predicate, expected) in test_cases {
1056-
let result = PhysicalPredicate::try_new(&predicate, &logical_schema).ok();
1062+
let result =
1063+
PhysicalPredicate::try_new(&predicate, &logical_schema, ColumnMappingMode::Name)
1064+
.ok();
10571065
assert_eq!(
10581066
result, expected,
10591067
"Failed for predicate: {predicate:#?}, expected {expected:#?}, got {result:#?}"

kernel/src/scan/state_info.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ pub(crate) struct StateInfo {
2525
pub(crate) physical_predicate: PhysicalPredicate,
2626
/// Transform specification for converting physical to logical data
2727
pub(crate) transform_spec: Option<Arc<TransformSpec>>,
28+
/// The column mapping mode for this scan
29+
pub(crate) column_mapping_mode: ColumnMappingMode,
2830
}
2931

3032
/// Validating the metadata columns also extracts information needed to properly construct the full
@@ -192,7 +194,7 @@ impl StateInfo {
192194
let physical_schema = Arc::new(StructType::try_new(read_fields)?);
193195

194196
let physical_predicate = match predicate {
195-
Some(pred) => PhysicalPredicate::try_new(&pred, &logical_schema)?,
197+
Some(pred) => PhysicalPredicate::try_new(&pred, &logical_schema, column_mapping_mode)?,
196198
None => PhysicalPredicate::None,
197199
};
198200

@@ -208,6 +210,7 @@ impl StateInfo {
208210
physical_schema,
209211
physical_predicate,
210212
transform_spec,
213+
column_mapping_mode,
211214
})
212215
}
213216
}

kernel/src/schema/mod.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -314,17 +314,26 @@ impl StructField {
314314

315315
/// Get the physical name for this field as it should be read from parquet.
316316
///
317+
/// When `column_mapping_mode` is `None`, always returns the logical name (even if physical
318+
/// name metadata is present). When mode is `Id` or `Name`, returns the physical name from
319+
/// metadata if present, otherwise returns the logical name.
320+
///
317321
/// NOTE: Caller affirms that the schema was already validated by
318322
/// [`crate::table_features::validate_schema_column_mapping`], to ensure that annotations are
319323
/// always and only present when column mapping mode is enabled.
320324
#[internal_api]
321-
pub(crate) fn physical_name(&self) -> &str {
322-
match self
323-
.metadata
324-
.get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
325-
{
326-
Some(MetadataValue::String(physical_name)) => physical_name,
327-
_ => &self.name,
325+
pub(crate) fn physical_name(&self, column_mapping_mode: ColumnMappingMode) -> &str {
326+
match column_mapping_mode {
327+
ColumnMappingMode::None => &self.name,
328+
ColumnMappingMode::Id | ColumnMappingMode::Name => {
329+
match self
330+
.metadata
331+
.get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref())
332+
{
333+
Some(MetadataValue::String(physical_name)) => physical_name,
334+
_ => &self.name,
335+
}
336+
}
328337
}
329338
}
330339

@@ -422,7 +431,7 @@ impl StructField {
422431
.is_some_and(|x| matches!(x, MetadataValue::String(_))));
423432
}
424433
}
425-
field.physical_name().to_owned()
434+
field.physical_name(self.column_mapping_mode).to_owned()
426435
}
427436
};
428437

@@ -1901,7 +1910,7 @@ mod tests {
19011910
assert!(matches!(col_id, MetadataValue::Number(num) if *num == 4));
19021911
assert!(matches!(id_start, MetadataValue::Number(num) if *num == 2147483648i64));
19031912
assert_eq!(
1904-
field.physical_name(),
1913+
field.physical_name(mode),
19051914
"col-5f422f40-de70-45b2-88ab-1d5c90e94db1"
19061915
);
19071916
let physical_field = field.make_physical(mode);

kernel/src/table_changes/log_replay/tests.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::scan::state::DvInfo;
1010
use crate::scan::PhysicalPredicate;
1111
use crate::schema::{DataType, StructField, StructType};
1212
use crate::table_changes::log_replay::LogReplayScanner;
13-
use crate::table_features::TableFeature;
13+
use crate::table_features::{ColumnMappingMode, TableFeature};
1414
use crate::utils::test_utils::{assert_result_error_with_message, Action, LocalMockTable};
1515
use crate::Predicate;
1616
use crate::{DeltaResult, Engine, Error, Version};
@@ -539,10 +539,11 @@ async fn data_skipping_filter() {
539539
Scalar::from(4),
540540
);
541541
let logical_schema = get_schema();
542-
let predicate = match PhysicalPredicate::try_new(&predicate, &logical_schema) {
543-
Ok(PhysicalPredicate::Some(p, s)) => Some((p, s)),
544-
other => panic!("Unexpected result: {other:?}"),
545-
};
542+
let predicate =
543+
match PhysicalPredicate::try_new(&predicate, &logical_schema, ColumnMappingMode::None) {
544+
Ok(PhysicalPredicate::Some(p, s)) => Some((p, s)),
545+
other => panic!("Unexpected result: {other:?}"),
546+
};
546547
let commits = get_segment(engine.as_ref(), mock_table.table_root(), 0, None)
547548
.unwrap()
548549
.into_iter();

kernel/src/table_changes/physical_to_logical.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ pub(crate) fn get_cdf_transform_expr(
107107
&state_info.logical_schema,
108108
transform_spec,
109109
&scan_file.partition_values,
110+
state_info.column_mapping_mode,
110111
)?;
111112
partition_values.extend(parsed_values);
112113

@@ -131,6 +132,7 @@ mod tests {
131132
use crate::scan::state_info::StateInfo;
132133
use crate::scan::PhysicalPredicate;
133134
use crate::schema::{DataType, StructField, StructType};
135+
use crate::table_features::ColumnMappingMode;
134136
use crate::transforms::FieldTransformSpec;
135137
use std::collections::HashMap;
136138
use std::sync::Arc;
@@ -180,6 +182,7 @@ mod tests {
180182
physical_schema: physical_schema.into(),
181183
physical_predicate: PhysicalPredicate::None,
182184
transform_spec: Some(Arc::new(transform_spec)),
185+
column_mapping_mode: ColumnMappingMode::None,
183186
}
184187
}
185188

@@ -397,6 +400,7 @@ mod tests {
397400
physical_schema: physical_schema.clone().into(),
398401
physical_predicate: PhysicalPredicate::None,
399402
transform_spec: Some(Arc::new(transform_spec)),
403+
column_mapping_mode: ColumnMappingMode::None,
400404
};
401405

402406
let result = get_cdf_transform_expr(&scan_file, &state_info, &physical_schema);

kernel/src/transforms.rs

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use crate::expressions::{
1313
BinaryExpressionOp, Expression, ExpressionRef, Scalar, Transform, VariadicExpressionOp,
1414
};
1515
use crate::schema::{DataType, SchemaRef, StructType};
16+
use crate::table_features::ColumnMappingMode;
1617
use crate::{DeltaResult, Error};
1718

1819
/// A list of field transforms that describes a transform expression to be created at scan time.
@@ -70,13 +71,14 @@ pub(crate) fn parse_partition_value(
7071
field_idx: usize,
7172
logical_schema: &SchemaRef,
7273
partition_values: &HashMap<String, String>,
74+
column_mapping_mode: ColumnMappingMode,
7375
) -> DeltaResult<(usize, (String, Scalar))> {
7476
let Some(field) = logical_schema.field_at_index(field_idx) else {
7577
return Err(Error::InternalError(format!(
7678
"out of bounds partition column field index {field_idx}"
7779
)));
7880
};
79-
let name = field.physical_name();
81+
let name = field.physical_name(column_mapping_mode);
8082
let partition_value = parse_partition_value_raw(partition_values.get(name), field.data_type())?;
8183
Ok((field_idx, (name.to_string(), partition_value)))
8284
}
@@ -86,13 +88,19 @@ pub(crate) fn parse_partition_values(
8688
logical_schema: &SchemaRef,
8789
transform_spec: &TransformSpec,
8890
partition_values: &HashMap<String, String>,
91+
column_mapping_mode: ColumnMappingMode,
8992
) -> DeltaResult<HashMap<usize, (String, Scalar)>> {
9093
transform_spec
9194
.iter()
9295
.filter_map(|field_transform| match field_transform {
93-
FieldTransformSpec::MetadataDerivedColumn { field_index, .. } => Some(
94-
parse_partition_value(*field_index, logical_schema, partition_values),
95-
),
96+
FieldTransformSpec::MetadataDerivedColumn { field_index, .. } => {
97+
Some(parse_partition_value(
98+
*field_index,
99+
logical_schema,
100+
partition_values,
101+
column_mapping_mode,
102+
))
103+
}
96104
FieldTransformSpec::DynamicColumn { .. }
97105
| FieldTransformSpec::StaticInsert { .. }
98106
| FieldTransformSpec::GenerateRowId { .. }
@@ -221,7 +229,7 @@ mod tests {
221229
)]));
222230
let partition_values = HashMap::new();
223231

224-
let result = parse_partition_value(5, &schema, &partition_values);
232+
let result = parse_partition_value(5, &schema, &partition_values, ColumnMappingMode::None);
225233
assert_result_error_with_message(result, "out of bounds");
226234
}
227235

@@ -256,7 +264,13 @@ mod tests {
256264
partition_values.insert("id".to_string(), "test".to_string());
257265
partition_values.insert("_change_type".to_string(), "insert".to_string());
258266

259-
let result = parse_partition_values(&schema, &transform_spec, &partition_values).unwrap();
267+
let result = parse_partition_values(
268+
&schema,
269+
&transform_spec,
270+
&partition_values,
271+
ColumnMappingMode::None,
272+
)
273+
.unwrap();
260274
assert_eq!(result.len(), 2);
261275
assert!(result.contains_key(&0));
262276
assert!(result.contains_key(&1));
@@ -276,7 +290,13 @@ mod tests {
276290
let transform_spec = vec![];
277291
let partition_values = HashMap::new();
278292

279-
let result = parse_partition_values(&schema, &transform_spec, &partition_values).unwrap();
293+
let result = parse_partition_values(
294+
&schema,
295+
&transform_spec,
296+
&partition_values,
297+
ColumnMappingMode::None,
298+
)
299+
.unwrap();
280300
assert!(result.is_empty());
281301
}
282302

0 commit comments

Comments
 (0)