
Commit d579011

chore: extract comparison tool from fuzzer
1 parent 91f9ff1 commit d579011

File tree: 3 files changed (+206, -99 lines)
Lines changed: 157 additions & 0 deletions (new file)
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.comet.fuzz
+
+import java.io.File
+
+import scala.util.Random
+
+import org.rogach.scallop.{ScallopConf, Subcommand}
+import org.rogach.scallop.ScallopOption
+
+import org.apache.spark.sql.SparkSession
+
+import org.apache.comet.testing.{DataGenOptions, ParquetGenerator}
+
+class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
+  object compareParquet extends Subcommand("compareParquet") {
+    val inputSparkFolder: ScallopOption[String] =
+      opt[String](required = true, descr = "Folder with Spark produced results in Parquet format")
+    val inputCometFolder: ScallopOption[String] =
+      opt[String](required = true, descr = "Folder with Comet produced results in Parquet format")
+  }
+  addSubcommand(compareParquet)
+  verify()
+}
+
+object Main {
+
+  lazy val spark: SparkSession = SparkSession
+    .builder()
+    .getOrCreate()
+
+  def main(args: Array[String]): Unit = {
+    val conf = new Conf(args.toIndexedSeq)
+    conf.subcommand match {
+      case Some(conf.compareParquet) =>
+        compareParquetFolders(
+          spark,
+          conf.compareParquet.inputSparkFolder(),
+          conf.compareParquet.inputCometFolder())
+
+      case _ =>
+        // scalastyle:off println
+        println("Invalid subcommand")
+        // scalastyle:on println
+        sys.exit(-1)
+    }
+  }
+
+  private def compareParquetFolders(
+      spark: SparkSession,
+      sparkFolderPath: String,
+      cometFolderPath: String): Unit = {
+
+    val output = QueryRunner.createOutputMdFile()
+
+    try {
+      val sparkFolder = new File(sparkFolderPath)
+      val cometFolder = new File(cometFolderPath)
+
+      if (!sparkFolder.exists() || !sparkFolder.isDirectory) {
+        throw new IllegalArgumentException(
+          s"Spark folder does not exist or is not a directory: $sparkFolderPath")
+      }
+
+      if (!cometFolder.exists() || !cometFolder.isDirectory) {
+        throw new IllegalArgumentException(
+          s"Comet folder does not exist or is not a directory: $cometFolderPath")
+      }
+
+      // Get all subdirectories from the Spark folder
+      val sparkSubfolders = sparkFolder
+        .listFiles()
+        .filter(_.isDirectory)
+        .map(_.getName)
+        .sorted
+
+      output.write(s"# Comparing Parquet Folders\n\n")
+      output.write(s"Spark folder: $sparkFolderPath\n")
+      output.write(s"Comet folder: $cometFolderPath\n")
+      output.write(s"Found ${sparkSubfolders.length} subfolders to compare\n\n")
+
+      // Compare each subfolder
+      sparkSubfolders.foreach { subfolderName =>
+        val sparkSubfolderPath = new File(sparkFolder, subfolderName)
+        val cometSubfolderPath = new File(cometFolder, subfolderName)
+
+        if (!cometSubfolderPath.exists() || !cometSubfolderPath.isDirectory) {
+          output.write(s"## Subfolder: $subfolderName\n")
+          output.write(
+            s"[WARNING] Comet subfolder not found: ${cometSubfolderPath.getAbsolutePath}\n\n")
+        } else {
+          output.write(s"## Comparing subfolder: $subfolderName\n\n")
+
+          try {
+            // Read Spark parquet files
+            spark.conf.set("spark.comet.enabled", "false")
+            val sparkDf = spark.read.parquet(sparkSubfolderPath.getAbsolutePath)
+            val sparkRows = sparkDf.collect()
+            val sparkPlan = sparkDf.queryExecution.executedPlan.toString
+
+            // Read Comet parquet files
+            val cometDf = spark.read.parquet(cometSubfolderPath.getAbsolutePath)
+            val cometRows = cometDf.collect()
+            val cometPlan = cometDf.queryExecution.executedPlan.toString
+
+            // Compare the results
+            QueryComparison.assertSameRows(
+              sparkRows,
+              cometRows,
+              sqlText = s"Reading parquet from subfolder: $subfolderName",
+              sparkPlan,
+              cometPlan,
+              output)
+
+            output.write(s"✓ Subfolder $subfolderName: ${sparkRows.length} rows matched\n\n")
+
+          } catch {
+            case e: Exception =>
+              output.write(
+                s"[ERROR] Failed to compare subfolder $subfolderName: ${e.getMessage}\n")
+              val sw = new java.io.StringWriter()
+              val p = new java.io.PrintWriter(sw)
+              e.printStackTrace(p)
+              p.close()
+              output.write(s"```\n${sw.toString}\n```\n\n")
+          }
+        }
+
+        output.flush()
+      }
+
+      output.write(s"\n# Comparison Complete\n")
+      output.write(s"Compared ${sparkSubfolders.length} subfolders\n")
+
    } finally {
+      output.close()
+    }
+  }
+}
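For reference, a minimal sketch of how the new `compareParquet` subcommand might be driven. This is not part of the commit: the folder paths are placeholders, the hyphenated option names assume Scallop's default camelCase-to-hyphen mapping of `inputSparkFolder`/`inputCometFolder`, and it has to run somewhere a Spark master is configured (e.g. via `spark-submit`).

```scala
package org.apache.comet.fuzz

// Hypothetical smoke test: drives the comparison tool's Main shown above.
// The paths are placeholders and would point at folders of Parquet results
// previously written by the fuzzer, once with Spark and once with Comet.
object CompareParquetSmokeTest {
  def main(args: Array[String]): Unit = {
    Main.main(
      Array(
        "compareParquet",
        "--input-spark-folder",
        "/tmp/fuzz-output/spark",
        "--input-comet-folder",
        "/tmp/fuzz-output/comet"))
  }
}
```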

fuzz-testing/src/main/scala/org/apache/comet/fuzz/Main.scala

Lines changed: 0 additions & 16 deletions
@@ -61,17 +61,6 @@ class Conf(arguments: Seq[String]) extends ScallopConf(arguments) {
     opt[Int](required = false, descr = "Number of input files to use")
   }
   addSubcommand(runQueries)
-  object runTPCQueries extends Subcommand("runTPC") {
-    val dataFolder: ScallopOption[String] =
-      opt[String](
-        required = true,
-        descr = "Folder for input data. Expected folder struct `$dataFolder/tableName/*.parquet`")
-    val queriesFolder: ScallopOption[String] =
-      opt[String](
-        required = true,
-        descr = "Folder for test queries. Expected folder struct `$queriesFolder/*.sql`")
-  }
-  addSubcommand(runTPCQueries)
   verify()
 }

@@ -115,11 +104,6 @@
           conf.generateQueries.numQueries())
       case Some(conf.runQueries) =>
         QueryRunner.runQueries(spark, conf.runQueries.numFiles(), conf.runQueries.filename())
-      case Some(conf.runTPCQueries) =>
-        QueryRunner.runTPCQueries(
-          spark,
-          conf.runTPCQueries.dataFolder(),
-          conf.runTPCQueries.queriesFolder())
       case _ =>
         // scalastyle:off println
         println("Invalid subcommand")

fuzz-testing/src/main/scala/org/apache/comet/fuzz/QueryRunner.scala

Lines changed: 49 additions & 83 deletions
@@ -19,7 +19,7 @@
 
 package org.apache.comet.fuzz
 
-import java.io.{BufferedWriter, File, FileWriter, PrintWriter, StringWriter}
+import java.io.{BufferedWriter, FileWriter, PrintWriter, StringWriter}
 
 import scala.collection.mutable
 import scala.io.Source

@@ -37,7 +37,7 @@ object QueryRunner {
     new BufferedWriter(new FileWriter(outputFilename))
   }
 
-  def assertCorrectness(
+  def executeSQLAndAssertCorrectness(
       spark: SparkSession,
       sql: String,
       showFailedSparkQueries: Boolean = false,

@@ -59,35 +59,17 @@
         val cometRows = df.collect()
         val cometPlan = df.queryExecution.executedPlan.toString
 
-        if (sparkRows.length == cometRows.length) {
-          var i = 0
-          while (i < sparkRows.length) {
-            val l = sparkRows(i)
-            val r = cometRows(i)
-            assert(l.length == r.length)
-            for (j <- 0 until l.length) {
-              if (!same(l(j), r(j))) {
-                showSQL(output, sql)
-                showPlans(output, sparkPlan, cometPlan)
-                output.write(s"First difference at row $i:\n")
-                output.write("Spark: `" + formatRow(l) + "`\n")
-                output.write("Comet: `" + formatRow(r) + "`\n")
-                i = sparkRows.length
-              }
-            }
-            i += 1
-          }
-        } else {
-          showSQL(output, sql)
-          showPlans(output, sparkPlan, cometPlan)
-          output.write(
-            s"[ERROR] Spark produced ${sparkRows.length} rows and " +
-              s"Comet produced ${cometRows.length} rows.\n")
-        }
+        QueryComparison.assertSameRows(
+          sparkRows,
+          cometRows,
+          sqlText = sql,
+          sparkPlan,
+          cometPlan,
+          output)
       } catch {
         case e: Exception =>
           // the query worked in Spark but failed in Comet, so this is likely a bug in Comet
-          showSQL(output, sql)
+          QueryComparison.showSQL(output, sql)
           output.write(s"[ERROR] Query failed in Comet: ${e.getMessage}:\n")
           output.write("```\n")
           val sw = new StringWriter()

@@ -105,7 +87,7 @@
       case e: Exception =>
         // we expect many generated queries to be invalid
         if (showFailedSparkQueries) {
-          showSQL(output, sql)
+          QueryComparison.showSQL(output, sql)
           output.write(s"Query failed in Spark: ${e.getMessage}\n")
         }
     }

@@ -133,67 +115,18 @@
     try {
       querySource
         .getLines()
-        .foreach(sql => assertCorrectness(spark, sql, showFailedSparkQueries, output = w))
+        .foreach(sql =>
+          executeSQLAndAssertCorrectness(spark, sql, showFailedSparkQueries, output = w))
 
     } finally {
       w.close()
       querySource.close()
     }
   }
 
-  def runTPCQueries(
-      spark: SparkSession,
-      dataFolderName: String,
-      queriesFolderName: String): Unit = {
-    val output = QueryRunner.createOutputMdFile()
-
-    // Load data tables from dataFolder
-    val dataFolder = new File(dataFolderName)
-    if (!dataFolder.exists() || !dataFolder.isDirectory) {
-      // scalastyle:off println
-      println(s"Error: Data folder $dataFolder does not exist or is not a directory")
-      // scalastyle:on println
-      sys.exit(-1)
-    }
-
-    // Traverse data folder and create temp views
-    dataFolder.listFiles().filter(_.isDirectory).foreach { tableDir =>
-      val tableName = tableDir.getName
-      val parquetPath = s"${tableDir.getAbsolutePath}/*.parquet"
-      spark.read.parquet(parquetPath).createOrReplaceTempView(tableName)
-      // scalastyle:off println
-      println(s"Created temp view: $tableName from $parquetPath")
-      // scalastyle:on println
-    }
-
-    // Load and run queries from queriesFolder
-    val queriesFolder = new File(queriesFolderName)
-    if (!queriesFolder.exists() || !queriesFolder.isDirectory) {
-      // scalastyle:off println
-      println(s"Error: Queries folder $queriesFolder does not exist or is not a directory")
-      // scalastyle:on println
-      sys.exit(-1)
-    }
-
-    // Traverse queries folder and run each .sql file
-    queriesFolder.listFiles().filter(f => f.isFile && f.getName.endsWith(".sql")).foreach {
-      sqlFile =>
-        // scalastyle:off println
-        println(s"Running query from: ${sqlFile.getName}")
-        // scalastyle:on println
-
-        val querySource = Source.fromFile(sqlFile)
-        try {
-          val sql = querySource.mkString
-          QueryRunner.assertCorrectness(spark, sql, showFailedSparkQueries = false, output)
-        } finally {
-          querySource.close()
-        }
-    }
-
-    output.close()
-  }
+}
 
+object QueryComparison {
   private def same(l: Any, r: Any): Boolean = {
     if (l == null || r == null) {
       return l == null && r == null

@@ -235,7 +168,7 @@
     row.toSeq.map(format).mkString(",")
   }
 
-  private def showSQL(w: BufferedWriter, sql: String, maxLength: Int = 120): Unit = {
+  def showSQL(w: BufferedWriter, sql: String, maxLength: Int = 120): Unit = {
     w.write("## SQL\n")
     w.write("```\n")
     val words = sql.split(" ")

@@ -262,4 +195,37 @@
     w.write(s"```\n$cometPlan\n```\n")
   }
 
+  def assertSameRows(
+      sparkRows: Array[Row],
+      cometRows: Array[Row],
+      sqlText: String,
+      sparkPlan: String,
+      cometPlan: String,
+      output: BufferedWriter): Unit = {
+    if (sparkRows.length == cometRows.length) {
+      var i = 0
+      while (i < sparkRows.length) {
+        val l = sparkRows(i)
+        val r = cometRows(i)
+        assert(l.length == r.length)
+        for (j <- 0 until l.length) {
+          if (!same(l(j), r(j))) {
+            showSQL(output, sqlText)
+            showPlans(output, sparkPlan, cometPlan)
+            output.write(s"First difference at row $i:\n")
+            output.write("Spark: `" + formatRow(l) + "`\n")
+            output.write("Comet: `" + formatRow(r) + "`\n")
+            i = sparkRows.length
+          }
+        }
+        i += 1
+      }
+    } else {
+      showSQL(output, sqlText)
+      showPlans(output, sparkPlan, cometPlan)
+      output.write(
+        s"[ERROR] Spark produced ${sparkRows.length} rows and " +
+          s"Comet produced ${cometRows.length} rows.\n")
+    }
+  }
 }
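To illustrate the contract of the extracted `QueryComparison.assertSameRows`, here is a minimal, self-contained sketch. It is not part of the commit; the rows, SQL text, plan strings, and output filename are all made up for illustration.

```scala
import java.io.{BufferedWriter, FileWriter}

import org.apache.spark.sql.Row

import org.apache.comet.fuzz.QueryComparison

// Hypothetical example: feeds two small result sets through the extracted
// comparator. A value mismatch is reported to the markdown output rather
// than thrown; only a column-count difference trips the internal assert.
object AssertSameRowsExample {
  def main(args: Array[String]): Unit = {
    val sparkRows = Array(Row(1, "a"), Row(2, "b"))
    val cometRows = Array(Row(1, "a"), Row(2, "c")) // differs at row 1, column 1

    val output = new BufferedWriter(new FileWriter("comparison-example.md"))
    try {
      QueryComparison.assertSameRows(
        sparkRows,
        cometRows,
        sqlText = "SELECT id, name FROM t ORDER BY id",
        sparkPlan = "<spark plan string>",
        cometPlan = "<comet plan string>",
        output)
    } finally {
      output.close()
    }
  }
}
```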
