Commit 1a5d83b
[SPARK-25589][SQL][TEST] Add BloomFilterBenchmark
## What changes were proposed in this pull request?

This PR aims to add `BloomFilterBenchmark`. Apache Spark has supported bloom filters for the ORC data source for a long time. For the Parquet data source, support is expected to arrive with the next Parquet release.

## How was this patch tested?

Manual.

```bash
SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain org.apache.spark.sql.execution.benchmark.BloomFilterBenchmark"
```

Closes apache#22605 from dongjoon-hyun/SPARK-25589.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
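For context, ORC bloom filters are enabled per column through writer options, which is exactly the path the new benchmark exercises. Below is a minimal standalone sketch, not part of this commit: the output path, row count, and app name are illustrative, while `orc.bloom.filter.columns` and `orc.bloom.filter.fpp` are standard ORC writer options.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative sketch only; object name and paths are made up for the example.
object OrcBloomFilterExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("OrcBloomFilterExample").getOrCreate()

    val df = spark.range(1000000).selectExpr("CAST(id AS INT) AS value")

    // Write ORC with a bloom filter on the "value" column.
    // "orc.bloom.filter.columns" takes a comma-separated column list;
    // "orc.bloom.filter.fpp" tunes the false-positive probability.
    df.write.mode("overwrite")
      .option("orc.bloom.filter.columns", "value")
      .option("orc.bloom.filter.fpp", "0.05")
      .orc("/tmp/orc_with_bf")

    // A point lookup: the ORC reader can skip row groups whose bloom
    // filter proves the searched value is absent.
    spark.read.orc("/tmp/orc_with_bf").where("value = 0").count()

    spark.stop()
  }
}
```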
1 parent 928d073 commit 1a5d83b

2 files changed: +111 −0 lines
benchmarks/BloomFilterBenchmark-results.txt: 24 additions & 0 deletions
@@ -0,0 +1,24 @@
================================================================================================
ORC Write
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Write 100M rows:                         Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Without bloom filter                         16765 / 17587          6.0         167.7       1.0X
With bloom filter                            20060 / 20626          5.0         200.6       0.8X


================================================================================================
ORC Read
================================================================================================

OpenJDK 64-Bit Server VM 1.8.0_181-b13 on Linux 3.10.0-862.3.2.el7.x86_64
Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
Read a row from 100M rows:               Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Without bloom filter                          1857 / 1904         53.9          18.6       1.0X
With bloom filter                             1399 / 1437         71.5          14.0       1.3X
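For reference, the derived columns follow directly from the totals: 16765 ms over 100M rows is about 167.7 ns/row, i.e. roughly 6.0 M rows/s. The net effect in these numbers is that the bloom filter makes the write roughly 20% slower (0.8X) but the point-lookup read about 1.3X faster.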
sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/BloomFilterBenchmark.scala: 87 additions & 0 deletions
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.benchmark
19+
20+
import scala.util.Random
21+
22+
import org.apache.spark.benchmark.Benchmark
23+
24+
/**
25+
* Benchmark to measure read performance with Bloom filters.
26+
*
27+
* Currently, only ORC supports bloom filters, we will add Parquet BM as soon as it becomes
28+
* available.
29+
*
30+
* To run this benchmark:
31+
* {{{
32+
* 1. without sbt: bin/spark-submit --class <this class> <spark sql test jar>
33+
* 2. build/sbt "sql/test:runMain <this class>"
34+
* 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>"
35+
* Results will be written to "benchmarks/BloomFilterBenchmark-results.txt".
36+
* }}}
37+
*/
38+
object BloomFilterBenchmark extends SqlBasedBenchmark {
39+
import spark.implicits._
40+
41+
private val scaleFactor = 100
42+
private val N = scaleFactor * 1000 * 1000
43+
private val df = spark.range(N).map(_ => Random.nextInt)
44+
45+
private def writeBenchmark(): Unit = {
46+
withTempPath { dir =>
47+
val path = dir.getCanonicalPath
48+
49+
runBenchmark(s"ORC Write") {
50+
val benchmark = new Benchmark(s"Write ${scaleFactor}M rows", N, output = output)
51+
benchmark.addCase("Without bloom filter") { _ =>
52+
df.write.mode("overwrite").orc(path + "/withoutBF")
53+
}
54+
benchmark.addCase("With bloom filter") { _ =>
55+
df.write.mode("overwrite")
56+
.option("orc.bloom.filter.columns", "value").orc(path + "/withBF")
57+
}
58+
benchmark.run()
59+
}
60+
}
61+
}
62+
63+
private def readBenchmark(): Unit = {
64+
withTempPath { dir =>
65+
val path = dir.getCanonicalPath
66+
67+
df.write.orc(path + "/withoutBF")
68+
df.write.option("orc.bloom.filter.columns", "value").orc(path + "/withBF")
69+
70+
runBenchmark(s"ORC Read") {
71+
val benchmark = new Benchmark(s"Read a row from ${scaleFactor}M rows", N, output = output)
72+
benchmark.addCase("Without bloom filter") { _ =>
73+
spark.read.orc(path + "/withoutBF").where("value = 0").count
74+
}
75+
benchmark.addCase("With bloom filter") { _ =>
76+
spark.read.orc(path + "/withBF").where("value = 0").count
77+
}
78+
benchmark.run()
79+
}
80+
}
81+
}
82+
83+
override def runBenchmarkSuite(): Unit = {
84+
writeBenchmark()
85+
readBenchmark()
86+
}
87+
}
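One caveat worth noting: the read-side gain only materializes when the `value = 0` predicate is actually pushed down to the ORC reader, since bloom filters are consulted only for pushed-down predicates. A hedged sketch of making that explicit (`spark.sql.orc.filterPushdown` is the relevant SQL config; whether the benchmark harness sets it is not shown in this diff):

```scala
// Assumption: `spark` and `path` as in the benchmark above. Bloom filters
// are only consulted when the predicate reaches the ORC reader.
spark.conf.set("spark.sql.orc.filterPushdown", "true")
spark.read.orc(path + "/withBF").where("value = 0").count
```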
