diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 67b2523be3..72e0a662e6 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -1202,6 +1202,7 @@ impl PhysicalPlanner { ); let tasks = parse_file_scan_tasks( + scan, &scan.file_partitions[self.partition as usize].file_scan_tasks, )?; let file_task_groups = vec![tasks]; @@ -2672,29 +2673,66 @@ fn convert_spark_types_to_arrow_schema( /// /// Each task contains a residual predicate that is used for row-group level filtering /// during Parquet scanning. +/// +/// This function uses deduplication pools from the IcebergScan to avoid redundant parsing +/// of schemas, partition specs, partition types, name mappings, and other repeated data. fn parse_file_scan_tasks( + proto_scan: &spark_operator::IcebergScan, proto_tasks: &[spark_operator::IcebergFileScanTask], ) -> Result, ExecutionError> { - let results: Result, _> = proto_tasks + // Build caches upfront: for 10K tasks with 1 schema, this parses the schema + // once instead of 10K times, eliminating redundant JSON deserialization + let schema_cache: Vec> = proto_scan + .schema_pool .iter() - .map(|proto_task| { - let schema: iceberg::spec::Schema = serde_json::from_str(&proto_task.schema_json) - .map_err(|e| { - ExecutionError::GeneralError(format!("Failed to parse schema JSON: {}", e)) - })?; + .map(|json| { + serde_json::from_str(json).map(Arc::new).map_err(|e| { + ExecutionError::GeneralError(format!( + "Failed to parse schema JSON from pool: {}", + e + )) + }) + }) + .collect::, _>>()?; - let schema_ref = Arc::new(schema); + let partition_type_cache: Vec = proto_scan + .partition_type_pool + .iter() + .map(|json| { + serde_json::from_str(json).map_err(|e| { + ExecutionError::GeneralError(format!( + "Failed to parse partition type JSON from pool: {}", + e + )) + }) + }) + .collect::, _>>()?; - // CometScanRule validates format before serialization - debug_assert_eq!( - proto_task.data_file_format.as_str(), - "PARQUET", - "Only PARQUET format is supported. This indicates a bug in CometScanRule validation." - ); - let data_file_format = iceberg::spec::DataFileFormat::Parquet; + let partition_spec_cache: Vec>> = proto_scan + .partition_spec_pool + .iter() + .map(|json| { + serde_json::from_str::(json) + .ok() + .map(Arc::new) + }) + .collect(); - let deletes: Vec = proto_task - .delete_files + let name_mapping_cache: Vec>> = proto_scan + .name_mapping_pool + .iter() + .map(|json| { + serde_json::from_str::(json) + .ok() + .map(Arc::new) + }) + .collect(); + + let delete_files_cache: Vec> = proto_scan + .delete_files_pool + .iter() + .map(|list| { + list.delete_files .iter() .map(|del| { let file_type = match del.content_type.as_str() { @@ -2719,53 +2757,106 @@ fn parse_file_scan_tasks( }, }) }) - .collect::, ExecutionError>>()?; + .collect::, ExecutionError>>() + }) + .collect::, _>>()?; - // Residuals are serialized with binding=false (name-based references). - // Convert to Iceberg predicate and bind to this file's schema for row-group filtering. 
- let bound_predicate = proto_task - .residual - .as_ref() - .and_then(|residual_expr| { - convert_spark_expr_to_predicate(residual_expr) - }) - .map( - |pred| -> Result { - let bound = pred.bind(Arc::clone(&schema_ref), true).map_err(|e| { - ExecutionError::GeneralError(format!( - "Failed to bind predicate to schema: {}", - e - )) - })?; - - Ok(bound) - }, - ) - .transpose()?; - - let partition = if let (Some(partition_json), Some(partition_type_json)) = ( - proto_task.partition_data_json.as_ref(), - proto_task.partition_type_json.as_ref(), - ) { - let partition_type: iceberg::spec::StructType = - serde_json::from_str(partition_type_json).map_err(|e| { + let partition_data_cache: Vec = proto_scan + .partition_data_pool + .iter() + .map(|json| { + serde_json::from_str(json).map_err(|e| { + ExecutionError::GeneralError(format!( + "Failed to parse partition data JSON from pool: {}", + e + )) + }) + }) + .collect::, _>>()?; + + let results: Result, _> = proto_tasks + .iter() + .map(|proto_task| { + let schema_ref = Arc::clone( + schema_cache + .get(proto_task.schema_idx as usize) + .ok_or_else(|| { ExecutionError::GeneralError(format!( - "Failed to parse partition type JSON: {}", - e + "Invalid schema_idx: {} (pool size: {})", + proto_task.schema_idx, + schema_cache.len() + )) + })?, + ); + + let data_file_format = iceberg::spec::DataFileFormat::Parquet; + + let deletes = if let Some(idx) = proto_task.delete_files_idx { + delete_files_cache + .get(idx as usize) + .ok_or_else(|| { + ExecutionError::GeneralError(format!( + "Invalid delete_files_idx: {} (pool size: {})", + idx, + delete_files_cache.len() + )) + })? + .clone() + } else { + vec![] + }; + + let bound_predicate = if let Some(idx) = proto_task.residual_idx { + proto_scan + .residual_pool + .get(idx as usize) + .and_then(convert_spark_expr_to_predicate) + .map( + |pred| -> Result { + pred.bind(Arc::clone(&schema_ref), true).map_err(|e| { + ExecutionError::GeneralError(format!( + "Failed to bind predicate to schema: {}", + e + )) + }) + }, + ) + .transpose()? 
+ } else { + None + }; + + let partition = if let Some(partition_data_idx) = proto_task.partition_data_idx { + let partition_type_idx = proto_task.partition_type_idx.ok_or_else(|| { + ExecutionError::GeneralError( + "partition_type_idx is required when partition_data_idx is present" + .to_string(), + ) + })?; + + let partition_data_value = partition_data_cache + .get(partition_data_idx as usize) + .ok_or_else(|| { + ExecutionError::GeneralError(format!( + "Invalid partition_data_idx: {} (cache size: {})", + partition_data_idx, + partition_data_cache.len() )) })?; - let partition_data_value: serde_json::Value = serde_json::from_str(partition_json) - .map_err(|e| { + let partition_type = partition_type_cache + .get(partition_type_idx as usize) + .ok_or_else(|| { ExecutionError::GeneralError(format!( - "Failed to parse partition data JSON: {}", - e + "Invalid partition_type_idx: {} (cache size: {})", + partition_type_idx, + partition_type_cache.len() )) })?; match iceberg::spec::Literal::try_from_json( - partition_data_value, - &iceberg::spec::Type::Struct(partition_type), + partition_data_value.clone(), + &iceberg::spec::Type::Struct(partition_type.clone()), ) { Ok(Some(iceberg::spec::Literal::Struct(s))) => Some(s), Ok(None) => None, @@ -2786,28 +2877,28 @@ fn parse_file_scan_tasks( None }; - let partition_spec = if let Some(partition_spec_json) = - proto_task.partition_spec_json.as_ref() - { - // Try to parse partition spec, but gracefully handle unknown transforms - // for forward compatibility (e.g., TestForwardCompatibility tests) - match serde_json::from_str::(partition_spec_json) { - Ok(spec) => Some(Arc::new(spec)), - Err(_) => None, - } - } else { - None - }; - - let name_mapping = if let Some(name_mapping_json) = proto_task.name_mapping_json.as_ref() - { - match serde_json::from_str::(name_mapping_json) { - Ok(mapping) => Some(Arc::new(mapping)), - Err(_) => None, // Name mapping is optional - } - } else { - None - }; + let partition_spec = proto_task + .partition_spec_idx + .and_then(|idx| partition_spec_cache.get(idx as usize)) + .and_then(|opt| opt.clone()); + + let name_mapping = proto_task + .name_mapping_idx + .and_then(|idx| name_mapping_cache.get(idx as usize)) + .and_then(|opt| opt.clone()); + + let project_field_ids = proto_scan + .project_field_ids_pool + .get(proto_task.project_field_ids_idx as usize) + .ok_or_else(|| { + ExecutionError::GeneralError(format!( + "Invalid project_field_ids_idx: {} (pool size: {})", + proto_task.project_field_ids_idx, + proto_scan.project_field_ids_pool.len() + )) + })? 
+ .field_ids + .clone(); Ok(iceberg::scan::FileScanTask { data_file_path: proto_task.data_file_path.clone(), @@ -2816,7 +2907,7 @@ fn parse_file_scan_tasks( record_count: proto_task.record_count, data_file_format, schema: schema_ref, - project_field_ids: proto_task.project_field_ids.clone(), + project_field_ids, predicate: bound_predicate, deletes, partition, diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index f30c503f48..015b5d96b6 100644 --- a/native/proto/src/proto/operator.proto +++ b/native/proto/src/proto/operator.proto @@ -122,6 +122,26 @@ message IcebergScan { // Table metadata file path for FileIO initialization string metadata_location = 4; + + // Deduplication pools - shared data referenced by index from tasks + repeated string schema_pool = 5; + repeated string partition_type_pool = 6; + repeated string partition_spec_pool = 7; + repeated string name_mapping_pool = 8; + repeated ProjectFieldIdList project_field_ids_pool = 9; + repeated string partition_data_pool = 10; + repeated DeleteFileList delete_files_pool = 11; + repeated spark.spark_expression.Expr residual_pool = 12; +} + +// Helper message for deduplicating field ID lists +message ProjectFieldIdList { + repeated int32 field_ids = 1; +} + +// Helper message for deduplicating delete file lists +message DeleteFileList { + repeated IcebergDeleteFile delete_files = 1; } // Groups FileScanTasks for a single Spark partition @@ -141,42 +161,15 @@ message IcebergFileScanTask { // Record count if reading entire file optional uint64 record_count = 4; - // File format (PARQUET, AVRO, or ORC) - string data_file_format = 5; - - // File schema as JSON (may differ due to schema evolution) - string schema_json = 6; - - // Field IDs to project - repeated int32 project_field_ids = 7; - - // Delete files for MOR tables - repeated IcebergDeleteFile delete_files = 8; - - // Residual filter after partition pruning (applied at row-group level) - // Example: if scan filter is "date >= '2024-01-01' AND status = 'active'" - // and file partition is date='2024-06-15', residual is "status = 'active'" - optional spark.spark_expression.Expr residual = 9; - - // Partition data from manifest entry (for proper constant identification) - // Serialized as JSON to represent the Struct of partition values - optional string partition_data_json = 10; - - // Partition type schema as JSON (Iceberg StructType for partition fields) - // Used to deserialize partition_data_json into proper Iceberg types - optional string partition_type_json = 12; - - // Partition spec as JSON (entire PartitionSpec object) - // Used to determine which partition fields are identity-transformed (constants) - // The spec includes spec-id embedded in the JSON. - optional string partition_spec_json = 13; - - // Name mapping from table metadata (property: schema.name-mapping.default) - // Used to resolve field IDs from column names when Parquet files lack field IDs - // or have field ID conflicts (e.g., Hive table migrations via add_files). - // Per Iceberg spec rule #2: "Use schema.name-mapping.default metadata to map - // field id to columns without field id". 
- optional string name_mapping_json = 14; + // Indices into IcebergScan deduplication pools + uint32 schema_idx = 15; + optional uint32 partition_type_idx = 16; + optional uint32 partition_spec_idx = 17; + optional uint32 name_mapping_idx = 18; + uint32 project_field_ids_idx = 19; + optional uint32 partition_data_idx = 20; + optional uint32 delete_files_idx = 21; + optional uint32 residual_idx = 22; } // Iceberg delete file for MOR tables (positional or equality deletes) diff --git a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala index db94f53846..dc7df531f6 100644 --- a/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala +++ b/spark/src/main/scala/org/apache/comet/serde/operator/CometIcebergNativeScan.scala @@ -19,6 +19,7 @@ package org.apache.comet.serde.operator +import scala.collection.mutable import scala.jdk.CollectionConverters._ import org.json4s._ @@ -33,6 +34,7 @@ import org.apache.spark.sql.types._ import org.apache.comet.ConfigEntry import org.apache.comet.iceberg.IcebergReflection import org.apache.comet.serde.{CometOperatorSerde, OperatorOuterClass} +import org.apache.comet.serde.ExprOuterClass.Expr import org.apache.comet.serde.OperatorOuterClass.{Operator, SparkStructField} import org.apache.comet.serde.QueryPlanSerde.{exprToProto, serializeDataType} @@ -165,17 +167,12 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit } /** - * Serializes delete files from an Iceberg FileScanTask. - * - * Extracts delete files (position deletes and equality deletes) from a FileScanTask and adds - * them to the task builder. Delete files are used for Iceberg's merge-on-read approach where - * updates and deletes are stored separately from data files. + * Extracts delete files from an Iceberg FileScanTask as a list (for deduplication). 
*/ - private def serializeDeleteFiles( + private def extractDeleteFilesList( task: Any, contentFileClass: Class[_], - fileScanTaskClass: Class[_], - taskBuilder: OperatorOuterClass.IcebergFileScanTask.Builder): Unit = { + fileScanTaskClass: Class[_]): Seq[OperatorOuterClass.IcebergDeleteFile] = { try { // scalastyle:off classforname val deleteFileClass = Class.forName(IcebergReflection.ClassNames.DELETE_FILE) @@ -183,11 +180,11 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val deletes = IcebergReflection.getDeleteFilesFromTask(task, fileScanTaskClass) - deletes.asScala.foreach { deleteFile => + deletes.asScala.flatMap { deleteFile => try { IcebergReflection .extractFileLocation(contentFileClass, deleteFile) - .foreach { deletePath => + .map { deletePath => val deleteBuilder = OperatorOuterClass.IcebergDeleteFile.newBuilder() deleteBuilder.setFilePath(deletePath) @@ -230,13 +227,14 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit case _: Exception => } - taskBuilder.addDeleteFiles(deleteBuilder.build()) + deleteBuilder.build() } } catch { case e: Exception => logWarning(s"Failed to serialize delete file: ${e.getMessage}") + None } - } + }.toSeq } catch { case e: Exception => val msg = @@ -259,13 +257,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit task: Any, contentScanTaskClass: Class[_], fileScanTaskClass: Class[_], - taskBuilder: OperatorOuterClass.IcebergFileScanTask.Builder): Unit = { + taskBuilder: OperatorOuterClass.IcebergFileScanTask.Builder, + icebergScanBuilder: OperatorOuterClass.IcebergScan.Builder, + partitionTypeToPoolIndex: mutable.HashMap[String, Int], + partitionSpecToPoolIndex: mutable.HashMap[String, Int], + partitionDataToPoolIndex: mutable.HashMap[String, Int]): Unit = { try { val specMethod = fileScanTaskClass.getMethod("spec") val spec = specMethod.invoke(task) if (spec != null) { - // Serialize the entire PartitionSpec to JSON (includes spec-id) + // Deduplicate partition spec try { // scalastyle:off classforname val partitionSpecParserClass = @@ -277,7 +279,14 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit val partitionSpecJson = toJsonMethod .invoke(null, spec) .asInstanceOf[String] - taskBuilder.setPartitionSpecJson(partitionSpecJson) + + val specIdx = partitionSpecToPoolIndex.getOrElseUpdate( + partitionSpecJson, { + val idx = partitionSpecToPoolIndex.size + icebergScanBuilder.addPartitionSpecPool(partitionSpecJson) + idx + }) + taskBuilder.setPartitionSpecIdx(specIdx) } catch { case e: Exception => logWarning(s"Failed to serialize partition spec to JSON: ${e.getMessage}") @@ -352,7 +361,13 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit ("type" -> "struct") ~ ("fields" -> fieldsJson))) - taskBuilder.setPartitionTypeJson(partitionTypeJson) + val typeIdx = partitionTypeToPoolIndex.getOrElseUpdate( + partitionTypeJson, { + val idx = partitionTypeToPoolIndex.size + icebergScanBuilder.addPartitionTypePool(partitionTypeJson) + idx + }) + taskBuilder.setPartitionTypeIdx(typeIdx) } } catch { case e: Exception => @@ -399,7 +414,14 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit // Only serialize partition data if we have non-unknown fields if (partitionDataMap.nonEmpty) { val partitionJson = compact(render(JObject(partitionDataMap.toList))) - taskBuilder.setPartitionDataJson(partitionJson) + + val partitionDataIdx = 
partitionDataToPoolIndex.getOrElseUpdate( + partitionJson, { + val idx = partitionDataToPoolIndex.size + icebergScanBuilder.addPartitionDataPool(partitionJson) + idx + }) + taskBuilder.setPartitionDataIdx(partitionDataIdx) } } } @@ -609,6 +631,19 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit childOp: Operator*): Option[OperatorOuterClass.Operator] = { val icebergScanBuilder = OperatorOuterClass.IcebergScan.newBuilder() + // Deduplication structures - map unique values to pool indices + val schemaToPoolIndex = mutable.HashMap[AnyRef, Int]() + val partitionTypeToPoolIndex = mutable.HashMap[String, Int]() + val partitionSpecToPoolIndex = mutable.HashMap[String, Int]() + val nameMappingToPoolIndex = mutable.HashMap[String, Int]() + val projectFieldIdsToPoolIndex = mutable.HashMap[Seq[Int], Int]() + val partitionDataToPoolIndex = mutable.HashMap[String, Int]() + val deleteFilesToPoolIndex = + mutable.HashMap[Seq[OperatorOuterClass.IcebergDeleteFile], Int]() + val residualToPoolIndex = mutable.HashMap[Option[Expr], Int]() + + var totalTasks = 0 + // Get pre-extracted metadata from planning phase // If metadata is None, this is a programming error - metadata should have been extracted // in CometScanRule before creating CometBatchScanExec @@ -662,6 +697,8 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit tasksMethod.invoke(taskGroup).asInstanceOf[java.util.Collection[_]] tasksCollection.asScala.foreach { task => + totalTasks += 1 + try { val taskBuilder = OperatorOuterClass.IcebergFileScanTask.newBuilder() @@ -859,9 +896,17 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit // scalastyle:on classforname val toJsonMethod = schemaParserClass.getMethod("toJson", schemaClass) toJsonMethod.setAccessible(true) - val schemaJson = toJsonMethod.invoke(null, schema).asInstanceOf[String] - taskBuilder.setSchemaJson(schemaJson) + // Use object identity for deduplication: Iceberg Schema objects are immutable + // and reused across tasks, making identity-based deduplication safe + val schemaIdx = schemaToPoolIndex.getOrElseUpdate( + schema, { + val idx = schemaToPoolIndex.size + val schemaJson = toJsonMethod.invoke(null, schema).asInstanceOf[String] + icebergScanBuilder.addSchemaPool(schemaJson) + idx + }) + taskBuilder.setSchemaIdx(schemaIdx) // Build field ID mapping from the schema we're using val nameToFieldId = IcebergReflection.buildFieldIdMapping(schema) @@ -869,20 +914,28 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit // Extract project_field_ids for scan.output columns. // For schema evolution: try task schema first, then fall back to // global scan schema (pre-extracted in metadata). 
- scan.output.foreach { attr => - val fieldId = nameToFieldId + val projectFieldIds = scan.output.flatMap { attr => + nameToFieldId .get(attr.name) .orElse(metadata.globalFieldIdMapping.get(attr.name)) - - fieldId match { - case Some(id) => - taskBuilder.addProjectFieldIds(id) - case None => + .orElse { logWarning( s"Column '${attr.name}' not found in task or scan schema," + "skipping projection") - } + None + } } + + // Deduplicate project field IDs + val projectFieldIdsIdx = projectFieldIdsToPoolIndex.getOrElseUpdate( + projectFieldIds, { + val idx = projectFieldIdsToPoolIndex.size + val listBuilder = OperatorOuterClass.ProjectFieldIdList.newBuilder() + projectFieldIds.foreach(id => listBuilder.addFieldIds(id)) + icebergScanBuilder.addProjectFieldIdsPool(listBuilder.build()) + idx + }) + taskBuilder.setProjectFieldIdsIdx(projectFieldIdsIdx) } catch { case e: Exception => val msg = @@ -893,29 +946,48 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit throw new RuntimeException(msg, e) } - taskBuilder.setDataFileFormat(metadata.fileFormat) + // Deduplicate delete files + val deleteFilesList = + extractDeleteFilesList(task, contentFileClass, fileScanTaskClass) + if (deleteFilesList.nonEmpty) { + val deleteFilesIdx = deleteFilesToPoolIndex.getOrElseUpdate( + deleteFilesList, { + val idx = deleteFilesToPoolIndex.size + val listBuilder = OperatorOuterClass.DeleteFileList.newBuilder() + deleteFilesList.foreach(df => listBuilder.addDeleteFiles(df)) + icebergScanBuilder.addDeleteFilesPool(listBuilder.build()) + idx + }) + taskBuilder.setDeleteFilesIdx(deleteFilesIdx) + } - // Serialize delete files (position deletes and equality deletes) - serializeDeleteFiles(task, contentFileClass, fileScanTaskClass, taskBuilder) + // Extract and deduplicate residual expression + val residualExprOpt = + try { + val residualMethod = contentScanTaskClass.getMethod("residual") + val residualExpr = residualMethod.invoke(task) - try { - val residualMethod = contentScanTaskClass.getMethod("residual") - val residualExpr = residualMethod.invoke(task) + val catalystExpr = convertIcebergExpression(residualExpr, scan.output) - val catalystExpr = convertIcebergExpression(residualExpr, scan.output) - - catalystExpr - .flatMap { expr => + catalystExpr.flatMap { expr => exprToProto(expr, scan.output, binding = false) } - .foreach { protoExpr => - taskBuilder.setResidual(protoExpr) - } - } catch { - case e: Exception => - logWarning( - "Failed to extract residual expression from FileScanTask: " + - s"${e.getMessage}") + } catch { + case e: Exception => + logWarning( + "Failed to extract residual expression from FileScanTask: " + + s"${e.getMessage}") + None + } + + residualExprOpt.foreach { residualExpr => + val residualIdx = residualToPoolIndex.getOrElseUpdate( + Some(residualExpr), { + val idx = residualToPoolIndex.size + icebergScanBuilder.addResidualPool(residualExpr) + idx + }) + taskBuilder.setResidualIdx(residualIdx) } // Serialize partition spec and data (field definitions, transforms, values) @@ -923,10 +995,22 @@ object CometIcebergNativeScan extends CometOperatorSerde[CometBatchScanExec] wit task, contentScanTaskClass, fileScanTaskClass, - taskBuilder) - - // Set name mapping if available (shared by all tasks, pre-extracted) - metadata.nameMapping.foreach(taskBuilder.setNameMappingJson) + taskBuilder, + icebergScanBuilder, + partitionTypeToPoolIndex, + partitionSpecToPoolIndex, + partitionDataToPoolIndex) + + // Deduplicate name mapping + metadata.nameMapping.foreach { nm => + val 
nmIdx = nameMappingToPoolIndex.getOrElseUpdate(
+              nm, {
+                val idx = nameMappingToPoolIndex.size
+                icebergScanBuilder.addNameMappingPool(nm)
+                idx
+              })
+            taskBuilder.setNameMappingIdx(nmIdx)
+          }
 
           partitionBuilder.addFileScanTasks(taskBuilder.build())
         }
@@ -948,6 +1032,32 @@
       return None
     }
 
+    // Log deduplication summary
+    val allPoolSizes = Seq(
+      schemaToPoolIndex.size,
+      partitionTypeToPoolIndex.size,
+      partitionSpecToPoolIndex.size,
+      nameMappingToPoolIndex.size,
+      projectFieldIdsToPoolIndex.size,
+      partitionDataToPoolIndex.size,
+      deleteFilesToPoolIndex.size,
+      residualToPoolIndex.size)
+
+    val avgDedup = if (totalTasks == 0) {
+      "0.0"
+    } else {
+      // Filter out empty pools - they shouldn't count as 100% dedup
+      val nonEmptyPools = allPoolSizes.filter(_ > 0)
+      if (nonEmptyPools.isEmpty) {
+        "0.0"
+      } else {
+        val avgUnique = nonEmptyPools.sum.toDouble / nonEmptyPools.length
+        f"${(1.0 - avgUnique / totalTasks) * 100}%.1f"
+      }
+    }
+
+    logInfo(s"IcebergScan: $totalTasks tasks, ${allPoolSizes.size} pools ($avgDedup% avg dedup)")
+
     builder.clearChildren()
     Some(builder.setIcebergScan(icebergScanBuilder).build())
   }
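
Note on the serialization side: every pool above is populated with the same pattern, a mutable map from a canonical value (or its JSON form) to its position in the corresponding repeated pool field, filled via getOrElseUpdate so each distinct value is appended exactly once and the per-task message stores only an index. A minimal standalone sketch of that pattern, assuming a hypothetical Pool[T] helper (this is not the Comet builder API):

    import scala.collection.mutable

    // Sketch of the pool-index pattern: append each distinct value once,
    // hand back the index for the per-task message.
    final class Pool[T] {
      private val indexOf = mutable.HashMap[T, Int]()
      val entries = mutable.ArrayBuffer[T]()

      def intern(value: T): Int =
        indexOf.getOrElseUpdate(value, {
          val idx = entries.size // size before the append is the new entry's index
          entries += value
          idx
        })
    }

    object PoolExample extends App {
      val schemaPool = new Pool[String]
      // Three tasks sharing one schema JSON produce a single pool entry; all get index 0.
      val indices = Seq("schema-a.json", "schema-a.json", "schema-a.json").map(schemaPool.intern)
      assert(indices == Seq(0, 0, 0) && schemaPool.entries.size == 1)
    }

The patch itself takes the index from the map's own size inside the getOrElseUpdate default; that works because getOrElseUpdate evaluates the default before inserting the key, so the size still equals the next pool position.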
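
On the native side the indices are resolved against pools that are parsed once up front, with an explicit bounds check so a bad index fails with a descriptive error rather than a panic. A hedged Scala analogue of that lookup, kept in Scala to use one language for these sketches (the patch implements it in Rust; ScanError and resolve are illustrative names only):

    object PoolLookupExample extends App {
      final case class ScanError(msg: String) extends Exception(msg)

      // Resolve a per-task index into a pre-parsed pool, reporting the index and
      // pool size on failure, mirroring the error messages in planner.rs above.
      def resolve[A](pool: IndexedSeq[A], idx: Int, field: String): Either[ScanError, A] =
        pool.lift(idx).toRight(ScanError(s"Invalid $field: $idx (pool size: ${pool.length})"))

      val schemaPool = Vector("parsed-schema-0")
      assert(resolve(schemaPool, 0, "schema_idx").isRight) // valid index
      assert(resolve(schemaPool, 5, "schema_idx").isLeft)  // out of range -> descriptive error
    }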
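
The avgDedup figure in the final logInfo is the mean reduction across the non-empty pools: each contributes 1 - uniqueEntries / totalTasks, and empty pools are excluded so they do not count as 100% dedup. A worked example under assumed pool sizes (hypothetical numbers, not taken from a real scan):

    object DedupSummaryExample extends App {
      val totalTasks = 10000
      // schema, partitionType, partitionSpec, nameMapping, projectFieldIds,
      // partitionData, deleteFiles, residual
      val poolSizes = Seq(1, 1, 1, 1, 1, 0, 0, 200)
      val nonEmpty  = poolSizes.filter(_ > 0)
      val avgUnique = nonEmpty.sum.toDouble / nonEmpty.length // (1+1+1+1+1+200) / 6 ≈ 34.2
      val avgDedup  = (1.0 - avgUnique / totalTasks) * 100    // ≈ 99.7
      println(f"IcebergScan: $totalTasks tasks, ${poolSizes.length} pools ($avgDedup%.1f%% avg dedup)")
    }

If every task carried a distinct residual and partition tuple, the non-empty pool sizes would approach totalTasks and the figure would drop toward 0%, so the log line is a quick way to see how much sharing the pools actually capture.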