Commit 95955be

chore: extract comparison tool from fuzzer

Parent: 6d28c0c
4 files changed: 24 additions & 42 deletions

4 files changed

+24
-42
lines changed

dev/benchmarks/tpcbench.py

Lines changed: 4 additions & 4 deletions

````diff
@@ -111,12 +111,12 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
         # coming across for running DDL stmt
         if len(df.columns) > 0:
             output_path = f"{write_path}/q{query}"
-            # sort by all columns to have predictable output dataset for comparison
-            df_sorted = df.orderBy(*df.columns)
             # rename same column names for output
-            # output doesn't allow non unique column names
             # a, a, b, b => a, a_1, b, b_1
-            dedup_columns(df_sorted).coalesce(1).write.mode("overwrite").parquet(output_path)
+            # output doesn't allow non unique column names
+            deduped = dedup_columns(df)
+            # sort by all columns to have predictable output dataset for comparison
+            deduped.orderBy(*deduped.columns).coalesce(1).write.mode("overwrite").parquet(output_path)
             print(f"Query {query} results written to {output_path}")
         else:
             print(f"Skipping write: DataFrame has no schema for {output_path}")
````
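The reordered write path above deduplicates column names first and sorts second, so the orderBy runs on the renamed columns; sorting by columns with duplicate names is ambiguous, which is likely why the sort was moved after the rename. For illustration, a minimal Scala sketch of the renaming scheme described in the comment (a, a, b, b => a, a_1, b, b_1); dedupColumns is a hypothetical analogue of the Python dedup_columns helper, not code from this commit:

```scala
import scala.collection.mutable

import org.apache.spark.sql.DataFrame

// Hypothetical Scala analogue of the Python dedup_columns helper:
// duplicate names get a numeric suffix, e.g. a, a, b, b => a, a_1, b, b_1.
def dedupColumns(df: DataFrame): DataFrame = {
  val seen = mutable.Map.empty[String, Int]
  val renamed = df.columns.map { name =>
    val n = seen.getOrElse(name, 0)
    seen(name) = n + 1
    if (n == 0) name else s"${name}_$n"
  }
  // toDF renames all columns positionally, so the result has unique names
  df.toDF(renamed: _*)
}
```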

fuzz-testing/README.md

Lines changed: 1 addition & 1 deletion

````diff
@@ -113,7 +113,7 @@ The example below is for TPC-H queries results generated by pure Spark and Comet
 ```shell
 $SPARK_HOME/bin/spark-submit \
     --master $SPARK_MASTER \
-    --class org.apache.comet.fuzz.ComparisonToolMain
+    --class org.apache.comet.fuzz.ComparisonTool
     target/comet-fuzz-spark3.5_2.12-0.12.0-SNAPSHOT-jar-with-dependencies.jar \
     compareParquet --input-spark-folder=/tmp/tpch/spark --input-comet-folder=/tmp/tpch/comet
 ```
````

fuzz-testing/src/main/scala/org/apache/comet/fuzz/ComparisonTool.scala

Lines changed: 5 additions & 13 deletions

````diff
@@ -23,7 +23,7 @@ import java.io.File
 
 import org.rogach.scallop.{ScallopConf, ScallopOption, Subcommand}
 
-import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.{functions, SparkSession}
 
 class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments) {
   object compareParquet extends Subcommand("compareParquet") {
@@ -36,7 +36,7 @@ class ComparisonToolConf(arguments: Seq[String]) extends ScallopConf(arguments)
   verify()
 }
 
-object ComparisonToolMain {
+object ComparisonTool {
 
   lazy val spark: SparkSession = SparkSession
     .builder()
@@ -108,22 +108,14 @@ object ComparisonToolMain {
       // Read Spark parquet files
      spark.conf.set("spark.comet.enabled", "false")
       val sparkDf = spark.read.parquet(sparkSubfolderPath.getAbsolutePath)
-      val sparkRows = sparkDf.collect()
-      val sparkPlan = sparkDf.queryExecution.executedPlan.toString
+      val sparkRows = sparkDf.orderBy(sparkDf.columns.map(functions.col): _*).collect()
 
       // Read Comet parquet files
       val cometDf = spark.read.parquet(cometSubfolderPath.getAbsolutePath)
-      val cometRows = cometDf.collect()
-      val cometPlan = cometDf.queryExecution.executedPlan.toString
+      val cometRows = cometDf.orderBy(cometDf.columns.map(functions.col): _*).collect()
 
       // Compare the results
-      QueryComparison.assertSameRows(
-        sparkRows,
-        cometRows,
-        sqlText = s"Reading parquet from subfolder: $subfolderName",
-        sparkPlan,
-        cometPlan,
-        output)
+      QueryComparison.assertSameRows(sparkRows, cometRows, output)
 
       output.write(s"Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n")
 
````
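Both folders are now collected through the same sort-by-every-column pattern, which makes the collected row order deterministic before comparison regardless of file layout. A minimal standalone sketch of that pattern, assuming an existing SparkSession; readSorted is a hypothetical helper, not part of this commit:

```scala
import org.apache.spark.sql.{functions, Row, SparkSession}

// Hypothetical helper showing the sorted-read pattern used above:
// order by every column so collected rows compare deterministically.
def readSorted(spark: SparkSession, path: String): Array[Row] = {
  val df = spark.read.parquet(path)
  df.orderBy(df.columns.map(functions.col): _*).collect()
}
```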

fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala

Lines changed: 14 additions & 24 deletions

````diff
@@ -26,6 +26,8 @@ import scala.io.Source
 
 import org.apache.spark.sql.{Row, SparkSession}
 
+import org.apache.comet.fuzz.QueryComparison.showPlans
+
 object QueryRunner {
 
   def createOutputMdFile(): BufferedWriter = {
@@ -80,17 +82,21 @@ object QueryRunner {
         val cometRows = df.collect()
         val cometPlan = df.queryExecution.executedPlan.toString
 
-        val success = QueryComparison.assertSameRows(
-          sparkRows,
-          cometRows,
-          sqlText = sql,
-          sparkPlan,
-          cometPlan,
-          output = w)
+        var success = QueryComparison.assertSameRows(sparkRows, cometRows, output = w)
+
+        // check that the plan contains Comet operators
+        if (!cometPlan.contains("Comet")) {
+          success = false
+          w.write("[ERROR] Comet did not accelerate any part of the plan\n")
+        }
+
+        QueryComparison.showSQL(w, sql)
 
         if (success) {
           cometSuccessCount += 1
         } else {
+          // show plans for failed queries
+          showPlans(w, sparkPlan, cometPlan)
           cometFailureCount += 1
         }
 
@@ -142,9 +148,6 @@ object QueryComparison {
   def assertSameRows(
       sparkRows: Array[Row],
       cometRows: Array[Row],
-      sqlText: String,
-      sparkPlan: String,
-      cometPlan: String,
       output: BufferedWriter): Boolean = {
     var success = true
     if (sparkRows.length == cometRows.length) {
@@ -160,8 +163,6 @@
         for (j <- 0 until l.length) {
           if (!same(l(j), r(j))) {
             success = false
-            showSQL(output, sqlText)
-            showPlans(output, sparkPlan, cometPlan)
             output.write(s"First difference at row $i:\n")
             output.write("Spark: `" + formatRow(l) + "`\n")
             output.write("Comet: `" + formatRow(r) + "`\n")
@@ -172,23 +173,12 @@
       }
     } else {
       success = false
-      showSQL(output, sqlText)
-      showPlans(output, sparkPlan, cometPlan)
       output.write(
         s"[ERROR] Spark produced ${sparkRows.length} rows and " +
           s"Comet produced ${cometRows.length} rows.\n")
     }
 
-    // check that the plan contains Comet operators
-    if (!cometPlan.contains("Comet")) {
-      success = false
-      showSQL(output, sqlText)
-      showPlans(output, sparkPlan, cometPlan)
-      output.write("[ERROR] Comet did not accelerate any part of the plan\n")
-    }
-
     success
-
   }
 
   private def same(l: Any, r: Any): Boolean = {
@@ -252,7 +242,7 @@
     w.write("```\n")
   }
 
-  private def showPlans(w: BufferedWriter, sparkPlan: String, cometPlan: String): Unit = {
+  def showPlans(w: BufferedWriter, sparkPlan: String, cometPlan: String): Unit = {
     w.write("### Spark Plan\n")
     w.write(s"```\n$sparkPlan\n```\n")
     w.write("### Comet Plan\n")
````
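After this refactor, assertSameRows compares only rows: showing the SQL, checking that Comet accelerated the plan, and printing plans (now only on failure) all move to the call site. A minimal sketch of a caller composed from the post-patch pieces, assuming it lives alongside the org.apache.comet.fuzz classes; report and the output file name are illustrative stand-ins:

```scala
import java.io.{BufferedWriter, FileWriter}

import org.apache.spark.sql.Row

// Illustrative caller of the post-patch API: compare rows first, add any
// caller-specific checks, and only emit plans when something failed.
def report(
    sparkRows: Array[Row],
    cometRows: Array[Row],
    sparkPlan: String,
    cometPlan: String,
    sql: String): Unit = {
  val w = new BufferedWriter(new FileWriter("report.md"))
  var success = QueryComparison.assertSameRows(sparkRows, cometRows, output = w)
  // the plan check now lives at the call site, not in QueryComparison
  if (!cometPlan.contains("Comet")) {
    success = false
    w.write("[ERROR] Comet did not accelerate any part of the plan\n")
  }
  QueryComparison.showSQL(w, sql)
  if (!success) {
    QueryComparison.showPlans(w, sparkPlan, cometPlan)
  }
  w.close()
}
```

Writing plans only for failed queries keeps the report small while still capturing the context needed to debug mismatches.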
