Skip to content

Commit 8572dae

Browse files
committed
Store fields instead of constants
1 parent 060b45d commit 8572dae

File tree

3 files changed: +27 additions, −25 deletions

crates/iceberg/src/arrow/reader.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ impl ArrowReader {
248248
.with_constant(
249249
RESERVED_FIELD_ID_FILE,
250250
PrimitiveLiteral::String(task.data_file_path.clone()),
251-
);
251+
)?;
252252

253253
if let Some(batch_size) = batch_size {
254254
record_batch_stream_builder = record_batch_stream_builder.with_batch_size(batch_size);

crates/iceberg/src/arrow/record_batch_transformer.rs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -113,8 +113,9 @@ enum SchemaComparison {
113113
pub(crate) struct RecordBatchTransformer {
114114
snapshot_schema: Arc<IcebergSchema>,
115115
projected_iceberg_field_ids: Vec<i32>,
116-
// Map from field ID to constant value for virtual/metadata fields
117-
constants_map: HashMap<i32, PrimitiveLiteral>,
116+
// Pre-computed constant field information: field_id -> (arrow_type, value)
117+
// Avoids duplicate lookups and type conversions during batch processing
118+
constant_fields: HashMap<i32, (DataType, PrimitiveLiteral)>,
118119

119120
// BatchTransform gets lazily constructed based on the schema of
120121
// the first RecordBatch we receive from the file
@@ -133,7 +134,7 @@ impl RecordBatchTransformer {
133134
Self {
134135
snapshot_schema,
135136
projected_iceberg_field_ids,
136-
constants_map: HashMap::new(),
137+
constant_fields: HashMap::new(),
137138
batch_transform: None,
138139
}
139140
}
@@ -144,9 +145,10 @@ impl RecordBatchTransformer {
144145
/// # Arguments
145146
/// * `field_id` - The field ID to associate with the constant
146147
/// * `value` - The constant value for this field
147-
pub(crate) fn with_constant(mut self, field_id: i32, value: PrimitiveLiteral) -> Self {
148-
self.constants_map.insert(field_id, value);
149-
self
148+
pub(crate) fn with_constant(mut self, field_id: i32, value: PrimitiveLiteral) -> Result<Self> {
149+
let arrow_type = Self::primitive_literal_to_arrow_type(&value)?;
150+
self.constant_fields.insert(field_id, (arrow_type, value));
151+
Ok(self)
150152
}
151153

152154
pub(crate) fn process_record_batch(
@@ -183,7 +185,7 @@ impl RecordBatchTransformer {
183185
record_batch.schema_ref(),
184186
self.snapshot_schema.as_ref(),
185187
&self.projected_iceberg_field_ids,
186-
&self.constants_map,
188+
&self.constant_fields,
187189
)?);
188190

189191
self.process_record_batch(record_batch)?
@@ -202,7 +204,7 @@ impl RecordBatchTransformer {
202204
source_schema: &ArrowSchemaRef,
203205
snapshot_schema: &IcebergSchema,
204206
projected_iceberg_field_ids: &[i32],
205-
constants_map: &HashMap<i32, PrimitiveLiteral>,
207+
constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>,
206208
) -> Result<BatchTransform> {
207209
let mapped_unprojected_arrow_schema = Arc::new(schema_to_arrow_schema(snapshot_schema)?);
208210
let field_id_to_mapped_schema_map =
@@ -213,16 +215,17 @@ impl RecordBatchTransformer {
213215
let fields: Result<Vec<_>> = projected_iceberg_field_ids
214216
.iter()
215217
.map(|field_id| {
216-
// Check if this is a constant/virtual field
217-
if let Some(constant_value) = constants_map.get(field_id) {
218-
// Create a field for the virtual column based on the constant type
219-
let arrow_type = Self::primitive_literal_to_arrow_type(constant_value)?;
218+
// Check if this is a constant/virtual field (pre-computed)
219+
if let Some((arrow_type, _)) = constant_fields.get(field_id) {
220+
// Create a field for the virtual column
220221
let field_name = get_metadata_column_name(*field_id)?;
221222
Ok(Arc::new(
222-
Field::new(field_name, arrow_type, false).with_metadata(HashMap::from([(
223-
PARQUET_FIELD_ID_META_KEY.to_string(),
224-
field_id.to_string(),
225-
)])),
223+
Field::new(field_name, arrow_type.clone(), false).with_metadata(
224+
HashMap::from([(
225+
PARQUET_FIELD_ID_META_KEY.to_string(),
226+
field_id.to_string(),
227+
)]),
228+
),
226229
))
227230
} else {
228231
Ok(field_id_to_mapped_schema_map
@@ -245,7 +248,7 @@ impl RecordBatchTransformer {
245248
snapshot_schema,
246249
projected_iceberg_field_ids,
247250
field_id_to_mapped_schema_map,
248-
constants_map,
251+
constant_fields,
249252
)?,
250253
target_schema,
251254
}),
@@ -302,18 +305,18 @@ impl RecordBatchTransformer {
302305
snapshot_schema: &IcebergSchema,
303306
projected_iceberg_field_ids: &[i32],
304307
field_id_to_mapped_schema_map: HashMap<i32, (FieldRef, usize)>,
305-
constants_map: &HashMap<i32, PrimitiveLiteral>,
308+
constant_fields: &HashMap<i32, (DataType, PrimitiveLiteral)>,
306309
) -> Result<Vec<ColumnSource>> {
307310
let field_id_to_source_schema_map =
308311
Self::build_field_id_to_arrow_schema_map(source_schema)?;
309312

310313
projected_iceberg_field_ids.iter().map(|field_id|{
311-
// Check if this is a constant/virtual field first
312-
if let Some(constant_value) = constants_map.get(field_id) {
314+
// Check if this is a constant/virtual field (pre-computed)
315+
if let Some((arrow_type, value)) = constant_fields.get(field_id) {
313316
// This is a virtual field - add it with the constant value
314317
return Ok(ColumnSource::Add {
315-
value: Some(constant_value.clone()),
316-
target_type: Self::primitive_literal_to_arrow_type(constant_value)?,
318+
value: Some(value.clone()),
319+
target_type: arrow_type.clone(),
317320
});
318321
}
319322

crates/iceberg/src/metadata_columns.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,5 @@ pub fn is_metadata_field(field_id: i32) -> bool {
8484
/// # Returns
8585
/// `true` if the column name is a metadata column, `false` otherwise
8686
pub fn is_metadata_column_name(column_name: &str) -> bool {
87-
column_name == RESERVED_COL_NAME_FILE
88-
// Additional metadata column names can be checked here in the future
87+
get_metadata_field_id(column_name).is_ok()
8988
}

0 commit comments

Comments (0)