Skip to content

Commit 940dd9d

Browse files
committed
feat: Avoid duplicated write nodes for AQE execution
1 parent 56f4d01 commit 940dd9d

File tree

1 file changed

+16
-10
lines changed

1 file changed

+16
-10
lines changed

spark/src/test/scala/org/apache/comet/parquet/CometParquetWriterSuite.scala

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -99,19 +99,24 @@ class CometParquetWriterSuite extends CometTestBase {
9999

100100
capturedPlan.foreach { qe =>
101101
val executedPlan = stripAQEPlan(qe.executedPlan)
102-
val hasNativeWrite = executedPlan.exists {
103-
case _: CometNativeWriteExec => true
102+
103+
// Count CometNativeWriteExec instances in the plan
104+
var nativeWriteCount = 0
105+
executedPlan.foreach {
106+
case _: CometNativeWriteExec =>
107+
nativeWriteCount += 1
104108
case d: DataWritingCommandExec =>
105-
d.child.exists {
106-
case _: CometNativeWriteExec => true
107-
case _ => false
109+
d.child.foreach {
110+
case _: CometNativeWriteExec =>
111+
nativeWriteCount += 1
112+
case _ =>
108113
}
109-
case _ => false
114+
case _ =>
110115
}
111116

112117
assert(
113-
hasNativeWrite,
114-
s"Expected CometNativeWriteExec in the plan, but got:\n${executedPlan.treeString}")
118+
nativeWriteCount == 1,
119+
s"Expected exactly one CometNativeWriteExec in the plan, but found $nativeWriteCount:\n${executedPlan.treeString}")
115120
}
116121
} finally {
117122
spark.listenerManager.unregister(listener)
@@ -201,12 +206,13 @@ class CometParquetWriterSuite extends CometTestBase {
201206

202207
test("basic parquet write with repartition") {
203208
withTempPath { dir =>
204-
val outputPath = new File(dir, "output.parquet").getAbsolutePath
205-
206209
// Create test data and write it to a temp parquet file first
207210
withTempPath { inputDir =>
208211
val inputPath = createTestData(inputDir)
209212
Seq(true, false).foreach(adaptive => {
213+
// Create a new output path for each AQE value
214+
val outputPath = new File(dir, s"output_aqe_$adaptive.parquet").getAbsolutePath
215+
210216
withSQLConf(
211217
CometConf.COMET_NATIVE_PARQUET_WRITE_ENABLED.key -> "true",
212218
"spark.sql.adaptive.enabled" -> adaptive.toString,

0 commit comments

Comments
 (0)