Commit ef2d4a7
Avoid slow stats conversion fallback for iceberg clone (delta-io#4366)
## Description

This PR proposes to:

* Avoid the slow stats conversion fallback for Iceberg clone by default
* Allow partial stats conversion for Iceberg clone by default

More specifically:

* When stats conversion from Iceberg is off, the fallback to slow stats conversion is enabled.
* When stats conversion from Iceberg is on, the fallback to slow stats conversion does not happen if partial stats conversion is enabled. It only happens if partial stats conversion is disabled and the Iceberg source has partial stats, i.e. either minValues or maxValues is missing.

## How was this patch tested?

UTs

## Does this PR introduce _any_ user-facing changes?

**Current**: Delta tables cloned from an Iceberg source with only partial stats collect stats from Parquet footers. Here, partial stats means any of (maxValues, minValues, nullCounts) is missing.

**Future**: Delta tables cloned from an Iceberg source with only partial stats convert all available stats from the Iceberg source and do not fall back to collecting stats from Parquet footers. Here, partial stats means any of (maxValues, minValues, nullCounts) is missing.
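The decision flow above can be summarized as a small illustrative sketch. This is not code from this PR; the function and parameter names below are made up for exposition only:

```scala
// Illustrative sketch of the new fallback decision, not the actual clone code path.
// `statsConversionEnabled` and `allowPartialStats` stand in for the corresponding
// Delta SQL confs; `sourceHasPartialStats` mirrors IcebergStatsUtils.hasPartialStats.
def shouldFallBackToSlowStatsConversion(
    statsConversionEnabled: Boolean,
    allowPartialStats: Boolean,
    sourceHasPartialStats: Boolean): Boolean = {
  if (!statsConversionEnabled) {
    // Stats conversion from Iceberg is off: collect stats from Parquet footers.
    true
  } else {
    // Stats conversion is on: only fall back when partial stats conversion is
    // disabled and the Iceberg source is missing minValues or maxValues.
    !allowPartialStats && sourceHasPartialStats
  }
}
```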
1 parent ce8bee4 commit ef2d4a7

File tree

4 files changed: +138 −58 lines changed

iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergFileManifest.scala

Lines changed: 13 additions & 1 deletion

@@ -59,6 +59,10 @@ class IcebergFileManifest(
     spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_CONVERT_ICEBERG_PARTITION_EVOLUTION_ENABLED)

   private val statsAllowTypes: Set[TypeID] = IcebergStatsUtils.typesAllowStatsConversion(spark)
+  private val allowPartialStatsConverted: Boolean =
+    spark.sessionState.conf.getConf(
+      DeltaSQLConf.DELTA_CLONE_ICEBERG_ALLOW_PARTIAL_STATS
+    )

   val basePath = table.location()

@@ -128,6 +132,7 @@ class IcebergFileManifest(
     }

     val shouldConvertStats = convertStats
+    val partialStatsConvertedEnabled = allowPartialStatsConverted
     val statsAllowTypesSet = statsAllowTypes

     val shouldCheckPartitionEvolution = !partitionEvolutionEnabled

@@ -163,7 +168,14 @@ class IcebergFileManifest(
           Some(convertPartition.toDelta(dataFile.partition()))
         } else None,
         stats = if (shouldConvertStats) {
-          IcebergStatsUtils.icebergStatsToDelta(localTable.schema, dataFile, statsAllowTypesSet)
+          IcebergStatsUtils.icebergStatsToDelta(
+            localTable.schema,
+            dataFile,
+            statsAllowTypesSet,
+            shouldSkipForFile = (df: DataFile) => {
+              !partialStatsConvertedEnabled && IcebergStatsUtils.hasPartialStats(df)
+            }
+          )
         } else None
       )
     }
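Read together, the manifest now skips per-file Iceberg stats conversion only when partial stats conversion is disabled and that file is missing either bound. A minimal illustrative restatement of the predicate (not additional code in this PR):

```scala
// Illustrative only: the per-file skip predicate passed to icebergStatsToDelta.
// Returning true yields None, which leaves stats collection to the slow
// Parquet-footer path when that fallback is enabled.
val skipForFile: DataFile => Boolean = (df: DataFile) =>
  !partialStatsConvertedEnabled && IcebergStatsUtils.hasPartialStats(df)

// partialStatsConvertedEnabled = true  -> never skip; convert whatever stats exist
// partialStatsConvertedEnabled = false -> skip files missing minValues or maxValues
```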

iceberg/src/main/scala/org/apache/spark/sql/delta/IcebergStatsUtils.scala

Lines changed: 38 additions & 22 deletions

@@ -18,6 +18,7 @@ package org.apache.spark.sql.delta.commands.convert

 import java.lang.{Integer => JInt, Long => JLong}
 import java.nio.ByteBuffer
+import java.util.{Map => JMap}

 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal

@@ -93,38 +94,50 @@ object IcebergStatsUtils extends DeltaLogging {
    *
    * @param icebergSchema Iceberg table schema
    * @param dataFile Iceberg DataFile that contains stats info
+   * @param statsAllowTypes Iceberg types that are allowed to convert stats
+   * @param shouldSkipForFile Function => true if a data file should be skipped
    * @return None if stats is missing on the DataFile or error occurs during conversion
    */
   def icebergStatsToDelta(
       icebergSchema: Schema,
       dataFile: DataFile,
-      statsAllowTypes: Set[TypeID]): Option[String] = {
+      statsAllowTypes: Set[TypeID],
+      shouldSkipForFile: DataFile => Boolean): Option[String] = {
+    if (shouldSkipForFile(dataFile)) {
+      return None
+    }
     try {
-      // Any empty or null fields means Iceberg has disabled column stats
-      if (dataFile.upperBounds == null ||
-          dataFile.upperBounds.isEmpty ||
-          dataFile.lowerBounds == null ||
-          dataFile.lowerBounds.isEmpty ||
-          dataFile.nullValueCounts == null ||
-          dataFile.nullValueCounts.isEmpty
-      ) {
-        return None
-      }
       Some(icebergStatsToDelta(
         icebergSchema,
         dataFile.recordCount,
-        dataFile.upperBounds.asScala.toMap,
-        dataFile.lowerBounds.asScala.toMap,
-        dataFile.nullValueCounts.asScala.toMap,
+        Option(dataFile.upperBounds).map(_.asScala.toMap).filter(_.nonEmpty),
+        Option(dataFile.lowerBounds).map(_.asScala.toMap).filter(_.nonEmpty),
+        Option(dataFile.nullValueCounts).map(_.asScala.toMap).filter(_.nonEmpty),
         statsAllowTypes
       ))
     } catch {
       case NonFatal(e) =>
-        logError("Exception while converting Iceberg stats to Delta format", e)
+        logInfo("[Iceberg-Stats-Conversion] " +
+          "Exception while converting Iceberg stats to Delta format", e)
         None
     }
   }

+  def hasPartialStats(dataFile: DataFile): Boolean = {
+    def nonEmptyMap[K, V](m: JMap[K, V]): Boolean = {
+      m != null && !m.isEmpty
+    }
+    // nullValueCounts is less common, so we ignore it
+    val hasPartialStats =
+      !nonEmptyMap(dataFile.upperBounds()) ||
+      !nonEmptyMap(dataFile.lowerBounds())
+    if (hasPartialStats) {
+      logInfo(s"[Iceberg-Stats-Conversion] $dataFile only has partial stats:" +
+        s"upperBounds=${dataFile.upperBounds}, lowerBounds = ${dataFile.lowerBounds()}")
+    }
+    hasPartialStats
+  }
+
   /**
    * Convert Iceberg DataFile stats into Delta stats.
    *

@@ -176,9 +189,9 @@ object IcebergStatsUtils extends DeltaLogging {
   private[convert] def icebergStatsToDelta(
       icebergSchema: Schema,
       numRecords: Long,
-      maxMap: Map[JInt, ByteBuffer],
-      minMap: Map[JInt, ByteBuffer],
-      nullCountMap: Map[JInt, JLong],
+      maxMap: Option[Map[JInt, ByteBuffer]],
+      minMap: Option[Map[JInt, ByteBuffer]],
+      nullCountMap: Option[Map[JInt, JLong]],
       statsAllowTypes: Set[TypeID]): String = {

     def deserialize(ftype: IcebergType, value: Any): Any = {

@@ -222,11 +235,14 @@ object IcebergStatsUtils extends DeltaLogging {

     JsonUtils.toJson(
       Map(
-        NUM_RECORDS -> numRecords,
-        MAX -> collectStats(icebergSchema.columns, maxMap, deserialize, statsAllowTypes),
-        MIN -> collectStats(icebergSchema.columns, minMap, deserialize, statsAllowTypes),
+        NUM_RECORDS -> numRecords
+      ) ++ maxMap.map(
+        MAX -> collectStats(icebergSchema.columns, _, deserialize, statsAllowTypes)
+      ) ++ minMap.map(
+        MIN -> collectStats(icebergSchema.columns, _, deserialize, statsAllowTypes)
+      ) ++ nullCountMap.map(
         NULL_COUNT -> collectStats(
-          icebergSchema.columns, nullCountMap, (_: IcebergType, v: Any) => v, statsAllowTypes
+          icebergSchema.columns, _, (_: IcebergType, v: Any) => v, statsAllowTypes
        )
      )
    )
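The switch to Option-wrapped maps means the final stats JSON simply omits any section that is absent. Below is a minimal, self-contained sketch of that assembly pattern, using plain Scala maps and a hypothetical `buildStatsMap` with string keys in place of the real MAX/MIN/NULL_COUNT constants:

```scala
// Minimal sketch of the "include a stats section only when it exists" pattern.
// Stand-ins: plain Long values instead of deserialized Iceberg bounds.
def buildStatsMap(
    numRecords: Long,
    maxValues: Option[Map[String, Long]],
    minValues: Option[Map[String, Long]],
    nullCount: Option[Map[String, Long]]): Map[String, Any] = {
  Map[String, Any]("numRecords" -> numRecords) ++
    maxValues.map("maxValues" -> _) ++
    minValues.map("minValues" -> _) ++
    nullCount.map("nullCount" -> _)
}

// With maxValues missing, the resulting stats simply omit that section:
// buildStatsMap(0L, None, Some(Map("col_int" -> 3L)), Some(Map("col_int" -> 2L)))
//   == Map("numRecords" -> 0L,
//          "minValues" -> Map("col_int" -> 3L),
//          "nullCount" -> Map("col_int" -> 2L))
```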

iceberg/src/test/scala/org/apache/spark/sql/delta/commands/convert/IcebergStatsUtilsSuite.scala

Lines changed: 70 additions & 35 deletions

@@ -19,7 +19,7 @@ package org.apache.spark.sql.delta.commands.convert
 import java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Integer => JInt, Long => JLong}
 import java.math.BigDecimal
 import java.nio.ByteBuffer
-import java.util.{List => JList, Map => JMap}
+import java.util.{HashMap => JHashMap, List => JList, Map => JMap}

 import scala.collection.JavaConverters._

@@ -101,9 +101,9 @@ class IcebergStatsUtilsSuite extends SparkFunSuite with SharedSparkSession {
     val deltaStats = IcebergStatsUtils.icebergStatsToDelta(
       icebergSchema,
       1251,
-      minMap,
-      maxMap,
-      nullCountMap,
+      Some(minMap),
+      Some(maxMap),
+      Some(nullCountMap),
       statsAllowTypes = StatsAllowTypes
     )

@@ -131,27 +131,27 @@ class IcebergStatsUtilsSuite extends SparkFunSuite with SharedSparkSession {
     val deltaStats = IcebergStatsUtils.icebergStatsToDelta(
       icebergSchema,
       1251,
-      minMap = Map(
+      minMap = Some(Map(
         Integer.valueOf(1) ->
           Conversions.toByteBuffer(TimestampType.withZone, JLong.valueOf(1734391979000000L)),
         Integer.valueOf(2) ->
           Conversions.toByteBuffer(TimestampType.withoutZone, JLong.valueOf(1734391979000000L)),
         Integer.valueOf(3) ->
           Conversions.toByteBuffer(DecimalType.of(10, 5), new BigDecimal("3.44141"))
-      ),
-      maxMap = Map(
+      )),
+      maxMap = Some(Map(
         Integer.valueOf(1) ->
           Conversions.toByteBuffer(TimestampType.withZone, JLong.valueOf(1734394979000000L)),
         Integer.valueOf(2) ->
           Conversions.toByteBuffer(TimestampType.withoutZone, JLong.valueOf(1734394979000000L)),
         Integer.valueOf(3) ->
           Conversions.toByteBuffer(DecimalType.of(10, 5), new BigDecimal("9.99999"))
-      ),
-      nullCountMap = Map(
+      )),
+      nullCountMap = Some(Map(
         Integer.valueOf(1) -> JLong.valueOf(20),
         Integer.valueOf(2) -> JLong.valueOf(10),
         Integer.valueOf(3) -> JLong.valueOf(31)
-      ),
+      )),
       statsAllowTypes = StatsAllowTypes
     )
     assertResult(

@@ -180,23 +180,23 @@ class IcebergStatsUtilsSuite extends SparkFunSuite with SharedSparkSession {
     val deltaStats = IcebergStatsUtils.icebergStatsToDelta(
       icebergSchema,
       1251,
-      minMap = Map(
+      minMap = Some(Map(
         Integer.valueOf(1) -> Conversions.toByteBuffer(IntegerType.get, JInt.valueOf(-5)),
         Integer.valueOf(2) -> Conversions.toByteBuffer(LongType.get, null),
         Integer.valueOf(3) -> null
-      ),
-      maxMap = Map(
+      )),
+      maxMap = Some(Map(
         Integer.valueOf(1) -> Conversions.toByteBuffer(IntegerType.get, JInt.valueOf(5)),
         // stats for value 2 is missing
         Integer.valueOf(3) -> Conversions.toByteBuffer(StringType.get, "maxval"),
         Integer.valueOf(5) -> Conversions.toByteBuffer(StringType.get, "maxval")
-      ),
-      nullCountMap = Map(
+      )),
+      nullCountMap = Some(Map(
         Integer.valueOf(1) -> JLong.valueOf(0),
         Integer.valueOf(2) -> null,
         Integer.valueOf(3) -> JLong.valueOf(2),
         Integer.valueOf(5) -> JLong.valueOf(3)
-      ),
+      )),
       statsAllowTypes = StatsAllowTypes
     )
     assertResult(

@@ -209,33 +209,68 @@ class IcebergStatsUtilsSuite extends SparkFunSuite with SharedSparkSession {
       JsonUtils.fromJson[StatsObject](deltaStats))
   }

+  private def testStatsConversion(
+      expectedStatsJson: String, dataFile: DataFile, icebergSchema: Schema): Unit = {
+    val expectedStats = JsonUtils.fromJson[StatsObject](expectedStatsJson)
+    val actualStats =
+      IcebergStatsUtils.icebergStatsToDelta(
+        icebergSchema, dataFile, StatsAllowTypes, shouldSkipForFile = _ => false
+      )
+      .map(JsonUtils.fromJson[StatsObject](_))
+      .get
+    assertResult(expectedStats)(actualStats)
+  }
+
   test("stats conversion while DataFile misses the stats fields") {
     val icebergSchema = new Schema(10, Seq[NestedField](
       NestedField.required(1, "col_int", IntegerType.get),
       NestedField.required(2, "col_long", LongType.get),
       NestedField.required(3, "col_st", StringType.get)
     ).asJava)
-    val expectedStats = JsonUtils.fromJson[StatsObject](
+    val expectedStatsJson =
       """{"numRecords":0,"maxValues":{"col_int":100992003},
         |"minValues":{"col_int":100992003},"nullCount":{"col_int":2}}"""
-        .stripMargin)
-    val actualStats =
-      IcebergStatsUtils.icebergStatsToDelta(icebergSchema, DummyDataFile(), StatsAllowTypes)
-        .map(JsonUtils.fromJson[StatsObject](_))
-        .get
-    assertResult(expectedStats)(actualStats)
-    assertResult(None)(IcebergStatsUtils.icebergStatsToDelta(
-      icebergSchema,
-      DummyDataFile(upperBounds = null),
-      statsAllowTypes = StatsAllowTypes))
-    assertResult(None)(IcebergStatsUtils.icebergStatsToDelta(
-      icebergSchema,
-      DummyDataFile(lowerBounds = null),
-      statsAllowTypes = StatsAllowTypes))
-    assertResult(None)(IcebergStatsUtils.icebergStatsToDelta(
-      icebergSchema,
-      DummyDataFile(nullValueCounts = null),
-      statsAllowTypes = StatsAllowTypes))
+        .stripMargin
+    testStatsConversion(expectedStatsJson, DummyDataFile(), icebergSchema)
+
+    val expectedStatsWithoutUpperBound =
+      """{"numRecords":0,"minValues":{"col_int":100992003},
+        |"nullCount":{"col_int":2}}"""
+        .stripMargin
+    testStatsConversion(
+      expectedStatsWithoutUpperBound, DummyDataFile(upperBounds = null), icebergSchema
+    )
+    testStatsConversion(
+      expectedStatsWithoutUpperBound,
+      DummyDataFile(upperBounds = new JHashMap[Integer, ByteBuffer]()),
+      icebergSchema
+    )
+
+    val expectedStatsWithoutLowerBound =
+      """{"numRecords":0,"maxValues":{"col_int":100992003},
+        |"nullCount":{"col_int":2}}"""
+        .stripMargin
+    testStatsConversion(
+      expectedStatsWithoutLowerBound, DummyDataFile(lowerBounds = null), icebergSchema
+    )
+    testStatsConversion(
+      expectedStatsWithoutLowerBound,
+      DummyDataFile(lowerBounds = new JHashMap[Integer, ByteBuffer]()),
+      icebergSchema
+    )
+
+    val expectedStatsWithoutNullCounts =
+      """{"numRecords":0,"maxValues":{"col_int":100992003},
+        |"minValues":{"col_int":100992003}}"""
+        .stripMargin
+    testStatsConversion(
+      expectedStatsWithoutNullCounts, DummyDataFile(nullValueCounts = null), icebergSchema
+    )
+    testStatsConversion(
+      expectedStatsWithoutNullCounts,
+      DummyDataFile(nullValueCounts = new JHashMap[Integer, JLong]()),
+      icebergSchema
+    )
   }
 }
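The helper above pins `shouldSkipForFile = _ => false`; the skip path itself could be exercised separately, as in this hedged sketch that reuses the suite's fixtures (not part of this PR's test changes):

```scala
// Illustrative only: the new hook short-circuits conversion before any stats
// are read, so a predicate that always skips must yield None.
assertResult(None)(IcebergStatsUtils.icebergStatsToDelta(
  icebergSchema,
  DummyDataFile(),
  StatsAllowTypes,
  shouldSkipForFile = _ => true
))
```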

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 17 additions & 0 deletions

@@ -2367,6 +2367,23 @@ trait DeltaSQLConfBase {
       .booleanConf
       .createWithDefault(true)

+  /**
+   * For iceberg clone,
+   * When stats conversion from iceberg off, fallback to slow stats conversion enabled
+   * When stats conversion from iceberg on,
+   *   fallback to slow stats conversion will not happen if partial stats conversion enabled
+   *   fallback only happens if partial stats conversion disabled and iceberg has partial stats
+   *     - either minValues or maxValues is missing
+   */
+  val DELTA_CLONE_ICEBERG_ALLOW_PARTIAL_STATS =
+    buildConf("clone.iceberg.allowPartialStats")
+      .internal()
+      .doc("If true, allow converting partial stats from iceberg stats " +
+        "to delta stats during clone."
+      )
+      .booleanConf
+      .createWithDefault(true)
+
   /////////////////////
   // Optimized Write
   /////////////////////
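A hedged usage sketch of the new flag follows; it assumes the conventional `spark.databricks.delta.` prefix that `DeltaSQLConf` applies to its keys, and the clone statement below is only an example:

```scala
// Illustrative only: toggling the new flag around an Iceberg clone.
// The default is true, so partial Iceberg stats are converted and the slow
// Parquet-footer fallback is avoided.
spark.conf.set("spark.databricks.delta.clone.iceberg.allowPartialStats", "false")

// With the flag off, files missing minValues or maxValues fall back to the
// slow stats conversion (when that fallback itself is enabled).
spark.sql(
  """CREATE TABLE delta_target
    |SHALLOW CLONE iceberg.`/path/to/iceberg/table`""".stripMargin)
```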
