
Commit cb3cfb4

yifeih authored and ifilonenko committed
SPARK-25299: add CI infrastructure and SortShuffleWriterBenchmark (apache-spark-on-k8s#498)
* add initial bypass merge sort shuffle writer benchmarks
* dd unsafe shuffle writer benchmarks
* changes in bypassmergesort benchmarks
* cleanup
* add circle script
* add this branch for testing
* fix circle attempt 1
* checkout code
* add some caches?
* why is it not pull caches...
* save as artifact instead of publishing
* mkdir
* typo
* try uploading artifacts again
* try print per iteration to avoid circle erroring out on idle
* blah (apache-spark-on-k8s#495)
* make a PR comment
* actually delete files
* run benchmarks on test build branch
* oops forgot to enable upload
* add sort shuffle writer benchmarks
* add stdev
* cleanup sort a bit
* fix stdev text
* fix sort shuffle
* initial code for read side
* format
* use times and sample stdev
* add assert for at least one iteration
* cleanup shuffle write to use fewer mocks and single base interface
* shuffle read works with transport client... needs lots of cleaning
* test running in cicle
* scalastyle
* dont publish results yet
* cleanup writer code
* get only git message
* fix command to get PR number
* add SortshuffleWriterBenchmark
* writer code
* cleanup
* fix benchmark script
* use ArgumentMatchers
* also in shufflewriterbenchmarkbase
* scalastyle
* add apache license
* fix some scale stuff
* fix up tests
* only copy benchmarks we care about
* increase size for reader again
* delete two writers and reader for PR
* SPARK-25299: Add shuffle reader benchmarks (apache-spark-on-k8s#506)
* Revert "SPARK-25299: Add shuffle reader benchmarks (apache-spark-on-k8s#506)" This reverts commit 9d46fae.
* add -e to bash script
* blah
* enable upload as a PR comment and prevent running benchmarks on this branch
* Revert "enable upload as a PR comment and prevent running benchmarks on this branch" This reverts commit 13703fa.
* try machine execution
* try uploading benchmarks (apache-spark-on-k8s#498)
* only upload results when merging into the feature branch
* lock down machine image
* don't write input data to disk
* run benchmark test
* stop creating file cleanup threads for every block manager
* use alphanumeric again
* use a new random everytime
* close the writers -__________-
* delete branch and publish results as comment
* close in finally
1 parent e4b36df commit cb3cfb4

File tree

3 files changed: 418 additions, 0 deletions

ShuffleWriterBenchmarkBase.scala (new file): 158 additions, 0 deletions
@@ -0,0 +1,158 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.sort

import java.io.{BufferedInputStream, Closeable, File, FileInputStream, FileOutputStream}
import java.util.UUID

import org.apache.commons.io.FileUtils
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, TaskContext}
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer.{KryoSerializer, Serializer, SerializerManager}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.storage.{BlockManager, DiskBlockManager, TempShuffleBlockId}
import org.apache.spark.util.Utils

abstract class ShuffleWriterBenchmarkBase extends BenchmarkBase {

  protected val DEFAULT_DATA_STRING_SIZE = 5

  // This is only used in the writer constructors, so it's ok to mock
  @Mock(answer = RETURNS_SMART_NULLS) protected var dependency:
    ShuffleDependency[String, String, String] = _
  // This is only used in the stop() function, so we can safely mock this without affecting perf
  @Mock(answer = RETURNS_SMART_NULLS) protected var taskContext: TaskContext = _
  @Mock(answer = RETURNS_SMART_NULLS) protected var rpcEnv: RpcEnv = _
  @Mock(answer = RETURNS_SMART_NULLS) protected var rpcEndpointRef: RpcEndpointRef = _

  protected val defaultConf: SparkConf = new SparkConf(loadDefaults = false)
  protected val serializer: Serializer = new KryoSerializer(defaultConf)
  protected val partitioner: HashPartitioner = new HashPartitioner(10)
  protected val serializerManager: SerializerManager =
    new SerializerManager(serializer, defaultConf)
  protected val shuffleMetrics: TaskMetrics = new TaskMetrics

  protected val tempFilesCreated: ArrayBuffer[File] = new ArrayBuffer[File]
  protected val filenameToFile: mutable.Map[String, File] = new mutable.HashMap[String, File]

  // Redirects block files into a per-iteration temp directory and records every temp shuffle
  // block it hands out, so benchmark cases can assert whether (and how often) a writer spilled.
  class TestDiskBlockManager(tempDir: File) extends DiskBlockManager(defaultConf, false) {
    override def getFile(filename: String): File = {
      if (filenameToFile.contains(filename)) {
        filenameToFile(filename)
      } else {
        val outputFile = File.createTempFile("shuffle", null, tempDir)
        filenameToFile(filename) = outputFile
        outputFile
      }
    }

    override def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
      val blockId = new TempShuffleBlockId(UUID.randomUUID())
      val file = getFile(blockId)
      tempFilesCreated += file
      (blockId, file)
    }
  }

  class TestBlockManager(tempDir: File, memoryManager: MemoryManager) extends BlockManager("0",
    rpcEnv,
    null,
    serializerManager,
    defaultConf,
    memoryManager,
    null,
    null,
    null,
    null,
    1) {
    override val diskBlockManager = new TestDiskBlockManager(tempDir)
    override val remoteBlockTempFileManager = null
  }

  protected var tempDir: File = _

  protected var blockManager: BlockManager = _
  protected var blockResolver: IndexShuffleBlockResolver = _

  protected var memoryManager: TestMemoryManager = _
  protected var taskMemoryManager: TaskMemoryManager = _

  MockitoAnnotations.initMocks(this)
  when(dependency.partitioner).thenReturn(partitioner)
  when(dependency.serializer).thenReturn(serializer)
  when(dependency.shuffleId).thenReturn(0)
  when(taskContext.taskMetrics()).thenReturn(shuffleMetrics)
  when(rpcEnv.setupEndpoint(any[String], any[RpcEndpoint])).thenReturn(rpcEndpointRef)

  def setup(): Unit = {
    memoryManager = new TestMemoryManager(defaultConf)
    // Cap execution memory at a single maximum-size page so the large-dataset cases are forced
    // to spill.
    memoryManager.limit(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES)
    taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
    tempDir = Utils.createTempDir()
    blockManager = new TestBlockManager(tempDir, memoryManager)
    blockResolver = new IndexShuffleBlockResolver(
      defaultConf,
      blockManager)
  }

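  // Each timer case gets a fresh memory manager, temp directory, and block manager via setup()
  // and is cleaned up via teardown(); the supplied func drives the timer itself, so setup and
  // teardown stay outside the measured region.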
  def addBenchmarkCase(benchmark: Benchmark, name: String)(func: Benchmark.Timer => Unit): Unit = {
    benchmark.addTimerCase(name) { timer =>
      setup()
      func(timer)
      teardown()
    }
  }

  def teardown(): Unit = {
    FileUtils.deleteDirectory(tempDir)
    tempFilesCreated.clear()
    filenameToFile.clear()
  }

  protected class DataIterator(size: Int) extends Iterator[Product2[String, String]] {
    // Fixed seed so every writer is benchmarked against the same key/value stream.
    val random = new Random(123)
    var count = 0
    override def hasNext: Boolean = {
      count < size
    }

    override def next(): Product2[String, String] = {
      count += 1
      val string = random.alphanumeric.take(DEFAULT_DATA_STRING_SIZE).mkString
      (string, string)
    }
  }

  def createDataIterator(size: Int): DataIterator = {
    new DataIterator(size)
  }
}
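For orientation, here is a minimal sketch (not part of the commit) of how a concrete benchmark plugs into this base class: addBenchmarkCase wires setup() and teardown() around each timed case, so a subclass only supplies the timed body and a runBenchmarkSuite entry point. The object name and the trivial timed body below are illustrative; the real usage follows in the next file of the diff.

package org.apache.spark.shuffle.sort

import org.apache.spark.benchmark.Benchmark

object IllustrativeWriterBenchmark extends ShuffleWriterBenchmarkBase {

  def writeBenchmark(): Unit = {
    val size = 1000
    val benchmark = new Benchmark("Illustrative writer", size, minNumIters = 10, output = output)
    addBenchmarkCase(benchmark, "write 1000 records") { timer =>
      val dataIterator = createDataIterator(size)
      timer.startTiming()
      // A real benchmark constructs a shuffle writer here and calls writer.write(dataIterator);
      // the base class has already run setup() and will run teardown() after this function returns.
      dataIterator.foreach(identity)
      timer.stopTiming()
    }
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("Illustrative shuffle writer") {
      writeBenchmark()
    }
  }
}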
SortShuffleWriterBenchmark.scala (new file): 172 additions, 0 deletions
@@ -0,0 +1,172 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.sort

import org.mockito.Mockito.when

import org.apache.spark.{Aggregator, SparkEnv}
import org.apache.spark.benchmark.Benchmark
import org.apache.spark.shuffle.BaseShuffleHandle
import org.apache.spark.util.Utils

/**
 * Benchmark to measure SortShuffleWriter performance.
 * {{{
 * To run this benchmark:
 * 1. without sbt: bin/spark-submit --class <this class> <spark core test jar>
 * 2. build/sbt "core/test:runMain <this class>"
 * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
 * Results will be written to "benchmarks/<this class>-results.txt".
 * }}}
 */
object SortShuffleWriterBenchmark extends ShuffleWriterBenchmarkBase {

  private val shuffleHandle: BaseShuffleHandle[String, String, String] =
    new BaseShuffleHandle(
      shuffleId = 0,
      numMaps = 1,
      dependency = dependency)

  private val MIN_NUM_ITERS = 10
  private val DATA_SIZE_SMALL = 1000
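  // Assuming PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES is 1 << 27 (128 MB), this evaluates to
  // 134217728 / 4 / 5 = 6710886 records, enough to exceed the single-page memory limit applied
  // in setup() and force the writer to spill.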
  private val DATA_SIZE_LARGE =
    PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES / 4 / DEFAULT_DATA_STRING_SIZE

  def getWriter(aggregator: Option[Aggregator[String, String, String]],
      sorter: Option[Ordering[String]]): SortShuffleWriter[String, String, String] = {
    // we need this since SortShuffleWriter uses SparkEnv to get lots of its private vars
    SparkEnv.set(new SparkEnv(
      "0",
      null,
      serializer,
      null,
      serializerManager,
      null,
      null,
      null,
      blockManager,
      null,
      null,
      null,
      null,
      defaultConf
    ))

    if (aggregator.isEmpty && sorter.isEmpty) {
      when(dependency.mapSideCombine).thenReturn(false)
    } else {
      when(dependency.mapSideCombine).thenReturn(false)
      when(dependency.aggregator).thenReturn(aggregator)
      when(dependency.keyOrdering).thenReturn(sorter)
    }

    when(taskContext.taskMemoryManager()).thenReturn(taskMemoryManager)

    val shuffleWriter = new SortShuffleWriter[String, String, String](
      blockResolver,
      shuffleHandle,
      0,
      taskContext
    )
    shuffleWriter
  }

  def writeBenchmarkWithSmallDataset(): Unit = {
    val size = DATA_SIZE_SMALL
    val benchmark = new Benchmark("SortShuffleWriter without spills",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    addBenchmarkCase(benchmark, "small dataset without spills") { timer =>
      val shuffleWriter = getWriter(Option.empty, Option.empty)
      val dataIterator = createDataIterator(size)
      try {
        timer.startTiming()
        shuffleWriter.write(dataIterator)
        timer.stopTiming()
        assert(tempFilesCreated.isEmpty)
      } finally {
        shuffleWriter.stop(true)
      }
    }
    benchmark.run()
  }

  def writeBenchmarkWithSpill(): Unit = {
    val size = DATA_SIZE_LARGE

    val benchmark = new Benchmark("SortShuffleWriter with spills",
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output,
      outputPerIteration = true)
    addBenchmarkCase(benchmark, "no map side combine") { timer =>
      val shuffleWriter = getWriter(Option.empty, Option.empty)
      val dataIterator = createDataIterator(size)
      try {
        timer.startTiming()
        shuffleWriter.write(dataIterator)
        timer.stopTiming()
        assert(tempFilesCreated.length == 7)
      } finally {
        shuffleWriter.stop(true)
      }
    }

    def createCombiner(i: String): String = i
    def mergeValue(i: String, j: String): String = if (Ordering.String.compare(i, j) > 0) i else j
    def mergeCombiners(i: String, j: String): String =
      if (Ordering.String.compare(i, j) > 0) i else j
    val aggregator =
      new Aggregator[String, String, String](createCombiner, mergeValue, mergeCombiners)
    addBenchmarkCase(benchmark, "with map side aggregation") { timer =>
      val shuffleWriter = getWriter(Some(aggregator), Option.empty)
      val dataIterator = createDataIterator(size)
      try {
        timer.startTiming()
        shuffleWriter.write(dataIterator)
        timer.stopTiming()
        assert(tempFilesCreated.length == 7)
      } finally {
        shuffleWriter.stop(true)
      }
    }

    val sorter = Ordering.String
    addBenchmarkCase(benchmark, "with map side sort") { timer =>
      val shuffleWriter = getWriter(Option.empty, Some(sorter))
      val dataIterator = createDataIterator(size)
      try {
        timer.startTiming()
        shuffleWriter.write(dataIterator)
        timer.stopTiming()
        assert(tempFilesCreated.length == 7)
      } finally {
        shuffleWriter.stop(true)
      }
    }
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("SortShuffleWriter writer") {
      writeBenchmarkWithSmallDataset()
      writeBenchmarkWithSpill()
    }
  }
}

0 commit comments
