Commit 769d76e

fix: NativeScan count assert firing for no reason (#2850)
1 parent 361e0bd

File tree: 2 files changed (+35 −22 lines)

spark/src/main/scala/org/apache/spark/sql/comet/operators.scala

Lines changed: 24 additions & 17 deletions

```diff
@@ -217,22 +217,14 @@ abstract class CometNativeExec extends CometExec {
     // TODO: support native metrics for all operators.
     val nativeMetrics = CometMetricNode.fromCometPlan(this)

+    // Go over all the native scans, in order to see if they need encryption options.
     // For each relation in a CometNativeScan generate a hadoopConf,
     // for each file path in a relation associate with hadoopConf
-    val cometNativeScans: Seq[CometNativeScanExec] = this
-      .collectLeaves()
-      .filter(_.isInstanceOf[CometNativeScanExec])
-      .map(_.asInstanceOf[CometNativeScanExec])
-    assert(
-      cometNativeScans.size <= 1,
-      "We expect one native scan in a Comet plan since we will broadcast one hadoopConf.")
-    // If this assumption changes in the future, you can look at the commit history of #2447
-    // to see how there used to be a map of relations to broadcasted confs in case multiple
-    // relations in a single plan. The example that came up was UNION. See discussion at:
-    // https://github.com/apache/datafusion-comet/pull/2447#discussion_r2406118264
-    val (broadcastedHadoopConfForEncryption, encryptedFilePaths) =
-      cometNativeScans.headOption.fold(
-        (None: Option[Broadcast[SerializableConfiguration]], Seq.empty[String])) { scan =>
+    // This is done per native plan, so only count scans until a comet input is reached.
+    val encryptionOptions =
+      mutable.ArrayBuffer.empty[(Broadcast[SerializableConfiguration], Seq[String])]
+    foreachUntilCometInput(this) {
+      case scan: CometNativeScanExec =>
         // This creates a hadoopConf that brings in any SQLConf "spark.hadoop.*" configs and
         // per-relation configs since different tables might have different decryption
         // properties.
@@ -244,10 +236,25 @@ abstract class CometNativeExec extends CometExec {
           val broadcastedConf =
             scan.relation.sparkSession.sparkContext
               .broadcast(new SerializableConfiguration(hadoopConf))
-          (Some(broadcastedConf), scan.relation.inputFiles.toSeq)
-        } else {
-          (None, Seq.empty)
+
+          val optsTuple: (Broadcast[SerializableConfiguration], Seq[String]) =
+            (broadcastedConf, scan.relation.inputFiles.toSeq)
+          encryptionOptions += optsTuple
         }
+      case _ => // no-op
+    }
+    assert(
+      encryptionOptions.size <= 1,
+      "We expect one native scan that requires encryption reading in a Comet plan," +
+        " since we will broadcast one hadoopConf.")
+    // If this assumption changes in the future, you can look at the commit history of #2447
+    // to see how there used to be a map of relations to broadcasted confs in case multiple
+    // relations in a single plan. The example that came up was UNION. See discussion at:
+    // https://github.com/apache/datafusion-comet/pull/2447#discussion_r2406118264
+    val (broadcastedHadoopConfForEncryption, encryptedFilePaths) =
+      encryptionOptions.headOption match {
+        case Some((conf, paths)) => (Some(conf), paths)
+        case None => (None, Seq.empty)
       }

    def createCometExecIter(
```
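
The body of `foreachUntilCometInput` is not part of this diff; per the added comment, it walks the plan but stops descending once it reaches a Comet input boundary, i.e. a child subtree that already executes as its own native plan. A self-contained toy sketch (stand-in classes, not Comet's actual ones) of why the old `collectLeaves()` approach over-counted and the bounded walk does not:

```scala
// Toy plan nodes; NOT Comet's classes, just stand-ins for illustration.
sealed trait PlanNode { def children: Seq[PlanNode] }
case class NativeScan(table: String) extends PlanNode { val children = Seq.empty }
// Marks the boundary of a child that already executes as its own native plan.
case class CometInput(child: PlanNode) extends PlanNode { val children = Seq(child) }
case class Union(children: Seq[PlanNode]) extends PlanNode
case class Agg(child: PlanNode) extends PlanNode { val children = Seq(child) }

// Old approach: collect every scan leaf in the whole tree, even scans that
// belong to other native plans below an input boundary.
def collectLeaves(p: PlanNode): Seq[PlanNode] =
  if (p.children.isEmpty) Seq(p) else p.children.flatMap(collectLeaves)

// New approach (assumed shape of foreachUntilCometInput): visit nodes, but do
// not descend past an input boundary, so only scans in *this* native plan count.
def foreachUntilBoundary(p: PlanNode)(f: PlanNode => Unit): Unit = {
  f(p)
  p match {
    case _: CometInput => // separate native plan: stop here
    case other => other.children.foreach(foreachUntilBoundary(_)(f))
  }
}

object AssertDemo extends App {
  // Shape of the regression: an aggregate above a union of two scans.
  val plan = Agg(Union(Seq(CometInput(NativeScan("t1")), CometInput(NativeScan("t2")))))

  val all = collectLeaves(plan).count(_.isInstanceOf[NativeScan])
  println(s"collectLeaves counts $all scans") // 2 -> the old assert fired

  var bounded = 0
  foreachUntilBoundary(plan) {
    case _: NativeScan => bounded += 1
    case _ => // no-op
  }
  println(s"bounded walk counts $bounded scans") // 0 -> assert stays quiet
}
```

With `collectLeaves()` every scan in the Spark plan is counted, even ones that execute in other native plans; the bounded walk counts only scans that this native plan will itself drive, which is what the one-broadcast-hadoopConf assumption actually requires.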

spark/src/test/scala/org/apache/spark/sql/comet/ParquetEncryptionITCase.scala

Lines changed: 11 additions & 5 deletions

```diff
@@ -32,7 +32,7 @@ import org.apache.parquet.crypto.DecryptionPropertiesFactory
 import org.apache.parquet.crypto.keytools.{KeyToolkit, PropertiesDrivenCryptoFactory}
 import org.apache.parquet.crypto.keytools.mocks.InMemoryKMS
 import org.apache.spark.{DebugFilesystem, SparkConf}
-import org.apache.spark.sql.{CometTestBase, SQLContext}
+import org.apache.spark.sql.{functions, CometTestBase, SQLContext}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SQLTestUtils

@@ -359,7 +359,8 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
       KeyToolkit.KMS_CLIENT_CLASS_PROPERTY_NAME ->
         "org.apache.parquet.crypto.keytools.mocks.InMemoryKMS",
       InMemoryKMS.KEY_LIST_PROPERTY_NAME ->
-        s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}") {
+        s"footerKey: ${footerKey}, key1: ${key1}, key2: ${key2}",
+      CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true") {

      // Write first file with key1
      val inputDF1 = spark
@@ -394,11 +395,16 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
      val parquetDF2 = spark.read.parquet(parquetDir2)

      val unionDF = parquetDF1.union(parquetDF2)
+      // Since the union has its own executeColumnar, problems would not surface if it
+      // were the last operator. Adding another Comet aggregate after the union shows the
+      // need for foreachUntilCometInput() in operators.scala: without it we would error
+      // on multiple native scan execs despite them no longer being in the same plan at all.
+      val aggDf = unionDF.agg(functions.sum("id"))

      if (CometConf.COMET_ENABLED.get(conf)) {
-        checkSparkAnswerAndOperator(unionDF)
+        checkSparkAnswerAndOperator(aggDf)
      } else {
-        checkSparkAnswer(unionDF)
+        checkSparkAnswer(aggDf)
      }
    }
  }
@@ -447,7 +453,7 @@ class ParquetEncryptionITCase extends CometTestBase with SQLTestUtils {
  }

  protected override def sparkConf: SparkConf = {
-    val conf = new SparkConf()
+    val conf = super.sparkConf
    conf.set("spark.hadoop.fs.file.impl", classOf[DebugFilesystem].getName)
    conf
  }
```
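
At the DataFrame level, the regression shape the updated test now covers can be sketched as below; the paths, column name, and session setup are illustrative stand-ins, not the test's actual encrypted fixtures:

```scala
import org.apache.spark.sql.{functions, SparkSession}

object UnionAggRepro extends App {
  val spark = SparkSession.builder().master("local[2]").appName("repro").getOrCreate()

  // Two separate parquet inputs; with Comet each becomes its own native scan.
  // Placeholder paths: point them at any two parquet directories with an "id" column.
  val df1 = spark.read.parquet("/tmp/parquet1")
  val df2 = spark.read.parquet("/tmp/parquet2")

  // The union alone did not trip the assert, because union drives its children's
  // executeColumnar separately. An aggregate above it puts an operator on top whose
  // Spark plan still contains both scans, but behind Comet input boundaries.
  val agg = df1.union(df2).agg(functions.sum("id"))
  agg.show() // before this commit, planning here hit the size <= 1 assertion

  spark.stop()
}
```

Each parquet read becomes its own Comet native scan; the aggregate above the union is the operator whose plan walk used to see both scans and fire the assert for no reason.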
