Skip to content

Commit 11881ab

Browse files
authored
[#2640] feat(spark): Involve background prefetch time in spark UI (#2641)
### What changes were proposed in this pull request? This PR is to Involve background prefetch time in spark UI ### Why are the changes needed? for #2640 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Internal job test
1 parent 8952913 commit 11881ab

File tree

8 files changed

+110
-20
lines changed

8 files changed

+110
-20
lines changed

client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ public boolean hasNext() {
166166
// finish reading records, check data consistent
167167
shuffleReadClient.checkProcessedBlockIds();
168168
shuffleReadClient.logStatics();
169+
shuffleReadClient.getShuffleReadTimes();
169170
String decInfo =
170171
!codec.isPresent()
171172
? "."

client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
4848
<td>{kv(4)}</td>
4949
<td>{kv(5)}</td>
5050
<td>{kv(6)}</td>
51+
<td>{kv(7)}</td>
5152
</tr>
5253

5354
private def shuffleWriteTimesRow(kv: Seq[String]) = <tr>
@@ -160,7 +161,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
160161
val readTimes = runtimeStatusStore.shuffleReadTimes().times
161162
val readTotal = if (readTimes.getTotal <= 0) -1 else readTimes.getTotal
162163
val readTimesUI = UIUtils.listingTable(
163-
Seq("Total", "Fetch", "Copy", "CRC", "Deserialize", "Decompress", "Background Decompress"),
164+
Seq("Total", "Fetch", "Copy", "CRC", "Deserialize", "Decompress", "Background Decompress", "Background Fetch"),
164165
shuffleReadTimesRow,
165166
Seq(
166167
Seq(
@@ -171,6 +172,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
171172
UIUtils.formatDuration(readTimes.getDeserialize),
172173
UIUtils.formatDuration(readTimes.getDecompress),
173174
UIUtils.formatDuration(readTimes.getBackgroundDecompress),
175+
UIUtils.formatDuration(readTimes.getBackgroundFetch),
174176
),
175177
Seq(
176178
1,
@@ -180,6 +182,7 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
180182
readTimes.getDeserialize.toDouble / readTotal,
181183
readTimes.getDecompress.toDouble / readTotal,
182184
readTimes.getBackgroundDecompress.toDouble / readTotal,
185+
readTimes.getBackgroundFetch.toDouble / readTotal,
183186
).map(x => roundToTwoDecimals(x).toString)
184187
),
185188
fixedWidth = true

client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,9 @@
5353
import org.apache.uniffle.common.util.IdHelper;
5454
import org.apache.uniffle.common.util.RssUtils;
5555
import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
56+
import org.apache.uniffle.storage.handler.ClientReadHandlerMetric;
5657
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
58+
import org.apache.uniffle.storage.handler.impl.AbstractClientReadHandler;
5759
import org.apache.uniffle.storage.handler.impl.ShuffleServerReadCostTracker;
5860
import org.apache.uniffle.storage.request.CreateShuffleReadHandlerRequest;
5961

@@ -400,7 +402,19 @@ public ShuffleReadTimes getShuffleReadTimes() {
400402
if (decompressionWorker != null) {
401403
backgroundDecompressionTime = decompressionWorker.decompressionMillis();
402404
}
405+
406+
long backgroundFetchTime = 0;
407+
if (clientReadHandler instanceof AbstractClientReadHandler) {
408+
ClientReadHandlerMetric metric =
409+
((AbstractClientReadHandler) clientReadHandler).getReadHandlerMetric();
410+
backgroundFetchTime += metric.getPrefetchTime();
411+
}
412+
403413
return new ShuffleReadTimes(
404-
readDataTime.get(), copyTime.get(), crcCheckTime.get(), backgroundDecompressionTime);
414+
readDataTime.get(),
415+
copyTime.get(),
416+
crcCheckTime.get(),
417+
backgroundDecompressionTime,
418+
backgroundFetchTime);
405419
}
406420
}

common/src/main/java/org/apache/uniffle/common/ShuffleReadTimes.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,19 +22,24 @@
2222
/** The unit is millis */
2323
public class ShuffleReadTimes {
2424
private long fetch;
25+
private long backgroundFetch;
26+
2527
private long crc;
2628
private long copy;
2729
private long deserialize;
30+
2831
private long decompress;
2932
private long backgroundDecompress;
3033

3134
public ShuffleReadTimes() {}
3235

33-
public ShuffleReadTimes(long fetch, long crc, long copy, long backgroundDecompress) {
36+
public ShuffleReadTimes(
37+
long fetch, long crc, long copy, long backgroundDecompress, long backgroundFetch) {
3438
this.fetch = fetch;
3539
this.crc = crc;
3640
this.copy = copy;
3741
this.backgroundDecompress = backgroundDecompress;
42+
this.backgroundFetch = backgroundFetch;
3843
}
3944

4045
public long getFetch() {
@@ -69,6 +74,10 @@ public long getBackgroundDecompress() {
6974
return backgroundDecompress;
7075
}
7176

77+
public long getBackgroundFetch() {
78+
return backgroundFetch;
79+
}
80+
7281
public void merge(ShuffleReadTimes other) {
7382
if (other == null) {
7483
return;
@@ -79,6 +88,7 @@ public void merge(ShuffleReadTimes other) {
7988
this.deserialize += other.deserialize;
8089
this.decompress += other.decompress;
8190
this.backgroundDecompress += other.backgroundDecompress;
91+
this.backgroundFetch += other.backgroundFetch;
8292
}
8393

8494
public long getTotal() {
@@ -93,6 +103,7 @@ public RssProtos.ShuffleReadTimes toProto() {
93103
.setDecompress(decompress)
94104
.setDeserialize(deserialize)
95105
.setBackgroundDecompress(backgroundDecompress)
106+
.setBackgroundFetch(backgroundFetch)
96107
.build();
97108
}
98109

@@ -104,6 +115,7 @@ public static ShuffleReadTimes fromProto(RssProtos.ShuffleReadTimes proto) {
104115
time.decompress = proto.getDecompress();
105116
time.deserialize = proto.getDeserialize();
106117
time.backgroundDecompress = proto.getBackgroundDecompress();
118+
time.backgroundFetch = proto.getBackgroundFetch();
107119
return time;
108120
}
109121
}

proto/src/main/proto/Rss.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -657,6 +657,7 @@ message ShuffleReadTimes {
657657
int64 deserialize = 4;
658658
int64 decompress = 5;
659659
int64 backgroundDecompress = 6;
660+
int64 backgroundFetch = 7;
660661
}
661662

662663
message ReportShuffleReadMetricResponse {

storage/src/main/java/org/apache/uniffle/storage/handler/ClientReadHandlerMetric.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,16 @@ public class ClientReadHandlerMetric {
2828
private long skippedReadLength = 0L;
2929
private long skippedReadUncompressLength = 0L;
3030

31+
private long prefetchTime = 0L;
32+
33+
public void setPrefetchTime(long prefetchTime) {
34+
this.prefetchTime = prefetchTime;
35+
}
36+
37+
public long getPrefetchTime() {
38+
return prefetchTime;
39+
}
40+
3141
public long getReadBlockNum() {
3242
return readBlockNum;
3343
}
@@ -103,4 +113,14 @@ public int hashCode() {
103113
skippedReadLength,
104114
skippedReadUncompressLength);
105115
}
116+
117+
public void merge(ClientReadHandlerMetric other) {
118+
this.readBlockNum += other.readBlockNum;
119+
this.readLength += other.readLength;
120+
this.readUncompressLength += other.readUncompressLength;
121+
this.skippedReadBlockNum += other.skippedReadBlockNum;
122+
this.skippedReadLength += other.skippedReadLength;
123+
this.skippedReadUncompressLength += other.skippedReadUncompressLength;
124+
this.prefetchTime += other.prefetchTime;
125+
}
106126
}

storage/src/main/java/org/apache/uniffle/storage/handler/impl/ComposedClientReadHandler.java

Lines changed: 40 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,16 +63,9 @@ Tier next() {
6363
private final ShuffleServerInfo serverInfo;
6464
private final Map<Tier, Supplier<ClientReadHandler>> supplierMap = new EnumMap<>(Tier.class);
6565
private final Map<Tier, ClientReadHandler> handlerMap = new EnumMap<>(Tier.class);
66-
private final Map<Tier, ClientReadHandlerMetric> metricsMap = new EnumMap<>(Tier.class);
6766
private Tier currentTier = Tier.VALUES[0]; // == Tier.HOT
6867
private final int numTiers;
6968

70-
{
71-
for (Tier tier : Tier.VALUES) {
72-
metricsMap.put(tier, new ClientReadHandlerMetric());
73-
}
74-
}
75-
7669
public ComposedClientReadHandler(ShuffleServerInfo serverInfo, ClientReadHandler... handlers) {
7770
Preconditions.checkArgument(
7871
handlers.length <= Tier.VALUES.length,
@@ -100,14 +93,30 @@ public ComposedClientReadHandler(
10093
}
10194
}
10295

103-
@Override
104-
public ShuffleDataResult readShuffleData() {
96+
private ClientReadHandlerMetric getMetric(Tier tier) {
97+
ClientReadHandler handler = getHandler(tier);
98+
if (handler != null && handler instanceof AbstractClientReadHandler) {
99+
return ((AbstractClientReadHandler) handler).getReadHandlerMetric();
100+
}
101+
return new ClientReadHandlerMetric();
102+
}
103+
104+
private ClientReadHandler getOrCreateHandler(Tier tier) {
105105
ClientReadHandler handler =
106-
handlerMap.computeIfAbsent(
107-
currentTier, key -> supplierMap.getOrDefault(key, () -> null).get());
106+
handlerMap.computeIfAbsent(tier, key -> supplierMap.getOrDefault(key, () -> null).get());
108107
if (handler == null) {
109108
throw new RssException("Unexpected null when getting " + currentTier.name() + " handler");
110109
}
110+
return handler;
111+
}
112+
113+
private ClientReadHandler getHandler(Tier tier) {
114+
return handlerMap.get(tier);
115+
}
116+
117+
@Override
118+
public ShuffleDataResult readShuffleData() {
119+
ClientReadHandler handler = getOrCreateHandler(currentTier);
111120
ShuffleDataResult shuffleDataResult;
112121
try {
113122
shuffleDataResult = handler.readShuffleData();
@@ -147,8 +156,11 @@ public void updateConsumedBlockInfo(BufferSegment bs, boolean isSkippedMetrics)
147156
if (bs == null) {
148157
return;
149158
}
150-
super.updateConsumedBlockInfo(bs, isSkippedMetrics);
151-
updateBlockMetric(metricsMap.get(currentTier), bs, isSkippedMetrics);
159+
ClientReadHandler handler = getHandler(currentTier);
160+
if (handler == null) {
161+
throw new RssException("Unexpected null when getting " + currentTier.name() + " handler");
162+
}
163+
handler.updateConsumedBlockInfo(bs, isSkippedMetrics);
152164
}
153165

154166
@Override
@@ -188,7 +200,7 @@ private String getMetricsInfo(
188200
Function<ClientReadHandlerMetric, Long> skipped) {
189201
StringBuilder sb =
190202
new StringBuilder("Client read ")
191-
.append(consumed.apply(readHandlerMetric))
203+
.append(consumed.apply(getReadHandlerMetric()))
192204
.append(" ")
193205
.append(name)
194206
.append(" from [")
@@ -198,16 +210,28 @@ private String getMetricsInfo(
198210
sb.append(" ")
199211
.append(tier.name().toLowerCase())
200212
.append(":")
201-
.append(consumed.apply(metricsMap.get(tier)));
213+
.append(consumed.apply(getMetric(tier)));
202214
}
203215
sb.append(" ], Skipped[");
204216
for (Tier tier : Tier.VALUES) {
205217
sb.append(" ")
206218
.append(tier.name().toLowerCase())
207219
.append(":")
208-
.append(skipped.apply(metricsMap.get(tier)));
220+
.append(skipped.apply(getMetric(tier)));
209221
}
210222
sb.append(" ]");
211223
return sb.toString();
212224
}
225+
226+
@Override
227+
public ClientReadHandlerMetric getReadHandlerMetric() {
228+
ClientReadHandlerMetric metric = new ClientReadHandlerMetric();
229+
for (Tier tier : Tier.VALUES) {
230+
ClientReadHandlerMetric tierMetric = getMetric(tier);
231+
if (tierMetric != null) {
232+
metric.merge(tierMetric);
233+
}
234+
}
235+
return metric;
236+
}
213237
}

storage/src/main/java/org/apache/uniffle/storage/handler/impl/PrefetchableClientReadHandler.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131

3232
import org.apache.uniffle.common.ShuffleDataResult;
3333
import org.apache.uniffle.common.exception.RssException;
34+
import org.apache.uniffle.storage.handler.ClientReadHandlerMetric;
3435

3536
public abstract class PrefetchableClientReadHandler extends AbstractClientReadHandler {
3637
private static final Logger LOG = LoggerFactory.getLogger(PrefetchableClientReadHandler.class);
@@ -144,13 +145,27 @@ public void close() {
144145
}
145146
}
146147

148+
private long getBackgroundFetchTime() {
149+
long fetch = 0;
150+
if (fetchTime != null) {
151+
fetch = fetchTime.get();
152+
}
153+
return fetch;
154+
}
155+
147156
@Override
148157
public void logConsumedBlockInfo() {
149158
LOG.info(
150159
"Metrics for shuffleId[{}], partitionId[{}], background fetch cost {} ms",
151160
shuffleId,
152161
partitionId,
153-
fetchTime);
162+
getBackgroundFetchTime());
154163
super.logConsumedBlockInfo();
155164
}
165+
166+
@Override
167+
public ClientReadHandlerMetric getReadHandlerMetric() {
168+
readHandlerMetric.setPrefetchTime(getBackgroundFetchTime());
169+
return readHandlerMetric;
170+
}
156171
}

0 commit comments

Comments
 (0)