
Commit 135f149

committed
remove inadvertent changes, make comments more succinct
1 parent b30195c commit 135f149

File tree: 3 files changed (+27, -123 lines)


crates/iceberg/src/arrow/reader.rs

Lines changed: 7 additions & 14 deletions
@@ -235,8 +235,6 @@ impl ArrowReader {
         // RecordBatchTransformer performs any transformations required on the RecordBatches
         // that come back from the file, such as type promotion, default column insertion
         // and column re-ordering.
-        // Always use build_with_partition_data to ensure name_mapping is passed through,
-        // even when partition spec/data aren't available.
         let mut record_batch_transformer = RecordBatchTransformer::build_with_partition_data(
             task.schema_ref(),
             task.project_field_ids(),
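The comment retained above summarizes what RecordBatchTransformer does: type promotion, default column insertion, and column re-ordering. The following is a minimal, dependency-free sketch of the re-ordering and default-insertion idea, using plain vectors instead of Arrow RecordBatches; the `project_columns` function and its types are hypothetical illustrations, not the crate's actual API.

```rust
// Conceptual sketch only: plain vectors stand in for Arrow columns.
fn project_columns(
    file_columns: &[(&str, Vec<i64>)],  // columns as they appear in the data file
    projected: &[(&str, Option<i64>)],  // desired output order, with optional defaults
    num_rows: usize,
) -> Vec<(String, Vec<i64>)> {
    projected
        .iter()
        .map(|(name, default)| {
            match file_columns.iter().find(|(col, _)| col == name) {
                // Column exists in the file: reuse it, re-ordered to the projected position.
                Some((_, values)) => (name.to_string(), values.clone()),
                // Column missing from the file: fill with its default (0 here for brevity).
                None => (name.to_string(), vec![default.unwrap_or(0); num_rows]),
            }
        })
        .collect()
}

fn main() {
    let file_columns = [("b", vec![10, 20]), ("a", vec![1, 2])];
    let projected = [("a", None), ("b", None), ("c", Some(99))];
    let out = project_columns(&file_columns, &projected, 2);
    assert_eq!(out[0], ("a".to_string(), vec![1, 2]));   // re-ordered
    assert_eq!(out[2], ("c".to_string(), vec![99, 99])); // default-inserted
}
```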
@@ -1949,7 +1947,7 @@ message schema {
             start: 0,
             length: 0,
             record_count: None,
-            data_file_path: format!("{}/1.parquet", table_location),
+            data_file_path: format!("{table_location}/1.parquet"),
             data_file_format: DataFileFormat::Parquet,
             schema: schema.clone(),
             project_field_ids: vec![1],
@@ -2323,21 +2321,16 @@ message schema {
             .unwrap();

         let total_rows_task2: usize = result2.iter().map(|b| b.num_rows()).sum();
-        println!(
-            "Task 2 (bytes {}-{}) returned {} rows",
-            rg1_start, file_end, total_rows_task2
-        );
+        println!("Task 2 (bytes {rg1_start}-{file_end}) returned {total_rows_task2} rows");

         assert_eq!(
             total_rows_task1, 100,
-            "Task 1 should read only the first row group (100 rows), but got {} rows",
-            total_rows_task1
+            "Task 1 should read only the first row group (100 rows), but got {total_rows_task1} rows"
         );

         assert_eq!(
             total_rows_task2, 200,
-            "Task 2 should read only the second+third row groups (200 rows), but got {} rows",
-            total_rows_task2
+            "Task 2 should read only the second+third row groups (200 rows), but got {total_rows_task2} rows"
         );

         // Verify the actual data values are correct (not just the row count)
@@ -2348,7 +2341,7 @@ message schema {
             .as_primitive::<arrow_array::types::Int32Type>();
         let first_val = id_col.value(0);
         let last_val = id_col.value(id_col.len() - 1);
-        println!("Task 1 data range: {} to {}", first_val, last_val);
+        println!("Task 1 data range: {first_val} to {last_val}");

         assert_eq!(first_val, 0, "Task 1 should start with id=0");
         assert_eq!(last_val, 99, "Task 1 should end with id=99");
@@ -2360,7 +2353,7 @@ message schema {
             .column(0)
             .as_primitive::<arrow_array::types::Int32Type>();
         let first_val = id_col.value(0);
-        println!("Task 2 first value: {}", first_val);
+        println!("Task 2 first value: {first_val}");

         assert_eq!(first_val, 100, "Task 2 should start with id=100, not id=0");
     }
@@ -2418,7 +2411,7 @@ message schema {
             start: 0,
             length: 0,
             record_count: None,
-            data_file_path: format!("{}/old_file.parquet", table_location),
+            data_file_path: format!("{table_location}/old_file.parquet"),
             data_file_format: DataFileFormat::Parquet,
             schema: new_schema.clone(),
             project_field_ids: vec![1, 2], // Request both columns 'a' and 'b'

crates/iceberg/src/arrow/record_batch_transformer.rs

Lines changed: 20 additions & 102 deletions
@@ -37,56 +37,18 @@ use crate::{Error, ErrorKind, Result};

 /// Build a map of field ID to constant value for identity-partitioned fields.
 ///
-/// This implements the Iceberg spec's "Column Projection" rule #1
-/// (https://iceberg.apache.org/spec/#column-projection):
-/// > "Return the value from partition metadata if an Identity Transform exists for the field
-/// > and the partition value is present in the `partition` struct on `data_file` object
-/// > in the manifest."
+/// Implements Iceberg spec "Column Projection" rule #1: use partition metadata constants
+/// only for identity-transformed fields. Non-identity transforms (bucket, truncate, year, etc.)
+/// store derived values in partition metadata, so source columns must be read from data files.
 ///
-/// This matches Java's `PartitionUtil.constantsMap()` which only adds fields where:
-/// ```java
-/// if (field.transform().isIdentity()) {
-///     idToConstant.put(field.sourceId(), converted);
-/// }
-/// ```
+/// Example: For `bucket(4, id)`, partition metadata has `id_bucket = 2` (bucket number),
+/// but the actual `id` values (100, 200, 300) are only in the data file.
 ///
-/// # Why only identity transforms?
-///
-/// Non-identity transforms (bucket, truncate, year, month, day, hour) produce DERIVED values
-/// that differ from the source column values. For example:
-/// - `bucket(4, id)` produces hash values 0-3, not the actual `id` values
-/// - `day(timestamp)` produces day-since-epoch integers, not the timestamp values
-///
-/// These source columns MUST be read from the data file because partition metadata only
-/// stores the transformed values (e.g., bucket number), not the original column values.
-///
-/// # Java Implementation Reference
-///
-/// This matches Java's `PartitionUtil.constantsMap()` (util/PartitionUtil.java):
-/// ```java
-/// public static Map<Integer, Object> constantsMap(PartitionData data, PartitionSpec spec) {
-///     Map<Integer, Object> idToConstant = Maps.newHashMap();
-///     for (int pos = 0; pos < spec.fields().size(); pos += 1) {
-///         PartitionField field = spec.fields().get(pos);
-///         if (field.transform().isIdentity()) { // <-- ONLY identity transforms
-///             Object converted = convertConstant(field.sourceId(), data.get(pos, javaClass));
-///             idToConstant.put(field.sourceId(), converted);
-///         }
-///     }
-///     return idToConstant;
-/// }
-/// ```
-///
-/// # Example: Bucket Partitioning
-///
-/// For a table partitioned by `bucket(4, id)`:
-/// - Partition metadata stores: `id_bucket = 2` (the bucket number)
-/// - Data file contains: `id = 100, 200, 300` (the actual values)
-/// - Reading must use data from the file, not the constant `2` from partition metadata
+/// Matches Java's `PartitionUtil.constantsMap()` which filters `if (field.transform().isIdentity())`.
 ///
 /// # References
-/// - Iceberg spec: format/spec.md "Column Projection" section
-/// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java:constantsMap()
+/// - Spec: https://iceberg.apache.org/spec/#column-projection
+/// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java:constantsMap()
 fn constants_map(
     partition_spec: &PartitionSpec,
     partition_data: &Struct,
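The shortened doc comment above keeps the key rule: only identity-transformed partition fields become constants. Below is a minimal, self-contained sketch of that rule; the simplified `Transform`, `PartitionField`, and `constants_map` types and signatures are hypothetical stand-ins for illustration, not the crate's actual API.

```rust
// Illustrative sketch: identity-only constants map.
use std::collections::HashMap;

#[allow(dead_code)]
#[derive(Clone, Copy, PartialEq)]
enum Transform {
    Identity,
    Bucket(u32),
    Truncate(u32),
    Year,
}

struct PartitionField {
    source_id: i32, // field ID of the source column in the table schema
    transform: Transform,
}

/// Map source field ID -> constant partition value, but only for identity transforms.
/// Non-identity transforms store derived values (e.g. a bucket number), so their
/// source columns must still be read from the data file.
fn constants_map(
    fields: &[PartitionField],
    partition_values: &[Option<String>],
) -> HashMap<i32, String> {
    fields
        .iter()
        .zip(partition_values)
        .filter(|(field, _)| field.transform == Transform::Identity)
        .filter_map(|(field, value)| value.clone().map(|v| (field.source_id, v)))
        .collect()
}

fn main() {
    // Table partitioned by bucket(4, id) and identity(region):
    let fields = vec![
        PartitionField { source_id: 1, transform: Transform::Bucket(4) },
        PartitionField { source_id: 2, transform: Transform::Identity },
    ];
    let partition_values = vec![Some("2".to_string()), Some("eu".to_string())];

    let constants = constants_map(&fields, &partition_values);
    // Only region (field 2) becomes a constant; id (field 1) is read from the file.
    assert_eq!(constants.get(&2), Some(&"eu".to_string()));
    assert!(!constants.contains_key(&1));
    println!("{constants:?}");
}
```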
@@ -191,8 +153,6 @@ pub(crate) struct RecordBatchTransformer {
     partition_data: Option<Struct>,

     // Optional name mapping for resolving field IDs from column names
-    // Per Iceberg spec rule #2: "Use schema.name-mapping.default metadata
-    // to map field id to columns without field id"
     name_mapping: Option<Arc<NameMapping>>,

     // BatchTransform gets lazily constructed based on the schema of
@@ -218,70 +178,28 @@ impl RecordBatchTransformer {

     /// Build a RecordBatchTransformer with partition spec and data for proper constant identification.
     ///
-    /// # Overview
-    ///
-    /// This method implements the Iceberg spec's "Column Projection" rules
-    /// (https://iceberg.apache.org/spec/#column-projection) for resolving field IDs that are
-    /// "not present" in a data file:
-    ///
+    /// Implements the Iceberg spec's "Column Projection" rules for resolving field IDs "not present" in data files:
     /// 1. Return the value from partition metadata if an Identity Transform exists
     /// 2. Use schema.name-mapping.default metadata to map field id to columns without field id
     /// 3. Return the default value if it has a defined initial-default
     /// 4. Return null in all other cases
     ///
-    /// # Why this method was added
-    ///
-    /// The gap in iceberg-rust was that `FileScanTask` had no way to pass partition information
-    /// to `RecordBatchTransformer`. This caused two problems:
-    ///
-    /// 1. **Incorrect handling of bucket partitioning**: Without partition spec information,
-    ///    iceberg-rust couldn't distinguish between:
-    ///    - Identity transforms (use constants from partition metadata)
-    ///    - Non-identity transforms like bucket (read from data file)
+    /// # Why this method exists
     ///
-    /// This caused bucket-partitioned source columns to be incorrectly treated as constants,
-    /// breaking runtime filtering and returning incorrect query results.
+    /// 1. **Bucket partitioning**: Distinguish identity transforms (use partition metadata constants)
+    ///    from non-identity transforms like bucket (read from data file) to enable runtime filtering on
+    ///    bucket-partitioned columns.
     ///
-    /// 2. **Add_files field ID conflicts**: When importing Hive tables via add_files,
-    ///    partition columns with `initial_default` values could have field IDs that conflicted
-    ///    with data column field IDs in the Parquet file.
+    /// 2. **Add_files field ID conflicts**: When importing Hive tables, partition columns can have field IDs
+    ///    conflicting with Parquet data columns (e.g., Parquet has field_id=1→"name", but Iceberg expects
+    ///    field_id=1→"id"). Per spec, such fields are "not present" and should use name mapping (rule #2).
     ///
-    /// Example:
-    /// - Parquet file written with: field_id=1→"name", field_id=2→"dept"
-    /// - Imported via add_files: field_id=1→"id" (partition), field_id=2→"name", field_id=3→"dept"
-    ///
-    /// When looking for field_id=1 ("id"), we find field_id=1 in the Parquet file, but it's
-    /// the WRONG field (it's "name"). Per the spec, the correct field (id=1, name="id") is
-    /// "not present" in the file and should be resolved via name mapping (rule #2) or
-    /// initial-default (rule #3).
-    ///
-    /// # The fix
-    ///
-    /// This method accepts `partition_spec`, `partition_data`, and `name_mapping`, which are used to:
-    /// - Build a `constants_map` that ONLY includes identity-transformed partition fields
-    ///   (matching Java's `PartitionUtil.constantsMap()` behavior)
-    /// - Detect field ID conflicts by verifying both field ID AND name match (when name mapping present)
-    /// - Apply name mapping when field IDs are missing or conflicting (spec rule #2)
-    ///
-    /// This matches Java's approach (ParquetSchemaUtil.applyNameMapping, ReadConf.java lines 83-85)
-    /// which rewrites Parquet schema field IDs based on names before projection. Our implementation
-    /// detects conflicts during projection but achieves the same result.
-    ///
-    /// # What was changed
-    ///
-    /// To enable this fix, the following fields were added to `FileScanTask`:
-    /// - `partition: Option<Struct>` - The partition data for this file
-    /// - `partition_spec: Option<Arc<PartitionSpec>>` - The actual partition spec
-    /// - `name_mapping: Option<Arc<NameMapping>>` - The name mapping from table metadata
-    ///
-    /// These fields should be populated by any system that reads Iceberg tables and provides
-    /// FileScanTasks to the ArrowReader.
+    /// This matches Java's ParquetSchemaUtil.applyNameMapping approach but detects conflicts during projection.
     ///
     /// # References
-    /// - Iceberg spec: https://iceberg.apache.org/spec/#column-projection
-    /// - Java impl: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
-    /// - Java impl: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
-    /// - Java test: spark/src/test/java/.../TestRuntimeFiltering.java
+    /// - Spec: https://iceberg.apache.org/spec/#column-projection
+    /// - Java: core/src/main/java/org/apache/iceberg/util/PartitionUtil.java
+    /// - Java: parquet/src/main/java/org/apache/iceberg/parquet/ParquetSchemaUtil.java
     pub(crate) fn build_with_partition_data(
         snapshot_schema: Arc<IcebergSchema>,
         projected_iceberg_field_ids: &[i32],
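The doc comment above lists the four "Column Projection" resolution rules in order. Below is a minimal, self-contained sketch of that resolution order; `ResolvedColumn`, `resolve_missing_field`, and the map inputs are hypothetical simplifications for illustration, not the crate's actual API.

```rust
// Illustrative sketch of the four column-projection rules.
use std::collections::HashMap;

#[derive(Debug, PartialEq)]
enum ResolvedColumn {
    /// Rule 1: identity-partition constant from partition metadata.
    Constant(String),
    /// Rule 2: read from the file via a column name resolved through name mapping.
    FromFileByName(String),
    /// Rule 3: the field's `initial-default` value.
    Default(String),
    /// Rule 4: null.
    Null,
}

fn resolve_missing_field(
    field_id: i32,
    field_name: &str,
    identity_constants: &HashMap<i32, String>, // rule 1 input (constants_map)
    name_mapping: &HashMap<String, i32>,       // rule 2 input (name -> field ID)
    initial_defaults: &HashMap<i32, String>,   // rule 3 input
) -> ResolvedColumn {
    if let Some(constant) = identity_constants.get(&field_id) {
        return ResolvedColumn::Constant(constant.clone()); // rule 1
    }
    if name_mapping.get(field_name) == Some(&field_id) {
        return ResolvedColumn::FromFileByName(field_name.to_string()); // rule 2
    }
    if let Some(default) = initial_defaults.get(&field_id) {
        return ResolvedColumn::Default(default.clone()); // rule 3
    }
    ResolvedColumn::Null // rule 4
}

fn main() {
    let identity_constants = HashMap::from([(10, "eu".to_string())]);
    let name_mapping = HashMap::from([("name".to_string(), 2)]);
    let initial_defaults = HashMap::from([(3, "unknown".to_string())]);

    // Field 2 ("name") lacks a usable field ID in the file: resolved by name mapping.
    assert_eq!(
        resolve_missing_field(2, "name", &identity_constants, &name_mapping, &initial_defaults),
        ResolvedColumn::FromFileByName("name".to_string())
    );
    // Field 4 matches nothing: null.
    assert_eq!(
        resolve_missing_field(4, "dept", &identity_constants, &name_mapping, &initial_defaults),
        ResolvedColumn::Null
    );
}
```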

crates/iceberg/src/scan/task.rs

Lines changed: 0 additions & 7 deletions
@@ -99,13 +99,6 @@ pub struct FileScanTask {
     /// Name mapping from table metadata (property: schema.name-mapping.default),
     /// used to resolve field IDs from column names when Parquet files lack field IDs
     /// or have field ID conflicts.
-    ///
-    /// Per Iceberg spec rule #2: "Use schema.name-mapping.default metadata to map
-    /// field id to columns without field id".
-    ///
-    /// This is essential for scenarios like:
-    /// - Hive table migrations via add_files where Parquet has no field IDs
-    /// - Field ID conflicts where partition columns conflict with data column IDs
     #[serde(default)]
     #[serde(skip_serializing_if = "Option::is_none")]
     #[serde(serialize_with = "serialize_not_implemented")]
