Skip to content

Commit c514066

Browse files
authored
Add configs for converting date and timestamp stats for cloned delta table from iceberg (delta-io#4362)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> ## Description Add configs for converting date and timestamp stats ## How was this patch tested? UTs
1 parent ae4a83f commit c514066

File tree

3 files changed

+32
-7
lines changed

3 files changed

+32
-7
lines changed

iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergStatsUtils.scala

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ object IcebergStatsUtils extends DeltaLogging {
4848
// The stats for these types will be converted to Delta stats
4949
// except for following types:
5050
// DECIMAL (decided by DeltaSQLConf.DELTA_CONVERT_ICEBERG_DECIMAL_STATS)
51+
// DATE (decided by DeltaSQLConf.DELTA_CONVERT_ICEBERG_DATE_STATS)
52+
// TIMESTAMP (decided by DeltaSQLConf.DELTA_CONVERT_ICEBERG_TIMESTAMP_STATS)
5153
// which are decided by spark configs dynamically
5254
private val STATS_ALLOW_TYPES = Set[TypeID](
5355
TypeID.BOOLEAN,
@@ -66,13 +68,17 @@ object IcebergStatsUtils extends DeltaLogging {
6668
TypeID.DECIMAL
6769
)
6870

71+
private val CONFIGS_TO_STATS_ALLOW_TYPES = Map(
72+
DeltaSQLConf.DELTA_CONVERT_ICEBERG_DATE_STATS -> TypeID.DATE,
73+
DeltaSQLConf.DELTA_CONVERT_ICEBERG_TIMESTAMP_STATS -> TypeID.TIMESTAMP,
74+
DeltaSQLConf.DELTA_CONVERT_ICEBERG_DECIMAL_STATS -> TypeID.DECIMAL
75+
)
76+
6977
def typesAllowStatsConversion(spark: SparkSession): Set[TypeID] = {
70-
val statsDisallowTypes =
71-
if (spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_DECIMAL_STATS)) {
72-
Set.empty[TypeID]
73-
} else {
74-
Set(TypeID.DECIMAL)
75-
}
78+
val statsDisallowTypes = CONFIGS_TO_STATS_ALLOW_TYPES.filter {
79+
case (conf, _) => !spark.sessionState.conf.getConf(conf)
80+
}.values.toSet
81+
7682
typesAllowStatsConversion(statsDisallowTypes)
7783
}
7884

iceberg/src/test/scala/org/apache/spark/sql/delta/commands/convert/IcebergStatsUtilsSuite.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,20 @@ import java.util.{List => JList, Map => JMap}
2323

2424
import scala.collection.JavaConverters._
2525

26+
import org.apache.spark.sql.delta.sources.DeltaSQLConf
2627
import org.apache.spark.sql.delta.util.JsonUtils
2728
import org.apache.iceberg.{DataFile, FileContent, FileFormat, PartitionData, PartitionSpec, Schema, StructLike}
2829
import org.apache.iceberg.transforms._
2930
import org.apache.iceberg.types.Conversions
31+
import org.apache.iceberg.types.Type
32+
import org.apache.iceberg.types.Type.TypeID
3033
import org.apache.iceberg.types.Types._
3134

3235
import org.apache.spark.SparkFunSuite
36+
import org.apache.spark.internal.config.ConfigEntry
37+
import org.apache.spark.sql.test.SharedSparkSession
3338

34-
class IcebergStatsUtilsSuite extends SparkFunSuite {
39+
class IcebergStatsUtilsSuite extends SparkFunSuite with SharedSparkSession {
3540

3641
private val StatsAllowTypes =
3742
IcebergStatsUtils.typesAllowStatsConversion(statsDisallowTypes = Set.empty)

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2353,6 +2353,20 @@ trait DeltaSQLConfBase {
23532353
.booleanConf
23542354
.createWithDefault(true)
23552355

2356+
val DELTA_CONVERT_ICEBERG_DATE_STATS = buildConf("collectStats.convertIceberg.date")
2357+
.internal()
2358+
.doc("When enabled, attempts to convert Iceberg stats for DATE to Delta stats" +
2359+
"when cloning from an Iceberg source.")
2360+
.booleanConf
2361+
.createWithDefault(true)
2362+
2363+
val DELTA_CONVERT_ICEBERG_TIMESTAMP_STATS = buildConf("collectStats.convertIceberg.timestamp")
2364+
.internal()
2365+
.doc("When enabled, attempts to convert Iceberg stats for TIMESTAMP to Delta stats" +
2366+
"when cloning from an Iceberg source.")
2367+
.booleanConf
2368+
.createWithDefault(true)
2369+
23562370
/////////////////////
23572371
// Optimized Write
23582372
/////////////////////

0 commit comments

Comments
 (0)