Commit b9a2bc9

mccheah authored and ifilonenko committed
[SPARK-25299] Shuffle locations api (apache-spark-on-k8s#517)
Implements the shuffle locations API as part of SPARK-25299.

This adds an additional field to all `MapStatus` objects: a `MapShuffleLocations` that indicates where a task's map output is stored. This module is optional and implementations of the pluggable shuffle writers and readers can ignore it accordingly.

This API is designed with the use case in mind of future plugin implementations desiring to have the driver store metadata about where shuffle blocks are stored.

There are a few caveats to this design:

- We originally wanted to remove the `BlockManagerId` from `MapStatus` entirely and replace it with this object. However, doing this proves to be very difficult, as many places use the block manager ID for other kinds of shuffle data bookkeeping. As a result, we concede to storing the block manager ID redundantly here. However, the overhead should be minimal: because we cache block manager ids and default map shuffle locations, the two fields in `MapStatus` should point to the same object on the heap. Thus we add `O(M)` storage overhead on the driver, where for each map status we're storing an additional pointer to the same on-heap object. We will run benchmarks against the TPC-DS workload to see if there are significant performance repercussions for this implementation.

- `KryoSerializer` expects `CompressedMapStatus` and `HighlyCompressedMapStatus` to be serialized via reflection, so originally all fields of these classes needed to be registered with Kryo. However, the `MapShuffleLocations` is now pluggable. We think however that previously Kryo was defaulting to Java serialization anyways, so we now just explicitly tell Kryo to use `ExternalizableSerializer` to deal with these objects. There's a small hack in the serialization protocol that attempts to avoid serializing the same `BlockManagerId` twice in the case that the map shuffle locations is a `DefaultMapShuffleLocations`.
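The second caveat describes a serialization shortcut without showing it. The sketch below is one way such a protocol could look, not the code in this commit: every `Serde*` type is a hypothetical stand-in, and it assumes the default locations object always wraps the same block manager ID that the status already carries. A boolean flag marks the default case so that only the ID is written, and the default wrapper is rebuilt on read.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for BlockManagerId.
final class SerdeBlockManagerId implements Serializable {
  final String host;
  final int port;

  SerdeBlockManagerId(String host, int port) {
    this.host = host;
    this.port = port;
  }
}

// Hypothetical stand-in for MapShuffleLocations.
interface SerdeMapShuffleLocations extends Serializable {}

// Hypothetical stand-in for DefaultMapShuffleLocations: it only wraps the id.
final class SerdeDefaultLocations implements SerdeMapShuffleLocations {
  final SerdeBlockManagerId id;

  SerdeDefaultLocations(SerdeBlockManagerId id) {
    this.id = id;
  }
}

// Hypothetical stand-in for a MapStatus that carries both fields.
final class SerdeMapStatus implements Externalizable {
  private SerdeBlockManagerId loc;
  private SerdeMapShuffleLocations locations;

  // Externalizable requires a public no-arg constructor.
  public SerdeMapStatus() {}

  SerdeMapStatus(SerdeBlockManagerId loc, SerdeMapShuffleLocations locations) {
    this.loc = loc;
    this.locations = locations;
  }

  SerdeBlockManagerId loc() {
    return loc;
  }

  SerdeMapShuffleLocations locations() {
    return locations;
  }

  @Override
  public void writeExternal(ObjectOutput out) throws IOException {
    out.writeObject(loc);
    // Assumption: a default locations object wraps exactly `loc`, so a flag is enough.
    boolean isDefault = locations instanceof SerdeDefaultLocations;
    out.writeBoolean(isDefault);
    if (!isDefault) {
      out.writeObject(locations); // plugin-specific (or null) locations are written in full
    }
  }

  @Override
  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
    loc = (SerdeBlockManagerId) in.readObject();
    // Rebuild the default wrapper around the id that was just read.
    locations = in.readBoolean()
        ? new SerdeDefaultLocations(loc)
        : (SerdeMapShuffleLocations) in.readObject();
  }

  // Serializes and deserializes a status in memory; checked exceptions are wrapped.
  static SerdeMapStatus roundTrip(SerdeMapStatus status) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(status);
      }
      try (ObjectInputStream in =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (SerdeMapStatus) in.readObject();
      }
    } catch (IOException | ClassNotFoundException e) {
      throw new IllegalStateException(e);
    }
  }
}
```

After a round trip, the rebuilt default locations object holds the same `SerdeBlockManagerId` instance as the status, which mirrors the "two fields point to the same object" property described in the first caveat.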
1 parent b0ca599 commit b9a2bc9

29 files changed, +463 -125 lines changed

core/src/main/java/org/apache/spark/api/shuffle/MapShuffleLocations.java

Lines changed: 39 additions & 0 deletions
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.api.shuffle;
+
+import org.apache.spark.annotation.Experimental;
+
+import java.io.Serializable;
+
+/**
+ * Represents metadata about where shuffle blocks were written in a single map task.
+ * <p>
+ * This is optionally returned by shuffle writers. The inner shuffle locations may
+ * be accessed by shuffle readers. Shuffle locations are only necessary when the
+ * location of shuffle blocks needs to be managed by the driver; shuffle plugins
+ * may choose to use an external database or other metadata management systems to
+ * track the locations of shuffle blocks instead.
+ */
+@Experimental
+public interface MapShuffleLocations extends Serializable {
+
+  /**
+   * Get the location for a given shuffle block written by this map task.
+   */
+  ShuffleLocation getLocationForBlock(int reduceId);
+}

core/src/main/java/org/apache/spark/api/shuffle/ShuffleLocation.java

Lines changed: 25 additions & 0 deletions
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.shuffle;
+
+/**
+ * Marker interface representing a location of a shuffle block. Implementations of shuffle readers
+ * and writers are expected to cast this down to an implementation-specific representation.
+ */
+public interface ShuffleLocation {
+}
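
The two interfaces above leave the concrete location type to the plugin. As a rough illustration of how a plugin might use them, the sketch below declares local copies of both interfaces (without Spark's `@Experimental` annotation) so that it compiles on its own. `RemoteStoreLocation`, `PerPartitionLocations` and `SketchReader` are hypothetical names that do not appear in this commit. The writer side records one location per reduce partition, and the reader side casts the marker interface down to the plugin's own type, as the `ShuffleLocation` Javadoc expects.

```java
import java.io.Serializable;

// Local copies of the two interfaces from this commit, so the sketch is self-contained.
interface ShuffleLocation {}

interface MapShuffleLocations extends Serializable {
  ShuffleLocation getLocationForBlock(int reduceId);
}

// Hypothetical plugin-specific location: a host and port of some remote shuffle store.
final class RemoteStoreLocation implements ShuffleLocation, Serializable {
  private final String host;
  private final int port;

  RemoteStoreLocation(String host, int port) {
    this.host = host;
    this.port = port;
  }

  String host() {
    return host;
  }

  int port() {
    return port;
  }
}

// Hypothetical writer-side result: one location per reduce partition of a single map task.
final class PerPartitionLocations implements MapShuffleLocations {
  private final RemoteStoreLocation[] byReduceId;

  PerPartitionLocations(RemoteStoreLocation[] byReduceId) {
    this.byReduceId = byReduceId.clone(); // defensive copy
  }

  @Override
  public ShuffleLocation getLocationForBlock(int reduceId) {
    return byReduceId[reduceId];
  }
}

// Hypothetical reader-side helper that downcasts the marker interface.
final class SketchReader {
  static String describe(MapShuffleLocations locations, int reduceId) {
    ShuffleLocation location = locations.getLocationForBlock(reduceId);
    if (!(location instanceof RemoteStoreLocation)) {
      throw new IllegalArgumentException("Unexpected location type: " + location);
    }
    RemoteStoreLocation remote = (RemoteStoreLocation) location;
    return remote.host() + ":" + remote.port();
  }
}
```

The explicit `instanceof` check is a design choice of this sketch: it turns a mismatch between writer and reader plugins into a descriptive error instead of a bare `ClassCastException`.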

core/src/main/java/org/apache/spark/api/shuffle/ShuffleMapOutputWriter.java

Lines changed: 2 additions & 1 deletion
@@ -20,6 +20,7 @@
 import java.io.IOException;
 
 import org.apache.spark.annotation.Experimental;
+import org.apache.spark.api.java.Optional;
 
 /**
  * :: Experimental ::
@@ -31,7 +32,7 @@
 public interface ShuffleMapOutputWriter {
   ShufflePartitionWriter getNextPartitionWriter() throws IOException;
 
-  void commitAllPartitions() throws IOException;
+  Optional<MapShuffleLocations> commitAllPartitions() throws IOException;
 
   void abort(Throwable error) throws IOException;
 }
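
With this signature change, a plugin that tracks block locations outside the driver can return an empty `Optional`, and the caller falls back to `null`. The sketch below illustrates that contract under two stated assumptions: `java.util.Optional` stands in for `org.apache.spark.api.java.Optional` (so `orElse(null)` plays the role of `orNull()`), and every type name is hypothetical. The commit-or-abort shape follows the pattern used by the shuffle writers in this commit, but here `IOException` is wrapped in `UncheckedIOException` to keep the helper easy to call.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;

// Hypothetical stand-in for MapShuffleLocations.
interface CommitMapShuffleLocations {}

// Hypothetical stand-in for the relevant part of ShuffleMapOutputWriter.
interface CommitMapOutputWriter {
  Optional<CommitMapShuffleLocations> commitAllPartitions() throws IOException;

  void abort(Throwable error) throws IOException;
}

// A plugin that keeps block locations in an external system reports nothing to the driver.
final class ExternallyTrackedWriter implements CommitMapOutputWriter {
  boolean aborted = false;

  @Override
  public Optional<CommitMapShuffleLocations> commitAllPartitions() {
    return Optional.empty();
  }

  @Override
  public void abort(Throwable error) {
    aborted = true;
  }
}

final class CommitSketch {
  // Commits the writer and returns its locations, or null if the plugin supplied none.
  // On failure the writer is aborted and the original error is rethrown.
  static CommitMapShuffleLocations commitOrAbort(CommitMapOutputWriter writer) {
    try {
      return writer.commitAllPartitions().orElse(null);
    } catch (IOException | RuntimeException e) {
      try {
        writer.abort(e);
      } catch (IOException | RuntimeException abortError) {
        e.addSuppressed(abortError); // keep the original failure as the primary error
      }
      if (e instanceof IOException) {
        throw new UncheckedIOException((IOException) e);
      }
      throw (RuntimeException) e;
    }
  }
}
```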

core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java

Lines changed: 16 additions & 4 deletions
@@ -25,6 +25,8 @@
 import java.nio.channels.WritableByteChannel;
 import javax.annotation.Nullable;
 
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.shuffle.MapShuffleLocations;
 import scala.None$;
 import scala.Option;
 import scala.Product2;
@@ -134,8 +136,11 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
     try {
       if (!records.hasNext()) {
         partitionLengths = new long[numPartitions];
-        mapOutputWriter.commitAllPartitions();
-        mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+        Optional<MapShuffleLocations> blockLocs = mapOutputWriter.commitAllPartitions();
+        mapStatus = MapStatus$.MODULE$.apply(
+            blockManager.shuffleServerId(),
+            blockLocs.orNull(),
+            partitionLengths);
         return;
       }
       final SerializerInstance serInstance = serializer.newInstance();
@@ -168,8 +173,11 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
       }
 
       partitionLengths = writePartitionedData(mapOutputWriter);
-      mapOutputWriter.commitAllPartitions();
-      mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+      Optional<MapShuffleLocations> mapLocations = mapOutputWriter.commitAllPartitions();
+      mapStatus = MapStatus$.MODULE$.apply(
+          blockManager.shuffleServerId(),
+          mapLocations.orNull(),
+          partitionLengths);
     } catch (Exception e) {
       try {
         mapOutputWriter.abort(e);
@@ -178,6 +186,10 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
       }
       throw e;
     }
+    mapStatus = MapStatus$.MODULE$.apply(
+        blockManager.shuffleServerId(),
+        DefaultMapShuffleLocations.get(blockManager.shuffleServerId()),
+        partitionLengths);
   }
 
   @VisibleForTesting

core/src/main/java/org/apache/spark/shuffle/sort/DefaultMapShuffleLocations.java

Lines changed: 76 additions & 0 deletions
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+
+import org.apache.spark.api.shuffle.MapShuffleLocations;
+import org.apache.spark.api.shuffle.ShuffleLocation;
+import org.apache.spark.storage.BlockManagerId;
+
+import java.util.Objects;
+
+public class DefaultMapShuffleLocations implements MapShuffleLocations, ShuffleLocation {
+
+  /**
+   * We borrow the cache size from the BlockManagerId's cache - around 1MB, which should be
+   * feasible.
+   */
+  private static final LoadingCache<BlockManagerId, DefaultMapShuffleLocations>
+      DEFAULT_SHUFFLE_LOCATIONS_CACHE =
+          CacheBuilder.newBuilder()
+              .maximumSize(BlockManagerId.blockManagerIdCacheSize())
+              .build(new CacheLoader<BlockManagerId, DefaultMapShuffleLocations>() {
+                @Override
+                public DefaultMapShuffleLocations load(BlockManagerId blockManagerId) {
+                  return new DefaultMapShuffleLocations(blockManagerId);
+                }
+              });
+
+  private final BlockManagerId location;
+
+  public DefaultMapShuffleLocations(BlockManagerId blockManagerId) {
+    this.location = blockManagerId;
+  }
+
+  public static DefaultMapShuffleLocations get(BlockManagerId blockManagerId) {
+    return DEFAULT_SHUFFLE_LOCATIONS_CACHE.getUnchecked(blockManagerId);
+  }
+
+  @Override
+  public ShuffleLocation getLocationForBlock(int reduceId) {
+    return this;
+  }
+
+  public BlockManagerId getBlockManagerId() {
+    return location;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    return other instanceof DefaultMapShuffleLocations
+        && Objects.equals(((DefaultMapShuffleLocations) other).location, location);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(location);
+  }
+}
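
The cache in `DefaultMapShuffleLocations.get` is what lets many map statuses for the same executor share one wrapper object. The sketch below shows the same interning idea using only the JDK. It swaps Guava's bounded `LoadingCache` for an unbounded `ConcurrentHashMap`, so it illustrates the sharing behaviour but not the eviction behaviour, and `CachedBlockManagerId` and `InternedLocations` are hypothetical stand-ins.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for BlockManagerId with value-based equality.
final class CachedBlockManagerId {
  private final String host;
  private final int port;

  CachedBlockManagerId(String host, int port) {
    this.host = host;
    this.port = port;
  }

  @Override
  public boolean equals(Object other) {
    return other instanceof CachedBlockManagerId
        && ((CachedBlockManagerId) other).port == port
        && Objects.equals(((CachedBlockManagerId) other).host, host);
  }

  @Override
  public int hashCode() {
    return Objects.hash(host, port);
  }
}

// Hypothetical stand-in for DefaultMapShuffleLocations, interned per block manager id.
final class InternedLocations {
  // Unbounded map for brevity; the class in this commit uses a size-bounded Guava cache.
  private static final ConcurrentMap<CachedBlockManagerId, InternedLocations> CACHE =
      new ConcurrentHashMap<>();

  private final CachedBlockManagerId location;

  private InternedLocations(CachedBlockManagerId location) {
    this.location = location;
  }

  // Returns the single shared instance for any id equal to the given one.
  static InternedLocations get(CachedBlockManagerId id) {
    return CACHE.computeIfAbsent(id, InternedLocations::new);
  }

  CachedBlockManagerId getBlockManagerId() {
    return location;
  }
}
```

Two lookups with equal but distinct `CachedBlockManagerId` objects return the same `InternedLocations` reference, so each additional holder costs one pointer and no new wrapper object.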

core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java

Lines changed: 8 additions & 2 deletions
@@ -23,6 +23,8 @@
 import java.nio.channels.WritableByteChannel;
 import java.util.Iterator;
 
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.shuffle.MapShuffleLocations;
 import scala.Option;
 import scala.Product2;
 import scala.collection.JavaConverters;
@@ -221,6 +223,7 @@ void closeAndWriteOutput() throws IOException {
     final ShuffleMapOutputWriter mapWriter = shuffleWriteSupport
         .createMapOutputWriter(shuffleId, mapId, partitioner.numPartitions());
     final long[] partitionLengths;
+    Optional<MapShuffleLocations> mapLocations;
     try {
       try {
         partitionLengths = mergeSpills(spills, mapWriter);
@@ -231,7 +234,7 @@ void closeAndWriteOutput() throws IOException {
           }
         }
       }
-      mapWriter.commitAllPartitions();
+      mapLocations = mapWriter.commitAllPartitions();
     } catch (Exception e) {
       try {
         mapWriter.abort(e);
@@ -240,7 +243,10 @@ void closeAndWriteOutput() throws IOException {
       }
       throw e;
     }
-    mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
+    mapStatus = MapStatus$.MODULE$.apply(
+        blockManager.shuffleServerId(),
+        mapLocations.orNull(),
+        partitionLengths);
   }
 
   @VisibleForTesting

core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleExecutorComponents.java

Lines changed: 1 addition & 1 deletion
@@ -46,6 +46,6 @@ public ShuffleWriteSupport writes() {
       throw new IllegalStateException(
           "Executor components must be initialized before getting writers.");
     }
-    return new DefaultShuffleWriteSupport(sparkConf, blockResolver);
+    return new DefaultShuffleWriteSupport(sparkConf, blockResolver, blockManager.shuffleServerId());
   }
 }

core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleMapOutputWriter.java

Lines changed: 9 additions & 1 deletion
@@ -24,6 +24,10 @@
 import java.io.OutputStream;
 import java.nio.channels.FileChannel;
 
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.api.shuffle.MapShuffleLocations;
+import org.apache.spark.shuffle.sort.DefaultMapShuffleLocations;
+import org.apache.spark.storage.BlockManagerId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,6 +53,7 @@ public class DefaultShuffleMapOutputWriter implements ShuffleMapOutputWriter {
   private final int bufferSize;
   private int currPartitionId = 0;
   private long currChannelPosition;
+  private final BlockManagerId shuffleServerId;
 
   private final File outputFile;
   private File outputTempFile;
@@ -61,11 +66,13 @@ public DefaultShuffleMapOutputWriter(
       int shuffleId,
       int mapId,
       int numPartitions,
+      BlockManagerId shuffleServerId,
       ShuffleWriteMetricsReporter metrics,
       IndexShuffleBlockResolver blockResolver,
       SparkConf sparkConf) {
     this.shuffleId = shuffleId;
     this.mapId = mapId;
+    this.shuffleServerId = shuffleServerId;
     this.metrics = metrics;
     this.blockResolver = blockResolver;
     this.bufferSize =
@@ -90,10 +97,11 @@ public ShufflePartitionWriter getNextPartitionWriter() throws IOException {
   }
 
   @Override
-  public void commitAllPartitions() throws IOException {
+  public Optional<MapShuffleLocations> commitAllPartitions() throws IOException {
     cleanUp();
     File resolvedTmp = outputTempFile != null && outputTempFile.isFile() ? outputTempFile : null;
     blockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, resolvedTmp);
+    return Optional.of(DefaultMapShuffleLocations.get(shuffleServerId));
   }
 
   @Override

core/src/main/java/org/apache/spark/shuffle/sort/io/DefaultShuffleWriteSupport.java

Lines changed: 6 additions & 2 deletions
@@ -22,17 +22,21 @@
 import org.apache.spark.api.shuffle.ShuffleMapOutputWriter;
 import org.apache.spark.api.shuffle.ShuffleWriteSupport;
 import org.apache.spark.shuffle.IndexShuffleBlockResolver;
+import org.apache.spark.storage.BlockManagerId;
 
 public class DefaultShuffleWriteSupport implements ShuffleWriteSupport {
 
   private final SparkConf sparkConf;
   private final IndexShuffleBlockResolver blockResolver;
+  private final BlockManagerId shuffleServerId;
 
   public DefaultShuffleWriteSupport(
       SparkConf sparkConf,
-      IndexShuffleBlockResolver blockResolver) {
+      IndexShuffleBlockResolver blockResolver,
+      BlockManagerId shuffleServerId) {
     this.sparkConf = sparkConf;
     this.blockResolver = blockResolver;
+    this.shuffleServerId = shuffleServerId;
   }
 
   @Override
@@ -41,7 +45,7 @@ public ShuffleMapOutputWriter createMapOutputWriter(
       int mapId,
       int numPartitions) {
     return new DefaultShuffleMapOutputWriter(
-        shuffleId, mapId, numPartitions,
+        shuffleId, mapId, numPartitions, shuffleServerId,
         TaskContext.get().taskMetrics().shuffleWriteMetrics(), blockResolver, sparkConf);
   }
 }
