Skip to content

Commit da51528

Browse files
authored
fix: Fall back to native_comet for encrypted Parquet scans (apache#2250)
1 parent 2009655 commit da51528

File tree

2 files changed

+16
-7
lines changed

2 files changed

+16
-7
lines changed

spark/src/main/scala/org/apache/comet/rules/CometScanRule.scala

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,11 +131,22 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
131131
return withInfos(scanExec, fallbackReasons.toSet)
132132
}
133133

134+
val encryptionEnabled: Boolean =
135+
conf.getConfString("parquet.crypto.factory.class", "").nonEmpty &&
136+
conf.getConfString("parquet.encryption.kms.client.class", "").nonEmpty
137+
134138
var scanImpl = COMET_NATIVE_SCAN_IMPL.get()
135139

136140
// if scan is auto then pick the best available scan
137141
if (scanImpl == SCAN_AUTO) {
138-
scanImpl = selectScan(scanExec, r.partitionSchema)
142+
if (encryptionEnabled) {
143+
logInfo(
144+
s"Auto scan mode falling back to $SCAN_NATIVE_COMET because " +
145+
s"$SCAN_NATIVE_ICEBERG_COMPAT does not support reading encrypted Parquet files")
146+
scanImpl = SCAN_NATIVE_COMET
147+
} else {
148+
scanImpl = selectScan(scanExec, r.partitionSchema)
149+
}
139150
}
140151

141152
if (scanImpl == SCAN_NATIVE_DATAFUSION && !COMET_EXEC_ENABLED.get()) {
@@ -182,10 +193,6 @@ case class CometScanRule(session: SparkSession) extends Rule[SparkPlan] with Com
182193
return withInfos(scanExec, fallbackReasons.toSet)
183194
}
184195

185-
val encryptionEnabled: Boolean =
186-
conf.getConfString("parquet.crypto.factory.class", "").nonEmpty &&
187-
conf.getConfString("parquet.encryption.kms.client.class", "").nonEmpty
188-
189196
if (scanImpl != CometConf.SCAN_NATIVE_COMET && encryptionEnabled) {
190197
fallbackReasons +=
191198
"Full native scan disabled because encryption is not supported"

spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
4949
private val key2 = encoder.encodeToString("1234567890123451".getBytes(StandardCharsets.UTF_8))
5050

5151
test("SPARK-34990: Write and read an encrypted parquet") {
52-
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_COMET)
52+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
53+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
5354

5455
import testImplicits._
5556

@@ -92,7 +93,8 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
9293
}
9394

9495
test("SPARK-37117: Can't read files in Parquet encryption external key material mode") {
95-
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() == CometConf.SCAN_NATIVE_COMET)
96+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_DATAFUSION)
97+
assume(CometConf.COMET_NATIVE_SCAN_IMPL.get() != CometConf.SCAN_NATIVE_ICEBERG_COMPAT)
9698

9799
import testImplicits._
98100

0 commit comments

Comments
 (0)