Skip to content

Commit 6fee8e5

Browse files
zustonJunfan Zhang
andcommitted
[apache#291] feat(client): Introduce PrefetchableClientReadHandler to support async read (apache#2365)
What changes were proposed: 1. Introduce PrefetchableClientReadHandler to support async read. It is disabled by default. 2. Apply it to the memory/localfile/hdfs read handlers. Why are the changes needed: Recently I found that some important Spark jobs are slow due to the large number of shuffle read operations. If async read is supported, the performance of these jobs will improve. This PR is the follow-up for apache#291, which was opened almost 3 years ago! Does this PR introduce any user-facing change: Yes. Some configs are introduced: 1. `rss.client.read.prefetch.enabled` 2. `rss.client.read.prefetch.capacity` 3. `rss.client.read.prefetch.timeoutSec` How was this patch tested: 1. Unit tests --------- Co-authored-by: Junfan Zhang <zhangjunfan@qiyi.com>
1 parent 427b5cb commit 6fee8e5

File tree

11 files changed

+355
-23
lines changed

11 files changed

+355
-23
lines changed

common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -228,8 +228,26 @@ public class RssClientConf {
228228
.withDescription("the extra java properties could be configured by this option");
229229

230230
public static final ConfigOption<Integer> RSS_CLIENT_GRPC_EVENT_LOOP_THREADS =
231-
ConfigOptions.key("rss.client.grpc.nettyEventLoopThreads")
232-
.intType()
233-
.defaultValue(-1)
234-
.withDescription("the event loop threads of netty impl for grpc");
231+
ConfigOptions.key("rss.client.grpc.nettyEventLoopThreads")
232+
.intType()
233+
.defaultValue(-1)
234+
.withDescription("the event loop threads of netty impl for grpc");
235+
236+
public static final ConfigOption<Boolean> RSS_CLIENT_PREFETCH_ENABLED =
237+
ConfigOptions.key("rss.client.read.prefetch.enabled")
238+
.booleanType()
239+
.defaultValue(false)
240+
.withDescription("Read prefetch switch that will be disabled by default");
241+
242+
public static final ConfigOption<Integer> RSS_CLIENT_PREFETCH_CAPACITY =
243+
ConfigOptions.key("rss.client.read.prefetch.capacity")
244+
.intType()
245+
.defaultValue(4)
246+
.withDescription("Read prefetch capacity");
247+
248+
public static final ConfigOption<Integer> READ_CLIENT_PREFETCH_TIMEOUT_SEC =
249+
ConfigOptions.key("rss.client.read.prefetch.timeoutSec")
250+
.intType()
251+
.defaultValue(120)
252+
.withDescription("Read prefetch timeout seconds");
235253
}

docs/client_guide/client_guide.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ The important configuration of client is listed as following. These configuratio
5757
| <client_type>.rss.client.max.concurrency.of.per-partition.write | - | The maximum number of files that can be written concurrently to a single partition. This value is respected by the remote shuffle server only if it is greater than 0. |
5858
| <client_type>.rss.client.rpc.timeout.ms | 60000 | Timeout in milliseconds for RPC calls. |
5959
| <client_type>.rss.client.rpc.maxAttempts | 3 | When we fail to send RPC calls, we will retry for maxAttempts times. |
60+
| <client_type>.rss.client.rpc.netty.pageSize | 4096 | The value of pageSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
61+
| <client_type>.rss.client.rpc.netty.maxOrder | 3 | The value of maxOrder for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
62+
| <client_type>.rss.client.rpc.netty.smallCacheSize | 1024 | The value of smallCacheSize for PooledByteBufAllocator when using gRPC internal Netty on the client-side. This configuration will only take effect when rss.rpc.server.type is set to GRPC_NETTY. |
63+
| <client_type>.rss.client.blockIdManagerClass | - | The block id manager class of server for this application, the implementation of this interface to manage the shuffle block ids |
64+
| <client_type>.rss.client.reportExcludeProperties | - | A list of client configuration properties that should not be reported to the coordinator by the DelegationRssShuffleManager. |
65+
| <client_type>.rss.client.reportIncludeProperties | - | A list of client configuration properties that should be exclusively reported to the coordinator by the DelegationRssShuffleManager. |
66+
| <client_type>.rss.client.read.prefetch.enabled | false | Whether to enable read prefetch (async read) on the client. Disabled by default. |
67+
| <client_type>.rss.client.read.prefetch.capacity | 4 | Read prefetch capacity |
68+
| <client_type>.rss.client.read.prefetch.timeoutSec | 120 | Read prefetch timeout, in seconds |
6069
Notice:
6170

6271
1. `<client_type>` should be `mapreduce` `tez` or `spark`

storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ private ClientReadHandler getMemoryClientReadHandler(
136136
shuffleServerClient,
137137
expectTaskIds,
138138
request.getRetryMax(),
139-
request.getRetryIntervalMax());
139+
request.getRetryIntervalMax(),
140+
request.getPrefetchOption());
140141
return memoryClientReadHandler;
141142
}
142143

