Skip to content

Commit 077005c

Browse files
authored
perf: [iceberg] Use protobuf instead of JSON to serialize Iceberg partition values (apache#3247)
* perf: Use protobuf instead of JSON to serialize Iceberg partition values
1 parent 64ebfcc commit 077005c

File tree

4 files changed

+326
-251
lines changed

4 files changed

+326
-251
lines changed

native/core/src/execution/planner.rs

Lines changed: 108 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -2629,6 +2629,103 @@ fn convert_spark_types_to_arrow_schema(
26292629
arrow_schema
26302630
}
26312631

2632+
/// Converts a protobuf PartitionValue to an iceberg Literal.
2633+
///
2634+
fn partition_value_to_literal(
2635+
proto_value: &spark_operator::PartitionValue,
2636+
) -> Result<Option<iceberg::spec::Literal>, ExecutionError> {
2637+
use spark_operator::partition_value::Value;
2638+
2639+
if proto_value.is_null {
2640+
return Ok(None);
2641+
}
2642+
2643+
let literal = match &proto_value.value {
2644+
Some(Value::IntVal(v)) => iceberg::spec::Literal::int(*v),
2645+
Some(Value::LongVal(v)) => iceberg::spec::Literal::long(*v),
2646+
Some(Value::DateVal(v)) => {
2647+
// Convert i64 to i32 for date (days since epoch)
2648+
let days = (*v)
2649+
.try_into()
2650+
.map_err(|_| GeneralError(format!("Date value out of range: {}", v)))?;
2651+
iceberg::spec::Literal::date(days)
2652+
}
2653+
Some(Value::TimestampVal(v)) => iceberg::spec::Literal::timestamp(*v),
2654+
Some(Value::TimestampTzVal(v)) => iceberg::spec::Literal::timestamptz(*v),
2655+
Some(Value::StringVal(s)) => iceberg::spec::Literal::string(s.clone()),
2656+
Some(Value::DoubleVal(v)) => iceberg::spec::Literal::double(*v),
2657+
Some(Value::FloatVal(v)) => iceberg::spec::Literal::float(*v),
2658+
Some(Value::DecimalVal(bytes)) => {
2659+
// Deserialize unscaled BigInteger bytes to i128
2660+
// BigInteger is serialized as signed big-endian bytes
2661+
if bytes.len() > 16 {
2662+
return Err(GeneralError(format!(
2663+
"Decimal bytes too large: {} bytes (max 16 for i128)",
2664+
bytes.len()
2665+
)));
2666+
}
2667+
2668+
// Convert big-endian bytes to i128
2669+
let mut buf = [0u8; 16];
2670+
let offset = 16 - bytes.len();
2671+
buf[offset..].copy_from_slice(bytes);
2672+
2673+
// Handle sign extension for negative numbers
2674+
let value = if !bytes.is_empty() && (bytes[0] & 0x80) != 0 {
2675+
// Negative number - sign extend
2676+
for byte in buf.iter_mut().take(offset) {
2677+
*byte = 0xFF;
2678+
}
2679+
i128::from_be_bytes(buf)
2680+
} else {
2681+
// Positive number
2682+
i128::from_be_bytes(buf)
2683+
};
2684+
2685+
iceberg::spec::Literal::decimal(value)
2686+
}
2687+
Some(Value::BoolVal(v)) => iceberg::spec::Literal::bool(*v),
2688+
Some(Value::UuidVal(bytes)) => {
2689+
// Deserialize UUID from 16 bytes
2690+
if bytes.len() != 16 {
2691+
return Err(GeneralError(format!(
2692+
"Invalid UUID bytes length: {} (expected 16)",
2693+
bytes.len()
2694+
)));
2695+
}
2696+
let uuid = uuid::Uuid::from_slice(bytes)
2697+
.map_err(|e| GeneralError(format!("Failed to parse UUID: {}", e)))?;
2698+
iceberg::spec::Literal::uuid(uuid)
2699+
}
2700+
Some(Value::FixedVal(bytes)) => iceberg::spec::Literal::fixed(bytes.to_vec()),
2701+
Some(Value::BinaryVal(bytes)) => iceberg::spec::Literal::binary(bytes.to_vec()),
2702+
None => {
2703+
return Err(GeneralError(
2704+
"PartitionValue has no value set and is_null is false".to_string(),
2705+
));
2706+
}
2707+
};
2708+
2709+
Ok(Some(literal))
2710+
}
2711+
2712+
/// Converts a protobuf PartitionData to an iceberg Struct.
2713+
///
2714+
/// Uses the existing Struct::from_iter() API from iceberg-rust to construct the struct
2715+
/// from the list of partition values.
2716+
/// This can potentially be upstreamed to iceberg_rust
2717+
fn partition_data_to_struct(
2718+
proto_partition: &spark_operator::PartitionData,
2719+
) -> Result<iceberg::spec::Struct, ExecutionError> {
2720+
let literals: Vec<Option<iceberg::spec::Literal>> = proto_partition
2721+
.values
2722+
.iter()
2723+
.map(partition_value_to_literal)
2724+
.collect::<Result<Vec<_>, _>>()?;
2725+
2726+
Ok(iceberg::spec::Struct::from_iter(literals))
2727+
}
2728+
26322729
/// Converts protobuf FileScanTasks from Scala into iceberg-rust FileScanTask objects.
26332730
///
26342731
/// Each task contains a residual predicate that is used for row-group level filtering
@@ -2655,19 +2752,6 @@ fn parse_file_scan_tasks(
26552752
})
26562753
.collect::<Result<Vec<_>, _>>()?;
26572754

