
Commit 11a37ed

yifeih authored and mccheah committed
SPARK-25299: add CI infrastructure and SortShuffleWriterBenchmark (apache-spark-on-k8s#498)
* add initial bypass merge sort shuffle writer benchmarks
* dd unsafe shuffle writer benchmarks
* changes in bypassmergesort benchmarks
* cleanup
* add circle script
* add this branch for testing
* fix circle attempt 1
* checkout code
* add some caches?
* why is it not pull caches...
* save as artifact instead of publishing
* mkdir
* typo
* try uploading artifacts again
* try print per iteration to avoid circle erroring out on idle
* blah (apache-spark-on-k8s#495)
* make a PR comment
* actually delete files
* run benchmarks on test build branch
* oops forgot to enable upload
* add sort shuffle writer benchmarks
* add stdev
* cleanup sort a bit
* fix stdev text
* fix sort shuffle
* initial code for read side
* format
* use times and sample stdev
* add assert for at least one iteration
* cleanup shuffle write to use fewer mocks and single base interface
* shuffle read works with transport client... needs lots of cleaning
* test running in cicle
* scalastyle
* dont publish results yet
* cleanup writer code
* get only git message
* fix command to get PR number
* add SortshuffleWriterBenchmark
* writer code
* cleanup
* fix benchmark script
* use ArgumentMatchers
* also in shufflewriterbenchmarkbase
* scalastyle
* add apache license
* fix some scale stuff
* fix up tests
* only copy benchmarks we care about
* increase size for reader again
* delete two writers and reader for PR
* SPARK-25299: Add shuffle reader benchmarks (apache-spark-on-k8s#506)
* Revert "SPARK-25299: Add shuffle reader benchmarks (apache-spark-on-k8s#506)" This reverts commit 9d46fae.
* add -e to bash script
* blah
* enable upload as a PR comment and prevent running benchmarks on this branch
* Revert "enable upload as a PR comment and prevent running benchmarks on this branch" This reverts commit 13703fa.
* try machine execution
* try uploading benchmarks (apache-spark-on-k8s#498)
* only upload results when merging into the feature branch
* lock down machine image
* don't write input data to disk
* run benchmark test
* stop creating file cleanup threads for every block manager
* use alphanumeric again
* use a new random everytime
* close the writers -__________-
* delete branch and publish results as comment
* close in finally
1 parent 4bf2a75 commit 11a37ed

File tree

5 files changed, +462 -6 lines changed


.circleci/config.yml

Lines changed: 32 additions & 0 deletions

@@ -8,6 +8,12 @@ defaults: &defaults
     TERM: dumb
     BUILD_SBT_CACHE: "/home/circleci/build-sbt-cache"
 
+spark-25299-config: &spark-25299-config
+  machine:
+    image: circleci/classic:201808-01
+  environment: &defaults-environment
+    TERM: dumb
+    BUILD_SBT_CACHE: "/home/circleci/build-sbt-cache"
 
 test-defaults: &test-defaults
   <<: *defaults

@@ -23,6 +29,12 @@ all-branches-and-tags: &all-branches-and-tags
     tags:
       only: /.*/
 
+spark-25299-branch-only: &spark-25299-branch-only
+  filters:
+    branches:
+      only:
+      - spark-25299
+
 deployable-branches-and-tags: &deployable-branches-and-tags
   filters:
     tags:

@@ -452,6 +464,22 @@ jobs:
         key: v1-maven-dependency-cache-versioned-{{ checksum "pom.xml" }}
         paths: ~/.m2
 
+  run-spark-25299-benchmarks:
+    <<: *spark-25299-config
+    steps:
+    - *checkout-code
+    - attach_workspace:
+        at: .
+    - *restore-build-sbt-cache
+    - *link-in-build-sbt-cache
+    - *restore-ivy-cache
+    - *restore-build-binaries-cache
+    - *restore-home-sbt-cache
+    - run:
+        command: ./dev/run-spark-25299-benchmarks.sh -u
+    - store_artifacts:
+        path: /tmp/artifacts/
+
   deploy-gradle:
     <<: *defaults
     docker:

@@ -512,6 +540,10 @@ workflows:
         requires:
        - build-sbt
        <<: *all-branches-and-tags
+    - run-spark-25299-benchmarks:
+        requires:
+        - build-sbt
+        <<: *spark-25299-branch-only
     - run-scala-tests:
         requires:
        - build-sbt

core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala

Lines changed: 12 additions & 6 deletions

@@ -111,13 +111,15 @@ private[spark] class Benchmark(
     // The results are going to be processor specific so it is useful to include that.
     out.println(Benchmark.getJVMOSInfo())
     out.println(Benchmark.getProcessorName())
-    out.printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+    out.printf("%-40s %14s %14s %11s %12s %13s %10s\n", name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Rate(M/s)",
       "Per Row(ns)", "Relative")
-    out.println("-" * 96)
+    out.println("-" * 120)
     results.zip(benchmarks).foreach { case (result, benchmark) =>
-      out.printf("%-40s %16s %12s %13s %10s\n",
+      out.printf("%-40s %14s %14s %11s %12s %13s %10s\n",
         benchmark.name,
-        "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
+        "%5.0f" format result.bestMs,
+        "%4.0f" format result.avgMs,
+        "%5.0f" format result.stdevMs,
         "%10.1f" format result.bestRate,
         "%6.1f" format (1000 / result.bestRate),
         "%3.1fX" format (firstBest / result.bestMs))

@@ -156,9 +158,13 @@ private[spark] class Benchmark(
     // scalastyle:off
     println(s"  Stopped after $i iterations, ${runTimes.sum / 1000000} ms")
     // scalastyle:on
+    assert(runTimes.nonEmpty)
     val best = runTimes.min
     val avg = runTimes.sum / runTimes.size
-    Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0)
+    val stdev = if (runTimes.size > 1) {
+      math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1))
+    } else 0
+    Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0)
   }
 }

@@ -191,7 +197,7 @@ private[spark] object Benchmark {
   }

   case class Case(name: String, fn: Timer => Unit, numIters: Int)
-  case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
+  case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double)

   /**
   * This should return a user helpful processor information. Getting at this depends on the OS.
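
The new Stdev(ms) column reports the sample standard deviation of the per-iteration run times, converted from nanoseconds to milliseconds like the other columns. A minimal standalone sketch of that computation (the object name and example values are illustrative, not part of the commit):

object StdevSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical per-iteration run times in nanoseconds, mirroring runTimes in measure().
    val runTimes: Seq[Long] = Seq(120000000L, 125000000L, 118000000L)
    val avg = runTimes.sum / runTimes.size
    // Sample standard deviation (divide by n - 1), guarded for the single-iteration case.
    val stdev = if (runTimes.size > 1) {
      math.sqrt(runTimes.map(t => (t - avg) * (t - avg)).sum.toDouble / (runTimes.size - 1))
    } else 0.0
    // Convert to milliseconds, matching Result(avgMs, ..., stdevMs).
    println(f"avg = ${avg / 1e6}%.1f ms, stdev = ${stdev / 1e6}%.1f ms")
  }
}
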
ShuffleWriterBenchmarkBase.scala (new file)

Lines changed: 158 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.sort

import java.io.{BufferedInputStream, Closeable, File, FileInputStream, FileOutputStream}
import java.util.UUID

import org.apache.commons.io.FileUtils
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, TaskContext}
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer.{KryoSerializer, Serializer, SerializerManager}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.storage.{BlockManager, DiskBlockManager, TempShuffleBlockId}
import org.apache.spark.util.Utils

abstract class ShuffleWriterBenchmarkBase extends BenchmarkBase {

  protected val DEFAULT_DATA_STRING_SIZE = 5

  // This is only used in the writer constructors, so it's ok to mock
  @Mock(answer = RETURNS_SMART_NULLS) protected var dependency:
    ShuffleDependency[String, String, String] = _
  // This is only used in the stop() function, so we can safely mock this without affecting perf
  @Mock(answer = RETURNS_SMART_NULLS) protected var taskContext: TaskContext = _
  @Mock(answer = RETURNS_SMART_NULLS) protected var rpcEnv: RpcEnv = _
  @Mock(answer = RETURNS_SMART_NULLS) protected var rpcEndpointRef: RpcEndpointRef = _

  protected val defaultConf: SparkConf = new SparkConf(loadDefaults = false)
  protected val serializer: Serializer = new KryoSerializer(defaultConf)
  protected val partitioner: HashPartitioner = new HashPartitioner(10)
  protected val serializerManager: SerializerManager =
    new SerializerManager(serializer, defaultConf)
  protected val shuffleMetrics: TaskMetrics = new TaskMetrics

  protected val tempFilesCreated: ArrayBuffer[File] = new ArrayBuffer[File]
  protected val filenameToFile: mutable.Map[String, File] = new mutable.HashMap[String, File]

  class TestDiskBlockManager(tempDir: File) extends DiskBlockManager(defaultConf, false) {
    override def getFile(filename: String): File = {
      if (filenameToFile.contains(filename)) {
        filenameToFile(filename)
      } else {
        val outputFile = File.createTempFile("shuffle", null, tempDir)
        filenameToFile(filename) = outputFile
        outputFile
      }
    }

    override def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
      var blockId = new TempShuffleBlockId(UUID.randomUUID())
      val file = getFile(blockId)
      tempFilesCreated += file
      (blockId, file)
    }
  }

  class TestBlockManager(tempDir: File, memoryManager: MemoryManager) extends BlockManager("0",
    rpcEnv,
    null,
    serializerManager,
    defaultConf,
    memoryManager,
    null,
    null,
    null,
    null,
    1) {
    override val diskBlockManager = new TestDiskBlockManager(tempDir)
    override val remoteBlockTempFileManager = null
  }

  protected var tempDir: File = _

  protected var blockManager: BlockManager = _
  protected var blockResolver: IndexShuffleBlockResolver = _

  protected var memoryManager: TestMemoryManager = _
  protected var taskMemoryManager: TaskMemoryManager = _

  MockitoAnnotations.initMocks(this)
  when(dependency.partitioner).thenReturn(partitioner)
  when(dependency.serializer).thenReturn(serializer)
  when(dependency.shuffleId).thenReturn(0)
  when(taskContext.taskMetrics()).thenReturn(shuffleMetrics)
  when(rpcEnv.setupEndpoint(any[String], any[RpcEndpoint])).thenReturn(rpcEndpointRef)

  def setup(): Unit = {
    memoryManager = new TestMemoryManager(defaultConf)
    memoryManager.limit(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES)
    taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
    tempDir = Utils.createTempDir()
    blockManager = new TestBlockManager(tempDir, memoryManager)
    blockResolver = new IndexShuffleBlockResolver(
      defaultConf,
      blockManager)
  }

  def addBenchmarkCase(benchmark: Benchmark, name: String)(func: Benchmark.Timer => Unit): Unit = {
    benchmark.addTimerCase(name) { timer =>
      setup()
      func(timer)
      teardown()
    }
  }

  def teardown(): Unit = {
    FileUtils.deleteDirectory(tempDir)
    tempFilesCreated.clear()
    filenameToFile.clear()
  }

  protected class DataIterator(size: Int)
    extends Iterator[Product2[String, String]] {
    val random = new Random(123)
    var count = 0
    override def hasNext: Boolean = {
      count < size
    }

    override def next(): Product2[String, String] = {
      count += 1
      val string = random.alphanumeric.take(5).mkString
      (string, string)
    }
  }

  def createDataIterator(size: Int): DataIterator = {
    new DataIterator(size)
  }
}
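
For orientation, here is a hypothetical subclass showing how the base class is meant to be driven: addBenchmarkCase wraps each timed case with setup()/teardown(), and createDataIterator supplies the random key/value records. The object name, sizes, and the Benchmark/BenchmarkBase parameters used here are assumptions for illustration, not code from this commit (the actual SortShuffleWriterBenchmark lives in a separate file):

object ExampleShuffleWriteBenchmark extends ShuffleWriterBenchmarkBase {
  // Signature assumed from BenchmarkBase in this branch.
  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    val size = 10000
    // minNumIters and output are assumed Benchmark constructor parameters.
    val benchmark = new Benchmark("example shuffle write", size, minNumIters = 10, output = output)
    addBenchmarkCase(benchmark, "drain generated records") { timer =>
      val records = createDataIterator(size)
      timer.startTiming()
      // A real case would feed `records` to a ShuffleWriter built from the mocked
      // dependency, blockManager and blockResolver prepared by setup().
      while (records.hasNext) { records.next() }
      timer.stopTiming()
    }
    benchmark.run()
  }
}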
