Commit 53e4092

perf: [iceberg] Deduplicate serialized metadata for Iceberg native scan (#2933)
1 parent 89a3c5f commit 53e4092
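This change moves the serialized Iceberg metadata that every file scan task used to carry inline (schema JSON, partition type and spec, name mapping, delete-file lists, projected field IDs, residual expressions) into shared pools on the IcebergScan message; each task now references pool entries by index. The sketch below illustrates the interning pattern in isolation; Pool and intern are hypothetical names, not the Comet serialization code.

use std::collections::HashMap;

// A minimal sketch of the pooling idea: each task stores a small index into a
// shared pool instead of carrying its own copy of the serialized metadata.
struct Pool {
    entries: Vec<String>,        // unique serialized values, in insertion order
    index: HashMap<String, u32>, // value -> position in `entries`
}

impl Pool {
    fn new() -> Self {
        Pool { entries: Vec::new(), index: HashMap::new() }
    }

    // Return the index for `value`, adding it to the pool only on first sight.
    fn intern(&mut self, value: &str) -> u32 {
        if let Some(&i) = self.index.get(value) {
            return i;
        }
        let i = self.entries.len() as u32;
        self.entries.push(value.to_string());
        self.index.insert(value.to_string(), i);
        i
    }
}

fn main() {
    let mut schema_pool = Pool::new();
    // 10,000 tasks that share one schema yield a pool with a single entry,
    // so the schema JSON is serialized (and later parsed) exactly once.
    let schema_json = r#"{"type":"struct","fields":[]}"#;
    let idx: Vec<u32> = (0..10_000).map(|_| schema_pool.intern(schema_json)).collect();
    assert_eq!(schema_pool.entries.len(), 1);
    assert!(idx.iter().all(|&i| i == 0));
}

On the deserializing side, each pool entry is parsed once and every task lookup becomes an index into the parsed cache, as the planner.rs diff below shows.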

3 files changed: +355, -161 lines

native/core/src/execution/planner.rs

Lines changed: 167 additions & 76 deletions
@@ -1169,6 +1169,7 @@ impl PhysicalPlanner {
     );
 
     let tasks = parse_file_scan_tasks(
+        scan,
         &scan.file_partitions[self.partition as usize].file_scan_tasks,
     )?;
     let file_task_groups = vec![tasks];
@@ -2639,29 +2640,66 @@ fn convert_spark_types_to_arrow_schema(
 ///
 /// Each task contains a residual predicate that is used for row-group level filtering
 /// during Parquet scanning.
+///
+/// This function uses deduplication pools from the IcebergScan to avoid redundant parsing
+/// of schemas, partition specs, partition types, name mappings, and other repeated data.
 fn parse_file_scan_tasks(
+    proto_scan: &spark_operator::IcebergScan,
     proto_tasks: &[spark_operator::IcebergFileScanTask],
 ) -> Result<Vec<iceberg::scan::FileScanTask>, ExecutionError> {
-    let results: Result<Vec<_>, _> = proto_tasks
+    // Build caches upfront: for 10K tasks with 1 schema, this parses the schema
+    // once instead of 10K times, eliminating redundant JSON deserialization
+    let schema_cache: Vec<Arc<iceberg::spec::Schema>> = proto_scan
+        .schema_pool
         .iter()
-        .map(|proto_task| {
-            let schema: iceberg::spec::Schema = serde_json::from_str(&proto_task.schema_json)
-                .map_err(|e| {
-                    ExecutionError::GeneralError(format!("Failed to parse schema JSON: {}", e))
-                })?;
+        .map(|json| {
+            serde_json::from_str(json).map(Arc::new).map_err(|e| {
+                ExecutionError::GeneralError(format!(
+                    "Failed to parse schema JSON from pool: {}",
+                    e
+                ))
+            })
+        })
+        .collect::<Result<Vec<_>, _>>()?;
 
-            let schema_ref = Arc::new(schema);
+    let partition_type_cache: Vec<iceberg::spec::StructType> = proto_scan
+        .partition_type_pool
+        .iter()
+        .map(|json| {
+            serde_json::from_str(json).map_err(|e| {
+                ExecutionError::GeneralError(format!(
+                    "Failed to parse partition type JSON from pool: {}",
+                    e
+                ))
+            })
+        })
+        .collect::<Result<Vec<_>, _>>()?;
 
-            // CometScanRule validates format before serialization
-            debug_assert_eq!(
-                proto_task.data_file_format.as_str(),
-                "PARQUET",
-                "Only PARQUET format is supported. This indicates a bug in CometScanRule validation."
-            );
-            let data_file_format = iceberg::spec::DataFileFormat::Parquet;
+    let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_scan
+        .partition_spec_pool
+        .iter()
+        .map(|json| {
+            serde_json::from_str::<iceberg::spec::PartitionSpec>(json)
+                .ok()
+                .map(Arc::new)
+        })
+        .collect();
 
-            let deletes: Vec<iceberg::scan::FileScanTaskDeleteFile> = proto_task
-                .delete_files
+    let name_mapping_cache: Vec<Option<Arc<iceberg::spec::NameMapping>>> = proto_scan
+        .name_mapping_pool
+        .iter()
+        .map(|json| {
+            serde_json::from_str::<iceberg::spec::NameMapping>(json)
+                .ok()
+                .map(Arc::new)
+        })
+        .collect();
+
+    let delete_files_cache: Vec<Vec<iceberg::scan::FileScanTaskDeleteFile>> = proto_scan
+        .delete_files_pool
+        .iter()
+        .map(|list| {
+            list.delete_files
                 .iter()
                 .map(|del| {
                     let file_type = match del.content_type.as_str() {
@@ -2686,53 +2724,106 @@ fn parse_file_scan_tasks(
                     },
                 })
             })
-            .collect::<Result<Vec<_>, ExecutionError>>()?;
+                .collect::<Result<Vec<_>, ExecutionError>>()
+        })
+        .collect::<Result<Vec<_>, _>>()?;
 
-            // Residuals are serialized with binding=false (name-based references).
-            // Convert to Iceberg predicate and bind to this file's schema for row-group filtering.
-            let bound_predicate = proto_task
-                .residual
-                .as_ref()
-                .and_then(|residual_expr| {
-                    convert_spark_expr_to_predicate(residual_expr)
-                })
-                .map(
-                    |pred| -> Result<iceberg::expr::BoundPredicate, ExecutionError> {
-                        let bound = pred.bind(Arc::clone(&schema_ref), true).map_err(|e| {
-                            ExecutionError::GeneralError(format!(
-                                "Failed to bind predicate to schema: {}",
-                                e
-                            ))
-                        })?;
-
-                        Ok(bound)
-                    },
-                )
-                .transpose()?;
-
-            let partition = if let (Some(partition_json), Some(partition_type_json)) = (
-                proto_task.partition_data_json.as_ref(),
-                proto_task.partition_type_json.as_ref(),
-            ) {
-                let partition_type: iceberg::spec::StructType =
-                    serde_json::from_str(partition_type_json).map_err(|e| {
+    let partition_data_cache: Vec<serde_json::Value> = proto_scan
+        .partition_data_pool
+        .iter()
+        .map(|json| {
+            serde_json::from_str(json).map_err(|e| {
+                ExecutionError::GeneralError(format!(
+                    "Failed to parse partition data JSON from pool: {}",
+                    e
+                ))
+            })
+        })
+        .collect::<Result<Vec<_>, _>>()?;
+
+    let results: Result<Vec<_>, _> = proto_tasks
+        .iter()
+        .map(|proto_task| {
+            let schema_ref = Arc::clone(
+                schema_cache
+                    .get(proto_task.schema_idx as usize)
+                    .ok_or_else(|| {
                         ExecutionError::GeneralError(format!(
-                            "Failed to parse partition type JSON: {}",
-                            e
+                            "Invalid schema_idx: {} (pool size: {})",
+                            proto_task.schema_idx,
+                            schema_cache.len()
+                        ))
+                    })?,
+            );
+
+            let data_file_format = iceberg::spec::DataFileFormat::Parquet;
+
+            let deletes = if let Some(idx) = proto_task.delete_files_idx {
+                delete_files_cache
+                    .get(idx as usize)
+                    .ok_or_else(|| {
+                        ExecutionError::GeneralError(format!(
+                            "Invalid delete_files_idx: {} (pool size: {})",
+                            idx,
+                            delete_files_cache.len()
+                        ))
+                    })?
+                    .clone()
+            } else {
+                vec![]
+            };
+
+            let bound_predicate = if let Some(idx) = proto_task.residual_idx {
+                proto_scan
+                    .residual_pool
+                    .get(idx as usize)
+                    .and_then(convert_spark_expr_to_predicate)
+                    .map(
+                        |pred| -> Result<iceberg::expr::BoundPredicate, ExecutionError> {
+                            pred.bind(Arc::clone(&schema_ref), true).map_err(|e| {
+                                ExecutionError::GeneralError(format!(
+                                    "Failed to bind predicate to schema: {}",
+                                    e
+                                ))
+                            })
+                        },
+                    )
+                    .transpose()?
+            } else {
+                None
+            };
+
+            let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx {
+                let partition_type_idx = proto_task.partition_type_idx.ok_or_else(|| {
+                    ExecutionError::GeneralError(
+                        "partition_type_idx is required when partition_data_idx is present"
+                            .to_string(),
+                    )
+                })?;
+
+                let partition_data_value = partition_data_cache
+                    .get(partition_data_idx as usize)
+                    .ok_or_else(|| {
+                        ExecutionError::GeneralError(format!(
+                            "Invalid partition_data_idx: {} (cache size: {})",
+                            partition_data_idx,
+                            partition_data_cache.len()
                         ))
                     })?;
 
-                let partition_data_value: serde_json::Value = serde_json::from_str(partition_json)
-                    .map_err(|e| {
+                let partition_type = partition_type_cache
+                    .get(partition_type_idx as usize)
+                    .ok_or_else(|| {
                         ExecutionError::GeneralError(format!(
-                            "Failed to parse partition data JSON: {}",
-                            e
+                            "Invalid partition_type_idx: {} (cache size: {})",
+                            partition_type_idx,
+                            partition_type_cache.len()
                         ))
                     })?;
 
                 match iceberg::spec::Literal::try_from_json(
-                    partition_data_value,
-                    &iceberg::spec::Type::Struct(partition_type),
+                    partition_data_value.clone(),
+                    &iceberg::spec::Type::Struct(partition_type.clone()),
                 ) {
                     Ok(Some(iceberg::spec::Literal::Struct(s))) => Some(s),
                     Ok(None) => None,
@@ -2753,28 +2844,28 @@ fn parse_file_scan_tasks(
                 None
             };
 
-            let partition_spec = if let Some(partition_spec_json) =
-                proto_task.partition_spec_json.as_ref()
-            {
-                // Try to parse partition spec, but gracefully handle unknown transforms
-                // for forward compatibility (e.g., TestForwardCompatibility tests)
-                match serde_json::from_str::<iceberg::spec::PartitionSpec>(partition_spec_json) {
-                    Ok(spec) => Some(Arc::new(spec)),
-                    Err(_) => None,
-                }
-            } else {
-                None
-            };
-
-            let name_mapping = if let Some(name_mapping_json) = proto_task.name_mapping_json.as_ref()
-            {
-                match serde_json::from_str::<iceberg::spec::NameMapping>(name_mapping_json) {
-                    Ok(mapping) => Some(Arc::new(mapping)),
-                    Err(_) => None, // Name mapping is optional
-                }
-            } else {
-                None
-            };
+            let partition_spec = proto_task
+                .partition_spec_idx
+                .and_then(|idx| partition_spec_cache.get(idx as usize))
+                .and_then(|opt| opt.clone());
+
+            let name_mapping = proto_task
+                .name_mapping_idx
+                .and_then(|idx| name_mapping_cache.get(idx as usize))
+                .and_then(|opt| opt.clone());
+
+            let project_field_ids = proto_scan
+                .project_field_ids_pool
+                .get(proto_task.project_field_ids_idx as usize)
+                .ok_or_else(|| {
+                    ExecutionError::GeneralError(format!(
+                        "Invalid project_field_ids_idx: {} (pool size: {})",
+                        proto_task.project_field_ids_idx,
+                        proto_scan.project_field_ids_pool.len()
+                    ))
+                })?
+                .field_ids
+                .clone();
 
             Ok(iceberg::scan::FileScanTask {
                 data_file_path: proto_task.data_file_path.clone(),
@@ -2783,7 +2874,7 @@ fn parse_file_scan_tasks(
                 record_count: proto_task.record_count,
                 data_file_format,
                 schema: schema_ref,
-                project_field_ids: proto_task.project_field_ids.clone(),
+                project_field_ids,
                 predicate: bound_predicate,
                 deletes,
                 partition,
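To summarize the planner.rs change above: parse_file_scan_tasks now receives the whole IcebergScan, parses each pool entry exactly once into a cache, and each task resolves its metadata through a bounds-checked index lookup. Below is a simplified, std-only sketch of that parse-once / index-per-task pattern; Scan, Task, Schema, and parse_schema are stand-ins for the protobuf-generated types and the serde_json / iceberg::spec parsing used in the real code.

use std::sync::Arc;

// Stand-in types for this sketch only; the real structs are generated from
// operator.proto and the real parsing uses serde_json + iceberg::spec types.
struct Scan {
    schema_pool: Vec<String>,
    tasks: Vec<Task>,
}
struct Task {
    schema_idx: u32,
}
#[derive(Debug)]
struct Schema(String); // placeholder for a parsed schema

fn parse_schema(json: &str) -> Result<Schema, String> {
    // In planner.rs this is serde_json::from_str::<iceberg::spec::Schema>(json).
    Ok(Schema(json.to_string()))
}

fn resolve_schemas(scan: &Scan) -> Result<Vec<Arc<Schema>>, String> {
    // Parse every pool entry exactly once, up front.
    let schema_cache: Vec<Arc<Schema>> = scan
        .schema_pool
        .iter()
        .map(|json| parse_schema(json).map(Arc::new))
        .collect::<Result<_, _>>()?;

    // Each task then resolves its schema with an O(1), bounds-checked index
    // lookup plus a cheap Arc clone.
    scan.tasks
        .iter()
        .map(|t| {
            schema_cache
                .get(t.schema_idx as usize)
                .cloned()
                .ok_or_else(|| format!("Invalid schema_idx: {}", t.schema_idx))
        })
        .collect()
}

fn main() {
    let scan = Scan {
        schema_pool: vec!["schema-a".to_string()],
        tasks: vec![Task { schema_idx: 0 }, Task { schema_idx: 0 }],
    };
    let schemas = resolve_schemas(&scan).unwrap();
    // Both tasks point at the same parsed schema instead of parsing it twice.
    assert!(Arc::ptr_eq(&schemas[0], &schemas[1]));
}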

native/proto/src/proto/operator.proto

Lines changed: 29 additions & 36 deletions
@@ -122,6 +122,26 @@ message IcebergScan {
 
   // Table metadata file path for FileIO initialization
   string metadata_location = 4;
+
+  // Deduplication pools - shared data referenced by index from tasks
+  repeated string schema_pool = 5;
+  repeated string partition_type_pool = 6;
+  repeated string partition_spec_pool = 7;
+  repeated string name_mapping_pool = 8;
+  repeated ProjectFieldIdList project_field_ids_pool = 9;
+  repeated string partition_data_pool = 10;
+  repeated DeleteFileList delete_files_pool = 11;
+  repeated spark.spark_expression.Expr residual_pool = 12;
+}
+
+// Helper message for deduplicating field ID lists
+message ProjectFieldIdList {
+  repeated int32 field_ids = 1;
+}
+
+// Helper message for deduplicating delete file lists
+message DeleteFileList {
+  repeated IcebergDeleteFile delete_files = 1;
 }
 
 // Groups FileScanTasks for a single Spark partition
@@ -141,42 +161,15 @@ message IcebergFileScanTask {
   // Record count if reading entire file
   optional uint64 record_count = 4;
 
-  // File format (PARQUET, AVRO, or ORC)
-  string data_file_format = 5;
-
-  // File schema as JSON (may differ due to schema evolution)
-  string schema_json = 6;
-
-  // Field IDs to project
-  repeated int32 project_field_ids = 7;
-
-  // Delete files for MOR tables
-  repeated IcebergDeleteFile delete_files = 8;
-
-  // Residual filter after partition pruning (applied at row-group level)
-  // Example: if scan filter is "date >= '2024-01-01' AND status = 'active'"
-  // and file partition is date='2024-06-15', residual is "status = 'active'"
-  optional spark.spark_expression.Expr residual = 9;
-
-  // Partition data from manifest entry (for proper constant identification)
-  // Serialized as JSON to represent the Struct of partition values
-  optional string partition_data_json = 10;
-
-  // Partition type schema as JSON (Iceberg StructType for partition fields)
-  // Used to deserialize partition_data_json into proper Iceberg types
-  optional string partition_type_json = 12;
-
-  // Partition spec as JSON (entire PartitionSpec object)
-  // Used to determine which partition fields are identity-transformed (constants)
-  // The spec includes spec-id embedded in the JSON.
-  optional string partition_spec_json = 13;
-
-  // Name mapping from table metadata (property: schema.name-mapping.default)
-  // Used to resolve field IDs from column names when Parquet files lack field IDs
-  // or have field ID conflicts (e.g., Hive table migrations via add_files).
-  // Per Iceberg spec rule #2: "Use schema.name-mapping.default metadata to map
-  // field id to columns without field id".
-  optional string name_mapping_json = 14;
+  // Indices into IcebergScan deduplication pools
+  uint32 schema_idx = 15;
+  optional uint32 partition_type_idx = 16;
+  optional uint32 partition_spec_idx = 17;
+  optional uint32 name_mapping_idx = 18;
+  uint32 project_field_ids_idx = 19;
+  optional uint32 partition_data_idx = 20;
+  optional uint32 delete_files_idx = 21;
+  optional uint32 residual_idx = 22;
 }
 
 // Iceberg delete file for MOR tables (positional or equality deletes)

0 commit comments
