Skip to content

Commit 60c0f1e

Browse files
authored
chore: Add shuffle benchmark for deeply nested schemas (apache#2902)
1 parent 6ab58cc commit 60c0f1e

File tree

4 files changed

+208
-17
lines changed

4 files changed

+208
-17
lines changed

dev/benchmarks/comet-tpch.sh

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,5 +50,4 @@ $SPARK_HOME/bin/spark-submit \
5050
--data $TPCH_DATA \
5151
--queries $TPCH_QUERIES \
5252
--output . \
53-
--write /tmp \
5453
--iterations 1

spark/src/main/scala/org/apache/comet/testing/FuzzDataGenerator.scala

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,70 @@ object FuzzDataGenerator {
8888
StructType(fields.toSeq)
8989
}
9090

91+
/**
 * Generates a random schema of `numCols` top-level columns containing nested complex
 * types (arrays, structs, maps), for fuzz/benchmark data generation.
 *
 * @param r        source of randomness (seed it for reproducible schemas)
 * @param numCols  number of top-level columns; must be positive
 * @param minDepth minimum nesting depth: primitive leaves are only allowed once a
 *                 field sits at depth >= minDepth
 * @param maxDepth maximum nesting depth: complex types are only allowed while a
 *                 field sits at depth < maxDepth
 * @param options  controls which complex types may be generated and which primitive
 *                 types leaves are drawn from; at least one complex type must be enabled
 */
def generateNestedSchema(
    r: Random,
    numCols: Int,
    minDepth: Int,
    maxDepth: Int,
    options: SchemaGenOptions): StructType = {
  assert(numCols > 0)
  assert(minDepth >= 0)
  assert(maxDepth >= 0)
  assert(minDepth <= maxDepth)
  assert(
    options.generateArray || options.generateStruct || options.generateMap,
    "cannot generate nested schema if options do not include generating complex types")

  // Monotonically increasing counter so every generated field name is unique
  // across the whole schema.
  var counter = 0

  def generateFieldName() = {
    val name = s"c_$counter"
    counter += 1
    name
  }

  // Each generator below descends exactly one nesting level by calling
  // genField(r, depth + 1) for its child field(s).
  def generateArray(depth: Int, name: String) = {
    val element = genField(r, depth + 1)
    StructField(name, DataTypes.createArrayType(element.dataType, true))
  }

  def generateStruct(depth: Int, name: String) = {
    val fields =
      Range(1, 2 + r.nextInt(10)).map(_ => genField(r, depth + 1)).toArray
    StructField(name, DataTypes.createStructType(fields))
  }

  def generateMap(depth: Int, name: String) = {
    val keyField = genField(r, depth + 1)
    val valueField = genField(r, depth + 1)
    StructField(name, DataTypes.createMapType(keyField.dataType, valueField.dataType))
  }

  def generatePrimitive(name: String) = {
    StructField(name, randomChoice(options.primitiveTypes, r))
  }

  def genField(r: Random, depth: Int): StructField = {
    val name = generateFieldName()
    val generators = new ListBuffer[() => StructField]()
    // Pass `depth` through unchanged: the complex-type generators already call
    // genField(r, depth + 1). Passing depth + 1 here as well would advance two
    // levels per nesting step, overshooting maxDepth (e.g. maxDepth = 1 would
    // still produce nested children at effective depth 2).
    if (options.generateArray && depth < maxDepth) {
      generators += (() => generateArray(depth, name))
    }
    if (options.generateStruct && depth < maxDepth) {
      generators += (() => generateStruct(depth, name))
    }
    if (options.generateMap && depth < maxDepth) {
      generators += (() => generateMap(depth, name))
    }
    // Primitive leaves only become eligible once the minimum depth is reached;
    // the asserts above (minDepth <= maxDepth) guarantee `generators` is never
    // empty at this point.
    if (depth >= minDepth) {
      generators += (() => generatePrimitive(name))
    }
    randomChoice(generators.toSeq, r)()
  }

  StructType(Range(0, numCols).map(_ => genField(r, 0)))
}
154+
91155
def generateDataFrame(
92156
r: Random,
93157
spark: SparkSession,

spark/src/test/scala/org/apache/comet/DataGeneratorSuite.scala

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,46 @@
1919

2020
package org.apache.comet
2121

22+
import scala.util.Random
23+
2224
import org.apache.spark.sql.CometTestBase
23-
import org.apache.spark.sql.types.StructType
25+
import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructType}
26+
27+
import org.apache.comet.testing.{FuzzDataGenerator, SchemaGenOptions}
2428

2529
class DataGeneratorSuite extends CometTestBase {
2630

31+
test("generate nested schema has at least minDepth levels") {
  val minDepth = 3
  val numCols = 4
  val schema = FuzzDataGenerator.generateNestedSchema(
    new Random(42),
    numCols,
    minDepth = minDepth,
    maxDepth = minDepth + 1,
    options = SchemaGenOptions(generateMap = true, generateArray = true, generateStruct = true))
  assert(schema.fields.length == numCols)

  // Depth of a type: 1 for a primitive leaf, plus one level per enclosing
  // complex type, mirroring how generateNestedSchema counts depth.
  def calculateDepth(dataType: DataType): Int = {
    dataType match {
      case ArrayType(elementType, _) => 1 + calculateDepth(elementType)
      case StructType(fields) =>
        if (fields.isEmpty) 1
        else 1 + fields.map(f => calculateDepth(f.dataType)).max
      case MapType(k, v, _) =>
        // A map is itself one nesting level: the generator descends one level
        // for its key/value fields, so count it like arrays and structs rather
        // than taking the bare max of key and value depths.
        1 + calculateDepth(k).max(calculateDepth(v))
      case _ =>
        // primitive type
        1
    }
  }

  val actualDepth = schema.fields.map(f => calculateDepth(f.dataType)).max
  assert(
    actualDepth >= minDepth,
    s"Generated schema depth $actualDepth is less than required minimum depth $minDepth")
}
61+
2762
test("test configurable stringGen in row generator") {
2863
val gen = DataGenerator.DEFAULT
2964
val chars = "abcde"

spark/src/test/scala/org/apache/spark/sql/benchmark/CometShuffleBenchmark.scala

Lines changed: 108 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,29 @@
1919

2020
package org.apache.spark.sql.benchmark
2121

22+
import java.text.SimpleDateFormat
23+
24+
import scala.util.Random
25+
2226
import org.apache.spark.SparkConf
2327
import org.apache.spark.benchmark.Benchmark
24-
import org.apache.spark.sql.{Column, SparkSession}
28+
import org.apache.spark.sql.{Column, SaveMode, SparkSession}
2529
import org.apache.spark.sql.internal.SQLConf
2630
import org.apache.spark.sql.types._
2731

2832
import org.apache.comet.CometConf
2933
import org.apache.comet.CometSparkSessionExtensions
34+
import org.apache.comet.testing.{DataGenOptions, FuzzDataGenerator, SchemaGenOptions}
3035

36+
// spotless:off
3137
/**
3238
* Benchmark to measure Comet shuffle performance. To run this benchmark:
33-
* `SPARK_GENERATE_BENCHMARK_FILES=1 make
34-
* benchmark-org.apache.spark.sql.benchmark.CometShuffleBenchmark` Results will be written to
35-
* "spark/benchmarks/CometShuffleBenchmark-**results.txt".
39+
* `SPARK_GENERATE_BENCHMARK_FILES=1 make benchmark-org.apache.spark.sql.benchmark.CometShuffleBenchmark`
40+
* Results will be written to "spark/benchmarks/CometShuffleBenchmark-**results.txt".
3641
*/
42+
// spotless:on
3743
object CometShuffleBenchmark extends CometBenchmarkBase {
44+
3845
override def getSparkSession: SparkSession = {
3946
val conf = new SparkConf()
4047
.setAppName("CometShuffleBenchmark")
@@ -97,7 +104,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
97104
}
98105
}
99106

100-
benchmark.addCase("SQL Parquet - Comet (Comet Arrow Shuffle)") { _ =>
107+
benchmark.addCase("SQL Parquet - Comet (Comet JVM Shuffle)") { _ =>
101108
withSQLConf(
102109
CometConf.COMET_ENABLED.key -> "true",
103110
CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -154,7 +161,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
154161
}
155162
}
156163

157-
benchmark.addCase("SQL Parquet - Comet (Comet Arrow Shuffle)") { _ =>
164+
benchmark.addCase("SQL Parquet - Comet (Comet JVM Shuffle)") { _ =>
158165
withSQLConf(
159166
CometConf.COMET_ENABLED.key -> "true",
160167
CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -209,7 +216,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
209216
}
210217
}
211218

212-
benchmark.addCase("SQL Parquet - Comet (Comet Arrow Shuffle)") { _ =>
219+
benchmark.addCase("SQL Parquet - Comet (Comet JVM Shuffle)") { _ =>
213220
withSQLConf(
214221
CometConf.COMET_ENABLED.key -> "true",
215222
CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -224,7 +231,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
224231
}
225232
}
226233

227-
benchmark.addCase("SQL Parquet - Comet (Comet Arrow Shuffle + Prefer Dictionary)") { _ =>
234+
benchmark.addCase("SQL Parquet - Comet (Comet JVM Shuffle + Prefer Dictionary)") { _ =>
228235
withSQLConf(
229236
CometConf.COMET_ENABLED.key -> "true",
230237
CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -239,7 +246,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
239246
}
240247
}
241248

242-
benchmark.addCase("SQL Parquet - Comet (Comet Arrow Shuffle + Fallback to string)") { _ =>
249+
benchmark.addCase("SQL Parquet - Comet (Comet JVM Shuffle + Fallback to string)") { _ =>
243250
withSQLConf(
244251
CometConf.COMET_ENABLED.key -> "true",
245252
CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -305,7 +312,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
305312
}
306313
}
307314

308-
benchmark.addCase("SQL Parquet - Comet (Comet Arrow Shuffle)") { _ =>
315+
benchmark.addCase("SQL Parquet - Comet (Comet JVM Shuffle)") { _ =>
309316
withSQLConf(
310317
CometConf.COMET_ENABLED.key -> "true",
311318
CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -319,7 +326,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
319326
}
320327
}
321328

322-
benchmark.addCase("SQL Parquet - Comet (Comet Async Arrow Shuffle)") { _ =>
329+
benchmark.addCase("SQL Parquet - Comet (Comet Async JVM Shuffle)") { _ =>
323330
withSQLConf(
324331
CometConf.COMET_ENABLED.key -> "true",
325332
CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -389,7 +396,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
389396
}
390397
}
391398

392-
benchmark.addCase("SQL Parquet - Comet (Comet Arrow Shuffle)") { _ =>
399+
benchmark.addCase("SQL Parquet - Comet (Comet JVM Shuffle)") { _ =>
393400
withSQLConf(
394401
CometConf.COMET_ENABLED.key -> "true",
395402
CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -402,7 +409,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
402409
}
403410
}
404411

405-
benchmark.addCase("SQL Parquet - Comet (Comet Shuffle)") { _ =>
412+
benchmark.addCase("SQL Parquet - Comet (Comet Native Shuffle)") { _ =>
406413
withSQLConf(
407414
CometConf.COMET_ENABLED.key -> "true",
408415
CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -459,7 +466,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
459466
}
460467
}
461468

462-
benchmark.addCase("SQL Parquet - Comet (Comet Arrow Shuffle)") { _ =>
469+
benchmark.addCase("SQL Parquet - Comet (Comet JVM Shuffle)") { _ =>
463470
withSQLConf(
464471
CometConf.COMET_ENABLED.key -> "true",
465472
CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -472,7 +479,7 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
472479
}
473480
}
474481

475-
benchmark.addCase("SQL Parquet - Comet (Comet Shuffle)") { _ =>
482+
benchmark.addCase("SQL Parquet - Comet (Comet Native Shuffle)") { _ =>
476483
withSQLConf(
477484
CometConf.COMET_ENABLED.key -> "true",
478485
CometConf.COMET_EXEC_ENABLED.key -> "true",
@@ -490,7 +497,73 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
490497
}
491498
}
492499

500+
/**
 * Benchmarks shuffling rows with a deeply nested schema read from a Parquet file,
 * comparing plain Spark, Comet with Spark's shuffle, and both Comet shuffle
 * implementations (JVM and native).
 *
 * @param filename     path of the Parquet file to read (see createDeeplyNestedParquetFile)
 * @param numRows      row count, used only for the benchmark's throughput reporting
 * @param partitionNum repartition target, forcing a shuffle of every row
 */
def shuffleDeeplyNestedBenchmark(
    name: String,
    filename: String,
    numRows: Int,
    partitionNum: Int): Unit = {
  val bench =
    new Benchmark(s"Shuffle with nested schema ($name)", numRows, output = output)
  val nested = spark.read.parquet(filename)
  withTempTable("deeplyNestedTable") {
    nested.createOrReplaceTempView("deeplyNestedTable")
    val query = "select * from deeplyNestedTable"

    // repartition() forces a full shuffle; noop() sinks the result without writing it.
    def runQuery(): Unit = {
      spark
        .sql(query)
        .repartition(partitionNum)
        .noop()
    }

    bench.addCase("Spark") { _ =>
      runQuery()
    }

    bench.addCase("Comet (Spark Shuffle)") { _ =>
      withSQLConf(
        CometConf.COMET_ENABLED.key -> "true",
        CometConf.COMET_EXEC_ENABLED.key -> "true",
        CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") {
        runQuery()
      }
    }

    Seq("jvm", "native").foreach { mode =>
      bench.addCase(s"Comet ($mode Shuffle)") { _ =>
        withSQLConf(
          CometConf.COMET_ENABLED.key -> "true",
          CometConf.COMET_EXEC_ENABLED.key -> "true",
          CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
          CometConf.COMET_SHUFFLE_MODE.key -> mode) {
          runQuery()
        }
      }
    }

    bench.run()
  }
}
549+
493550
override def runCometBenchmark(mainArgs: Array[String]): Unit = {
551+
552+
// nested type shuffle
553+
val numRows = 1000
554+
for (maxDepth <- Seq(2, 6)) {
555+
val filename =
556+
createDeeplyNestedParquetFile(numRows, maxDepth)
557+
try {
558+
for (partitionNum <- Seq(5, 201)) {
559+
val name = s"maxDepth=$maxDepth, partitionNum=$partitionNum"
560+
shuffleDeeplyNestedBenchmark(name, filename, numRows, partitionNum)
561+
}
562+
} finally {
563+
new java.io.File(filename).delete()
564+
}
565+
}
566+
494567
runBenchmarkWithTable("Shuffle on array", 1024 * 1024 * 1) { v =>
495568
Seq(
496569
BooleanType,
@@ -725,4 +798,24 @@ object CometShuffleBenchmark extends CometBenchmarkBase {
725798
}
726799
}
727800
}
801+
802+
/**
 * Writes a Parquet file with a randomly generated, deeply nested 100-column schema
 * and returns its path in java.io.tmpdir. The caller is responsible for deleting
 * the returned file.
 *
 * @param numRows  number of rows of fuzz data to generate
 * @param maxDepth maximum schema nesting depth (minimum depth is maxDepth - 1)
 */
private def createDeeplyNestedParquetFile(numRows: Int, maxDepth: Int): String = {
  val r = new Random(42)
  val options =
    SchemaGenOptions(generateArray = true, generateStruct = true, generateMap = true)
  val schema = FuzzDataGenerator.generateNestedSchema(r, 100, maxDepth - 1, maxDepth, options)
  val tempDir = System.getProperty("java.io.tmpdir")
  val filename = s"$tempDir/CometShuffleBenchmark_${System.currentTimeMillis()}.parquet"
  // Disable Comet so the fixture file is written by vanilla Spark, independent of
  // the code under benchmark.
  withSQLConf(CometConf.COMET_ENABLED.key -> "false") {
    val dataGenOptions = DataGenOptions(
      generateNegativeZero = false,
      // override base date due to known issues with experimental scans.
      // Pattern letters matter: "YYYY" (week-year), "DD" (day-of-year) and "hh"
      // (1-12 clock) would silently parse "2024-05-25 12:34:56" to a different
      // instant; "yyyy-MM-dd HH:mm:ss" parses it as written.
      baseDate =
        new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2024-05-25 12:34:56").getTime)
    val df =
      FuzzDataGenerator.generateDataFrame(r, spark, schema, numRows, dataGenOptions)
    df.write.mode(SaveMode.Overwrite).parquet(filename)
  }
  filename
}
728821
}

0 commit comments

Comments
 (0)