2658-
let partition_type_cache: Vec<iceberg::spec::StructType> = proto_scan
2659-
.partition_type_pool
2660-
.iter()
2661-
.map(|json| {
2662-
serde_json::from_str(json).map_err(|e| {
2663-
ExecutionError::GeneralError(format!(
2664-
"Failed to parse partition type JSON from pool: {}",
2665-
e
2666-
))
2667-
})
2668-
})
2669-
.collect::<Result<Vec<_>, _>>()?;
2670-
26712755
let partition_spec_cache: Vec<Option<Arc<iceberg::spec::PartitionSpec>>> = proto_scan
26722756
.partition_spec_pool
26732757
.iter()
@@ -2721,19 +2805,7 @@ fn parse_file_scan_tasks(
27212805
})
27222806
.collect::<Result<Vec<_>, _>>()?;
27232807

2724-
let partition_data_cache: Vec<serde_json::Value> = proto_scan
2725-
.partition_data_pool
2726-
.iter()
2727-
.map(|json| {
2728-
serde_json::from_str(json).map_err(|e| {
2729-
ExecutionError::GeneralError(format!(
2730-
"Failed to parse partition data JSON from pool: {}",
2731-
e
2732-
))
2733-
})
2734-
})
2735-
.collect::<Result<Vec<_>, _>>()?;
2736-
2808+
// Partition data pool is in protobuf messages
27372809
let results: Result<Vec<_>, _> = proto_tasks
27382810
.iter()
27392811
.map(|proto_task| {
@@ -2787,48 +2859,24 @@ fn parse_file_scan_tasks(
27872859
};
27882860

27892861
let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx {
2790-
let partition_type_idx = proto_task.partition_type_idx.ok_or_else(|| {
2791-
ExecutionError::GeneralError(
2792-
"partition_type_idx is required when partition_data_idx is present"
2793-
.to_string(),
2794-
)
2795-
})?;
2796-
2797-
let partition_data_value = partition_data_cache
2862+
// Get partition data from protobuf pool
2863+
let partition_data_proto = proto_scan
2864+
.partition_data_pool
27982865
.get(partition_data_idx as usize)
27992866
.ok_or_else(|| {
28002867
ExecutionError::GeneralError(format!(
2801-
"Invalid partition_data_idx: {} (cache size: {})",
2868+
"Invalid partition_data_idx: {} (pool size: {})",
28022869
partition_data_idx,
2803-
partition_data_cache.len()
2870+
proto_scan.partition_data_pool.len()
28042871
))
28052872
})?;
28062873

2807-
let partition_type = partition_type_cache
2808-
.get(partition_type_idx as usize)
2809-
.ok_or_else(|| {
2810-
ExecutionError::GeneralError(format!(
2811-
"Invalid partition_type_idx: {} (cache size: {})",
2812-
partition_type_idx,
2813-
partition_type_cache.len()
2814-
))
2815-
})?;
2816-
2817-
match iceberg::spec::Literal::try_from_json(
2818-
partition_data_value.clone(),
2819-
&iceberg::spec::Type::Struct(partition_type.clone()),
2820-
) {
2821-
Ok(Some(iceberg::spec::Literal::Struct(s))) => Some(s),
2822-
Ok(None) => None,
2823-
Ok(other) => {
2824-
return Err(GeneralError(format!(
2825-
"Expected struct literal for partition data, got: {:?}",
2826-
other
2827-
)))
2828-
}
2874+
// Convert protobuf PartitionData to iceberg Struct
2875+
match partition_data_to_struct(partition_data_proto) {
2876+
Ok(s) => Some(s),
28292877
Err(e) => {
2830-
return Err(GeneralError(format!(
2831-
"Failed to deserialize partition data from JSON: {}",
2878+
return Err(ExecutionError::GeneralError(format!(
2879+
"Failed to deserialize partition data from protobuf: {}",
28322880
e
28332881
)))
28342882
}

native/proto/src/proto/operator.proto

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,32 @@ message CsvOptions {
130130
bool truncated_rows = 8;
131131
}
132132

133+
// A single Iceberg partition value: either null (is_null) or exactly one typed value from the oneof; decoded natively by partition_value_to_literal in planner.rs
134+
message PartitionValue {
135+
int32 field_id = 1; // not read by partition_value_to_literal
136+
oneof value {
137+
int32 int_val = 2;
138+
int64 long_val = 3;
139+
int64 date_val = 4; // days since epoch; the native decoder rejects values that do not fit in i32
140+
int64 timestamp_val = 5; // microseconds since epoch
141+
int64 timestamp_tz_val = 6; // microseconds since epoch; decoded as an Iceberg timestamptz literal
142+
string string_val = 7;
143+
double double_val = 8;
144+
float float_val = 9;
145+
bytes decimal_val = 10; // unscaled BigInteger bytes: signed big-endian, at most 16 bytes (must fit i128)
146+
bool bool_val = 11;
147+
bytes uuid_val = 12; // must be exactly 16 bytes; other lengths are rejected by the native decoder
148+
bytes fixed_val = 13;
149+
bytes binary_val = 14;
150+
}
151+
bool is_null = 15; // true: value is null and the oneof is ignored; false: the oneof must be set or decoding fails
152+
}
153+
154+
// Collection of partition values for a single partition; one entry of IcebergScan.partition_data_pool
155+
message PartitionData {
156+
repeated PartitionValue values = 1; // converted in order into an iceberg Struct by partition_data_to_struct
157+
}
158+
133159
message IcebergScan {
134160
// Schema to read
135161
repeated SparkStructField required_schema = 1;
@@ -149,7 +175,7 @@ message IcebergScan {
149175
repeated string partition_spec_pool = 7;
150176
repeated string name_mapping_pool = 8;
151177
repeated ProjectFieldIdList project_field_ids_pool = 9;
152-
repeated string partition_data_pool = 10;
178+
repeated PartitionData partition_data_pool = 10;
153179
repeated DeleteFileList delete_files_pool = 11;
154180
repeated spark.spark_expression.Expr residual_pool = 12;
155181
}

0 commit comments

Comments
 (0)