@@ -159,7 +160,8 @@ private ClientReadHandler getLocalfileClientReaderHandler(
159160
request.getDistributionType(),
160161
request.getExpectTaskIds(),
161162
request.getRetryMax(),
162-
request.getRetryIntervalMax());
163+
request.getRetryIntervalMax(),
164+
request.getPrefetchOption());
163165
}
164166

165167
private ClientReadHandler getHadoopClientReadHandler(
@@ -179,7 +181,8 @@ private ClientReadHandler getHadoopClientReadHandler(
179181
request.getDistributionType(),
180182
request.getExpectTaskIds(),
181183
ssi.getId(),
182-
request.isOffHeapEnabled());
184+
request.isOffHeapEnabled(),
185+
request.getPrefetchOption());
183186
}
184187

185188
public ShuffleDeleteHandler createShuffleDeleteHandler(

storage/src/main/java/org/apache/uniffle/storage/handler/impl/DataSkippableReadHandler.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.uniffle.storage.handler.impl;
1919

2020
import java.util.List;
21+
import java.util.Optional;
2122

2223
import com.google.common.collect.Lists;
2324
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -30,7 +31,7 @@
3031
import org.apache.uniffle.common.ShuffleIndexResult;
3132
import org.apache.uniffle.common.segment.SegmentSplitterFactory;
3233

33-
public abstract class DataSkippableReadHandler extends AbstractClientReadHandler {
34+
public abstract class DataSkippableReadHandler extends PrefetchableClientReadHandler {
3435
private static final Logger LOG = LoggerFactory.getLogger(DataSkippableReadHandler.class);
3536

3637
protected List<ShuffleDataSegment> shuffleDataSegments = Lists.newArrayList();
@@ -50,7 +51,9 @@ public DataSkippableReadHandler(
5051
Roaring64NavigableMap expectBlockIds,
5152
Roaring64NavigableMap processBlockIds,
5253
ShuffleDataDistributionType distributionType,
53-
Roaring64NavigableMap expectTaskIds) {
54+
Roaring64NavigableMap expectTaskIds,
55+
Optional<PrefetchOption> prefetchOption) {
56+
super(prefetchOption);
5457
this.appId = appId;
5558
this.shuffleId = shuffleId;
5659
this.partitionId = partitionId;
@@ -66,7 +69,7 @@ public DataSkippableReadHandler(
6669
protected abstract ShuffleDataResult readShuffleData(ShuffleDataSegment segment);
6770

6871
@Override
69-
public ShuffleDataResult readShuffleData() {
72+
public ShuffleDataResult doReadShuffleData() {
7073
if (shuffleDataSegments.isEmpty()) {
7174
ShuffleIndexResult shuffleIndexResult = readShuffleIndex();
7275
if (shuffleIndexResult == null || shuffleIndexResult.isEmpty()) {

storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopClientReadHandler.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.FileNotFoundException;
2121
import java.util.Collections;
2222
import java.util.List;
23+
import java.util.Optional;
2324
import java.util.stream.Collectors;
2425

2526
import com.google.common.collect.Lists;
@@ -55,6 +56,7 @@ public class HadoopClientReadHandler extends AbstractClientReadHandler {
5556
private ShuffleDataDistributionType distributionType;
5657
private Roaring64NavigableMap expectTaskIds;
5758
private boolean offHeapEnable = false;
59+
private Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption;
5860

5961
public HadoopClientReadHandler(
6062
String appId,
@@ -71,7 +73,8 @@ public HadoopClientReadHandler(
7173
ShuffleDataDistributionType distributionType,
7274
Roaring64NavigableMap expectTaskIds,
7375
String shuffleServerId,
74-
boolean offHeapEnable) {
76+
boolean offHeapEnable,
77+
Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption) {
7578
this.appId = appId;
7679
this.shuffleId = shuffleId;
7780
this.partitionId = partitionId;
@@ -87,6 +90,7 @@ public HadoopClientReadHandler(
8790
this.expectTaskIds = expectTaskIds;
8891
this.shuffleServerId = shuffleServerId;
8992
this.offHeapEnable = offHeapEnable;
93+
this.prefetchOption = prefetchOption;
9094
}
9195

9296
// Only for test
@@ -117,7 +121,8 @@ public HadoopClientReadHandler(
117121
ShuffleDataDistributionType.NORMAL,
118122
Roaring64NavigableMap.bitmapOf(),
119123
null,
120-
false);
124+
false,
125+
Optional.empty());
121126
}
122127

123128
protected void init(String fullShufflePath) {
@@ -174,7 +179,8 @@ protected void init(String fullShufflePath) {
174179
hadoopConf,
175180
distributionType,
176181
expectTaskIds,
177-
offHeapEnable);
182+
offHeapEnable,
183+
prefetchOption);
178184
readHandlers.add(handler);
179185
} catch (Exception e) {
180186
LOG.warn("Can't create ShuffleReaderHandler for " + filePrefix, e);

storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleReadHandler.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.nio.ByteBuffer;
2222
import java.util.List;
23+
import java.util.Optional;
2324

2425
import org.apache.hadoop.conf.Configuration;
2526
import org.apache.hadoop.fs.Path;
@@ -57,7 +58,8 @@ public HadoopShuffleReadHandler(
5758
Configuration conf,
5859
ShuffleDataDistributionType distributionType,
5960
Roaring64NavigableMap expectTaskIds,
60-
boolean offHeapEnabled)
61+
boolean offHeapEnabled,
62+
Optional<PrefetchOption> prefetchOption)
6163
throws Exception {
6264
super(
6365
appId,
@@ -67,7 +69,8 @@ public HadoopShuffleReadHandler(
6769
expectBlockIds,
6870
processBlockIds,
6971
distributionType,
70-
expectTaskIds);
72+
expectTaskIds,
73+
prefetchOption);
7174
this.filePrefix = filePrefix;
7275
this.indexReader =
7376
createHadoopReader(ShuffleStorageUtils.generateIndexFileName(filePrefix), conf);
@@ -98,7 +101,8 @@ public HadoopShuffleReadHandler(
98101
conf,
99102
ShuffleDataDistributionType.NORMAL,
100103
Roaring64NavigableMap.bitmapOf(),
101-
false);
104+
false,
105+
Optional.empty());
102106
}
103107

104108
@Override

storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileClientReadHandler.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.uniffle.storage.handler.impl;
1919

20+
import java.util.Optional;
21+
2022
import com.google.common.annotations.VisibleForTesting;
2123
import org.roaringbitmap.longlong.Roaring64NavigableMap;
2224
import org.slf4j.Logger;
@@ -55,7 +57,8 @@ public LocalFileClientReadHandler(
5557
ShuffleDataDistributionType distributionType,
5658
Roaring64NavigableMap expectTaskIds,
5759
int retryMax,
58-
long retryIntervalMax) {
60+
long retryIntervalMax,
61+
Optional<PrefetchOption> prefetchOption) {
5962
super(
6063
appId,
6164
shuffleId,
@@ -64,7 +67,8 @@ public LocalFileClientReadHandler(
6467
expectBlockIds,
6568
processBlockIds,
6669
distributionType,
67-
expectTaskIds);
70+
expectTaskIds,
71+
prefetchOption);
6872
this.shuffleServerClient = shuffleServerClient;
6973
this.partitionNumPerRange = partitionNumPerRange;
7074
this.partitionNum = partitionNum;
@@ -98,7 +102,8 @@ public LocalFileClientReadHandler(
98102
ShuffleDataDistributionType.NORMAL,
99103
Roaring64NavigableMap.bitmapOf(),
100104
1,
101-
0);
105+
0,
106+
Optional.empty());
102107
}
103108

104109
@Override

storage/src/main/java/org/apache/uniffle/storage/handler/impl/MemoryClientReadHandler.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.uniffle.storage.handler.impl;
1919

2020
import java.util.List;
21+
import java.util.Optional;
2122

2223
import com.google.common.annotations.VisibleForTesting;
2324
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -32,7 +33,7 @@
3233
import org.apache.uniffle.common.exception.RssFetchFailedException;
3334
import org.apache.uniffle.common.util.Constants;
3435

35-
public class MemoryClientReadHandler extends AbstractClientReadHandler {
36+
public class MemoryClientReadHandler extends PrefetchableClientReadHandler {
3637

3738
private static final Logger LOG = LoggerFactory.getLogger(MemoryClientReadHandler.class);
3839
private long lastBlockId = Constants.INVALID_BLOCK_ID;
@@ -49,7 +50,9 @@ public MemoryClientReadHandler(
4950
ShuffleServerClient shuffleServerClient,
5051
Roaring64NavigableMap expectTaskIds,
5152
int retryMax,
52-
long retryIntervalMax) {
53+
long retryIntervalMax,
54+
Optional<PrefetchableClientReadHandler.PrefetchOption> prefetchOption) {
55+
super(prefetchOption);
5356
this.appId = appId;
5457
this.shuffleId = shuffleId;
5558
this.partitionId = partitionId;
@@ -68,11 +71,20 @@ public MemoryClientReadHandler(
6871
int readBufferSize,
6972
ShuffleServerClient shuffleServerClient,
7073
Roaring64NavigableMap expectTaskIds) {
71-
this(appId, shuffleId, partitionId, readBufferSize, shuffleServerClient, expectTaskIds, 1, 0);
74+
this(
75+
appId,
76+
shuffleId,
77+
partitionId,
78+
readBufferSize,
79+
shuffleServerClient,
80+
expectTaskIds,
81+
1,
82+
0,
83+
Optional.empty());
7284
}
7385

7486
@Override
75-
public ShuffleDataResult readShuffleData() {
87+
public ShuffleDataResult doReadShuffleData() {
7688
ShuffleDataResult result = null;
7789

7890
RssGetInMemoryShuffleDataRequest request =

0 commit comments

Comments
 (0)