Skip to content

Commit 38c3369

Browse files
committed
Pass table master key ID to native scan
1 parent 57780bc commit 38c3369

File tree

4 files changed

+23
-2
lines changed

4 files changed

+23
-2
lines changed

native/core/src/execution/operators/iceberg_scan.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,8 @@ pub struct IcebergScanExec {
6262
file_task_groups: Vec<Vec<iceberg::scan::FileScanTask>>,
6363
/// Metrics
6464
metrics: ExecutionPlanMetricsSet,
65+
/// Optional master key ID for encrypted Iceberg table
66+
table_master_key: Option<String>,
6567
}
6668

6769
impl IcebergScanExec {
@@ -70,6 +72,7 @@ impl IcebergScanExec {
7072
schema: SchemaRef,
7173
catalog_properties: HashMap<String, String>,
7274
file_task_groups: Vec<Vec<iceberg::scan::FileScanTask>>,
75+
table_master_key: Option<String>,
7376
) -> Result<Self, ExecutionError> {
7477
let output_schema = schema;
7578
let num_partitions = file_task_groups.len();
@@ -84,6 +87,7 @@ impl IcebergScanExec {
8487
catalog_properties,
8588
file_task_groups,
8689
metrics,
90+
table_master_key,
8791
})
8892
}
8993

@@ -166,6 +170,7 @@ impl IcebergScanExec {
166170

167171
let task_stream = futures::stream::iter(tasks.into_iter().map(Ok)).boxed();
168172

173+
// TODO: pass table master key to Iceberg Rust ArrowReaderBuilder
169174
let reader = iceberg::arrow::ArrowReaderBuilder::new(file_io)
170175
.with_batch_size(batch_size)
171176
.with_data_file_concurrency_limit(context.session_config().target_partitions())

native/core/src/execution/planner.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1180,6 +1180,7 @@ impl PhysicalPlanner {
11801180
required_schema,
11811181
catalog_properties,
11821182
file_task_groups,
1183+
scan.table_master_key.clone(),
11831184
)?;
11841185

11851186
Ok((

native/proto/src/proto/operator.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ message IcebergScan {
132132
repeated string partition_data_pool = 10;
133133
repeated DeleteFileList delete_files_pool = 11;
134134
repeated spark.spark_expression.Expr residual_pool = 12;
135+
optional string table_master_key = 13;
135136
}
136137

137138
// Helper message for deduplicating field ID lists

spark/src/main/scala/org/apache/comet/iceberg/IcebergReflection.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,8 @@ object IcebergReflection extends Logging {
729729
* Mapping from column names to Iceberg field IDs (built from scanSchema)
730730
* @param catalogProperties
731731
* Catalog properties for FileIO (S3 credentials, regions, etc.)
732+
* @param tableMasterKey
733+
* Optional table master key for encrypted table
732734
*/
733735
case class CometIcebergNativeScanMetadata(
734736
table: Any,
@@ -739,7 +741,8 @@ case class CometIcebergNativeScanMetadata(
739741
tableSchema: Any,
740742
globalFieldIdMapping: Map[String, Int],
741743
catalogProperties: Map[String, String],
742-
fileFormat: String)
744+
fileFormat: String,
745+
tableMasterKey: Option[String])
743746

744747
object CometIcebergNativeScanMetadata extends Logging {
745748

@@ -780,6 +783,16 @@ object CometIcebergNativeScanMetadata extends Logging {
780783
}
781784
}
782785

786+
// tableMasterKey is optional
787+
val tableMasterKey = getTableProperties(table).flatMap { properties =>
788+
val masterKey = "encryption.key-id"
789+
if (properties.containsKey(masterKey)) {
790+
Some(properties.get(masterKey))
791+
} else {
792+
None
793+
}
794+
}
795+
783796
val globalFieldIdMapping = buildFieldIdMapping(scanSchema)
784797

785798
// File format is always PARQUET,
@@ -796,7 +809,8 @@ object CometIcebergNativeScanMetadata extends Logging {
796809
tableSchema = tableSchema,
797810
globalFieldIdMapping = globalFieldIdMapping,
798811
catalogProperties = catalogProperties,
799-
fileFormat = fileFormat)
812+
fileFormat = fileFormat,
813+
tableMasterKey = tableMasterKey)
800814
}
801815
}
802816
}

0 commit comments

Comments
 (0)