243 changes: 167 additions & 76 deletions native/core/src/execution/planner.rs
@@ -1202,6 +1202,7 @@ impl PhysicalPlanner {
);

let tasks = parse_file_scan_tasks(
scan,
&scan.file_partitions[self.partition as usize].file_scan_tasks,
)?;
let file_task_groups = vec![tasks];
@@ -2672,29 +2673,66 @@ fn convert_spark_types_to_arrow_schema(
///
/// Each task contains a residual predicate that is used for row-group level filtering
/// during Parquet scanning.
///
/// This function uses deduplication pools from the IcebergScan to avoid redundant parsing
/// of schemas, partition specs, partition types, name mappings, and other repeated data.
fn parse_file_scan_tasks(
proto_scan: &spark_operator::IcebergScan,
proto_tasks: &[spark_operator::IcebergFileScanTask],
) -> Result<Vec<iceberg::scan::FileScanTask>, ExecutionError> {
let results: Result<Vec<_>, _> = proto_tasks
// Build caches upfront: for 10K tasks with 1 schema, this parses the schema
// once instead of 10K times, eliminating redundant JSON deserialization
let schema_cache: Vec<Arc<iceberg::spec::Schema>> = proto_scan
.schema_pool
.iter()
.map(|proto_task| {
let schema: iceberg::spec::Schema = serde_json::from_str(&proto_task.schema_json)
.map_err(|e| {
ExecutionError::GeneralError(format!("Failed to parse schema JSON: {}", e))
})?;
.map(|json| {
serde_json::from_str(json).map(Arc::new).map_err(|e| {
ExecutionError::GeneralError(format!(
"Failed to parse schema JSON from pool: {}",
e
))
})
})
.collect::<Result<Vec<_>, _>>()?;

let schema_ref = Arc::new(schema);
let partition_type_cache: Vec<iceberg::spec::StructType> = proto_scan
.partition_type_pool
.iter()
.map(|json| {
serde_json::from_str(json).map_err(|e| {
ExecutionError::GeneralError(format!(
"Failed to parse partition type JSON from pool: {}",
e
))
})
})
.collect::<Result<Vec<_>, _>>()?;

// CometScanRule validates format before serialization
debug_assert_eq!(
proto_task.data_file_format.as_str(),
"PARQUET",
"Only PARQUET format is supported. This indicates a bug in CometScanRule validation."
);
let data_file_format = iceberg::spec::DataFileFormat::Parquet;
let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_scan
.partition_spec_pool
.iter()
.map(|json| {
serde_json::from_str::<iceberg::spec::PartitionSpec>(json)
.ok()
.map(Arc::new)
})
.collect();

let deletes: Vec<iceberg::scan::FileScanTaskDeleteFile> = proto_task
.delete_files
let name_mapping_cache: Vec<Option<Arc<iceberg::spec::NameMapping>>> = proto_scan
.name_mapping_pool
.iter()
.map(|json| {
serde_json::from_str::<iceberg::spec::NameMapping>(json)
.ok()
.map(Arc::new)
})
.collect();

let delete_files_cache: Vec<Vec<iceberg::scan::FileScanTaskDeleteFile>> = proto_scan
.delete_files_pool
.iter()
.map(|list| {
list.delete_files
.iter()
.map(|del| {
let file_type = match del.content_type.as_str() {
@@ -2719,53 +2757,106 @@
},
})
})
.collect::<Result<Vec<_>, ExecutionError>>()?;
.collect::<Result<Vec<_>, ExecutionError>>()
})
.collect::<Result<Vec<_>, _>>()?;

// Residuals are serialized with binding=false (name-based references).
// Convert to Iceberg predicate and bind to this file's schema for row-group filtering.
let bound_predicate = proto_task
.residual
.as_ref()
.and_then(|residual_expr| {
convert_spark_expr_to_predicate(residual_expr)
})
.map(
|pred| -> Result<iceberg::expr::BoundPredicate, ExecutionError> {
let bound = pred.bind(Arc::clone(&schema_ref), true).map_err(|e| {
ExecutionError::GeneralError(format!(
"Failed to bind predicate to schema: {}",
e
))
})?;

Ok(bound)
},
)
.transpose()?;

let partition = if let (Some(partition_json), Some(partition_type_json)) = (
proto_task.partition_data_json.as_ref(),
proto_task.partition_type_json.as_ref(),
) {
let partition_type: iceberg::spec::StructType =
serde_json::from_str(partition_type_json).map_err(|e| {
let partition_data_cache: Vec<serde_json::Value> = proto_scan
.partition_data_pool
.iter()
.map(|json| {
serde_json::from_str(json).map_err(|e| {
ExecutionError::GeneralError(format!(
"Failed to parse partition data JSON from pool: {}",
e
))
})
})
.collect::<Result<Vec<_>, _>>()?;

let results: Result<Vec<_>, _> = proto_tasks
.iter()
.map(|proto_task| {
let schema_ref = Arc::clone(
schema_cache
.get(proto_task.schema_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Failed to parse partition type JSON: {}",
e
"Invalid schema_idx: {} (pool size: {})",
proto_task.schema_idx,
schema_cache.len()
))
})?,
);

let data_file_format = iceberg::spec::DataFileFormat::Parquet;

let deletes = if let Some(idx) = proto_task.delete_files_idx {
delete_files_cache
.get(idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Invalid delete_files_idx: {} (pool size: {})",
idx,
delete_files_cache.len()
))
})?
.clone()
} else {
vec![]
};

let bound_predicate = if let Some(idx) = proto_task.residual_idx {
proto_scan
.residual_pool
.get(idx as usize)
.and_then(convert_spark_expr_to_predicate)
.map(
|pred| -> Result<iceberg::expr::BoundPredicate, ExecutionError> {
pred.bind(Arc::clone(&schema_ref), true).map_err(|e| {
ExecutionError::GeneralError(format!(
"Failed to bind predicate to schema: {}",
e
))
})
},
)
.transpose()?
} else {
None
};

let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx {
let partition_type_idx = proto_task.partition_type_idx.ok_or_else(|| {
ExecutionError::GeneralError(
"partition_type_idx is required when partition_data_idx is present"
.to_string(),
)
})?;

let partition_data_value = partition_data_cache
.get(partition_data_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Invalid partition_data_idx: {} (cache size: {})",
partition_data_idx,
partition_data_cache.len()
))
})?;

let partition_data_value: serde_json::Value = serde_json::from_str(partition_json)
.map_err(|e| {
let partition_type = partition_type_cache
.get(partition_type_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Failed to parse partition data JSON: {}",
e
"Invalid partition_type_idx: {} (cache size: {})",
partition_type_idx,
partition_type_cache.len()
))
})?;

match iceberg::spec::Literal::try_from_json(
partition_data_value,
&iceberg::spec::Type::Struct(partition_type),
partition_data_value.clone(),
&iceberg::spec::Type::Struct(partition_type.clone()),
) {
Ok(Some(iceberg::spec::Literal::Struct(s))) => Some(s),
Ok(None) => None,
@@ -2786,28 +2877,28 @@ fn parse_file_scan_tasks(
None
};

let partition_spec = if let Some(partition_spec_json) =
proto_task.partition_spec_json.as_ref()
{
// Try to parse partition spec, but gracefully handle unknown transforms
// for forward compatibility (e.g., TestForwardCompatibility tests)
match serde_json::from_str::<iceberg::spec::PartitionSpec>(partition_spec_json) {
Ok(spec) => Some(Arc::new(spec)),
Err(_) => None,
}
} else {
None
};

let name_mapping = if let Some(name_mapping_json) = proto_task.name_mapping_json.as_ref()
{
match serde_json::from_str::<iceberg::spec::NameMapping>(name_mapping_json) {
Ok(mapping) => Some(Arc::new(mapping)),
Err(_) => None, // Name mapping is optional
}
} else {
None
};
let partition_spec = proto_task
.partition_spec_idx
.and_then(|idx| partition_spec_cache.get(idx as usize))
.and_then(|opt| opt.clone());

let name_mapping = proto_task
.name_mapping_idx
.and_then(|idx| name_mapping_cache.get(idx as usize))
.and_then(|opt| opt.clone());

let project_field_ids = proto_scan
.project_field_ids_pool
.get(proto_task.project_field_ids_idx as usize)
.ok_or_else(|| {
ExecutionError::GeneralError(format!(
"Invalid project_field_ids_idx: {} (pool size: {})",
proto_task.project_field_ids_idx,
proto_scan.project_field_ids_pool.len()
))
})?
.field_ids
.clone();

Ok(iceberg::scan::FileScanTask {
data_file_path: proto_task.data_file_path.clone(),
Expand All @@ -2816,7 +2907,7 @@ fn parse_file_scan_tasks(
record_count: proto_task.record_count,
data_file_format,
schema: schema_ref,
project_field_ids: proto_task.project_field_ids.clone(),
project_field_ids,
predicate: bound_predicate,
deletes,
partition,
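For reference, the reading side of the pool scheme boils down to a cache-then-lookup pattern: each pool entry is deserialized exactly once into a `Vec`, and every task then resolves its `*_idx` field with a bounds-checked `Vec::get`. A minimal sketch of that pattern, with hypothetical `Scan`/`Task` structs and `serde_json::Value` standing in for the generated protobuf and Iceberg types:

```rust
use std::sync::Arc;

// Hypothetical stand-ins for the generated protobuf types.
struct Scan {
    schema_pool: Vec<String>,
}
struct Task {
    schema_idx: u32,
}

/// Parse each pooled schema JSON once, then resolve every task by index with a
/// bounds check, the same cache-then-lookup flow used for all eight pools.
fn resolve_schemas(scan: &Scan, tasks: &[Task]) -> Result<Vec<Arc<serde_json::Value>>, String> {
    // One parse per distinct schema, not one per task.
    let cache: Vec<Arc<serde_json::Value>> = scan
        .schema_pool
        .iter()
        .map(|json| {
            serde_json::from_str(json)
                .map(Arc::new)
                .map_err(|e| format!("Failed to parse schema JSON from pool: {}", e))
        })
        .collect::<Result<_, _>>()?;

    // Each task carries only an index into the cache built above.
    tasks
        .iter()
        .map(|task| {
            cache
                .get(task.schema_idx as usize)
                .cloned()
                .ok_or_else(|| {
                    format!(
                        "Invalid schema_idx: {} (pool size: {})",
                        task.schema_idx,
                        cache.len()
                    )
                })
        })
        .collect()
}
```

Sharing the parsed value behind an `Arc` means tasks that reuse a schema clone a pointer instead of re-running JSON deserialization, which is where the 10K-tasks-one-schema win described in the code comment comes from.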
65 changes: 29 additions & 36 deletions native/proto/src/proto/operator.proto
@@ -122,6 +122,26 @@ message IcebergScan {

// Table metadata file path for FileIO initialization
string metadata_location = 4;

// Deduplication pools - shared data referenced by index from tasks
repeated string schema_pool = 5;
repeated string partition_type_pool = 6;
repeated string partition_spec_pool = 7;
repeated string name_mapping_pool = 8;
repeated ProjectFieldIdList project_field_ids_pool = 9;
repeated string partition_data_pool = 10;
repeated DeleteFileList delete_files_pool = 11;
repeated spark.spark_expression.Expr residual_pool = 12;
}

// Helper message for deduplicating field ID lists
message ProjectFieldIdList {
repeated int32 field_ids = 1;
}

// Helper message for deduplicating delete file lists
message DeleteFileList {
repeated IcebergDeleteFile delete_files = 1;
}

// Groups FileScanTasks for a single Spark partition
@@ -141,42 +161,15 @@ message IcebergFileScanTask {
// Record count if reading entire file
optional uint64 record_count = 4;

// File format (PARQUET, AVRO, or ORC)
string data_file_format = 5;

// File schema as JSON (may differ due to schema evolution)
string schema_json = 6;

// Field IDs to project
repeated int32 project_field_ids = 7;

// Delete files for MOR tables
repeated IcebergDeleteFile delete_files = 8;

// Residual filter after partition pruning (applied at row-group level)
// Example: if scan filter is "date >= '2024-01-01' AND status = 'active'"
// and file partition is date='2024-06-15', residual is "status = 'active'"
optional spark.spark_expression.Expr residual = 9;

// Partition data from manifest entry (for proper constant identification)
// Serialized as JSON to represent the Struct of partition values
optional string partition_data_json = 10;

// Partition type schema as JSON (Iceberg StructType for partition fields)
// Used to deserialize partition_data_json into proper Iceberg types
optional string partition_type_json = 12;

// Partition spec as JSON (entire PartitionSpec object)
// Used to determine which partition fields are identity-transformed (constants)
// The spec includes spec-id embedded in the JSON.
optional string partition_spec_json = 13;

// Name mapping from table metadata (property: schema.name-mapping.default)
// Used to resolve field IDs from column names when Parquet files lack field IDs
// or have field ID conflicts (e.g., Hive table migrations via add_files).
// Per Iceberg spec rule #2: "Use schema.name-mapping.default metadata to map
// field id to columns without field id".
optional string name_mapping_json = 14;
// Indices into IcebergScan deduplication pools
uint32 schema_idx = 15;
optional uint32 partition_type_idx = 16;
optional uint32 partition_spec_idx = 17;
optional uint32 name_mapping_idx = 18;
uint32 project_field_ids_idx = 19;
optional uint32 partition_data_idx = 20;
optional uint32 delete_files_idx = 21;
optional uint32 residual_idx = 22;
}

// Iceberg delete file for MOR tables (positional or equality deletes)
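The pools above only pay off if the producer interns repeated values before serialization. That half lives in CometScanRule on the JVM side and is not shown in this diff; purely as an illustration of the encoding (hypothetical `PoolBuilder` type and `intern` method, not Comet's actual serializer), the dedup step looks like:

```rust
use std::collections::HashMap;

/// Hypothetical interner, not Comet's actual JVM-side serializer: repeated
/// JSON strings go into a pool once and tasks carry only the pool index.
struct PoolBuilder {
    pool: Vec<String>,
    index: HashMap<String, u32>,
}

impl PoolBuilder {
    fn new() -> Self {
        Self {
            pool: Vec::new(),
            index: HashMap::new(),
        }
    }

    /// Returns the pool index for `value`, appending it only on first sight.
    fn intern(&mut self, value: &str) -> u32 {
        if let Some(&idx) = self.index.get(value) {
            return idx;
        }
        let idx = self.pool.len() as u32;
        self.pool.push(value.to_string());
        self.index.insert(value.to_string(), idx);
        idx
    }
}

fn main() {
    // 10,000 tasks sharing one schema: a single pool entry plus 10,000
    // four-byte indices, instead of 10,000 copies of the schema JSON.
    let mut schemas = PoolBuilder::new();
    let schema_json = r#"{"type":"struct","fields":[]}"#;
    let indices: Vec<u32> = (0..10_000).map(|_| schemas.intern(schema_json)).collect();
    assert_eq!(schemas.pool.len(), 1);
    assert_eq!(indices, vec![0; 10_000]);
}
```

The optional fields keep their old contract: a task with no delete files simply omits `delete_files_idx` rather than pointing at an empty pool entry, and the same goes for the other `optional uint32` indices.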