Skip to content

Commit 42c5d9f

Browse files
authored
[#2644] feat(spark): Involve shuffle failure into the event logs (#2645)
### What changes were proposed in this pull request?

Involve shuffle failure into the event logs

### Why are the changes needed?

For #2644.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Internal job tests.
1 parent 11881ab commit 42c5d9f

File tree

11 files changed

+94
-9
lines changed

11 files changed

+94
-9
lines changed

client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleReadInfoEvent.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public class TaskShuffleReadInfoEvent extends UniffleEvent {
2929
private boolean isShuffleReadFailed;
3030
private String failureReason;
3131
private ShuffleReadTimes shuffleReadTimes;
32+
private long taskAttemptNumber;
3233

3334
public TaskShuffleReadInfoEvent(
3435
int stageId,
@@ -37,14 +38,16 @@ public TaskShuffleReadInfoEvent(
3738
Map<String, ShuffleReadMetric> metrics,
3839
boolean isShuffleReadFailed,
3940
String failureReason,
40-
ShuffleReadTimes shuffleReadTimes) {
41+
ShuffleReadTimes shuffleReadTimes,
42+
long taskAttemptNumber) {
4143
this.stageId = stageId;
4244
this.shuffleId = shuffleId;
4345
this.taskId = taskId;
4446
this.metrics = metrics;
4547
this.isShuffleReadFailed = isShuffleReadFailed;
4648
this.failureReason = failureReason;
4749
this.shuffleReadTimes = shuffleReadTimes;
50+
this.taskAttemptNumber = taskAttemptNumber;
4851
}
4952

5053
public int getStageId() {
@@ -74,4 +77,8 @@ public String getFailureReason() {
7477
public ShuffleReadTimes getShuffleReadTimes() {
7578
return shuffleReadTimes;
7679
}
80+
81+
public long getTaskAttemptNumber() {
82+
return taskAttemptNumber;
83+
}
7784
}

client-spark/common/src/main/java/org/apache/spark/shuffle/events/TaskShuffleWriteInfoEvent.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ public class TaskShuffleWriteInfoEvent extends UniffleEvent {
2828
private boolean isShuffleWriteFailed;
2929
private String failureReason;
3030
private long uncompressedByteSize;
31+
private long taskAttemptNumber;
3132

3233
public TaskShuffleWriteInfoEvent(
3334
int stageId,
@@ -37,7 +38,8 @@ public TaskShuffleWriteInfoEvent(
3738
ShuffleWriteTimes writeTimes,
3839
boolean isShuffleWriteFailed,
3940
String failureReason,
40-
long uncompressedByteSize) {
41+
long uncompressedByteSize,
42+
long taskAttemptNumber) {
4143
this.stageId = stageId;
4244
this.shuffleId = shuffleId;
4345
this.taskId = taskId;
@@ -46,6 +48,7 @@ public TaskShuffleWriteInfoEvent(
4648
this.isShuffleWriteFailed = isShuffleWriteFailed;
4749
this.failureReason = failureReason;
4850
this.uncompressedByteSize = uncompressedByteSize;
51+
this.taskAttemptNumber = taskAttemptNumber;
4952
}
5053

5154
public int getStageId() {
@@ -79,4 +82,8 @@ public String getFailureReason() {
7982
public long getUncompressedByteSize() {
8083
return uncompressedByteSize;
8184
}
85+
86+
public long getTaskAttemptNumber() {
87+
return taskAttemptNumber;
88+
}
8289
}

client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/ShuffleManagerGrpcService.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -735,7 +735,8 @@ public void reportShuffleWriteMetric(
735735
ShuffleWriteTimes.fromProto(request.getShuffleWriteTimes()),
736736
request.getIsTaskWriteFailed(),
737737
request.getShuffleWriteFailureReason(),
738-
request.getUncompressedByteSize());
738+
request.getUncompressedByteSize(),
739+
request.getTaskAttemptNumber());
739740
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
740741
RssProtos.ReportShuffleWriteMetricResponse reply =
741742
RssProtos.ReportShuffleWriteMetricResponse.newBuilder()
@@ -770,7 +771,8 @@ public void reportShuffleReadMetric(
770771
x.getValue().getHadoopByteSize()))),
771772
request.getIsTaskReadFailed(),
772773
request.getShuffleReadFailureReason(),
773-
ShuffleReadTimes.fromProto(request.getShuffleReadTimes()));
774+
ShuffleReadTimes.fromProto(request.getShuffleReadTimes()),
775+
request.getTaskAttemptNumber());
774776
RssSparkShuffleUtils.getActiveSparkContext().listenerBus().post(event);
775777
RssProtos.ReportShuffleReadMetricResponse reply =
776778
RssProtos.ReportShuffleReadMetricResponse.newBuilder()

client-spark/extension/src/main/scala/org/apache/spark/UniffleListener.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ import scala.collection.JavaConverters.mapAsScalaMapConverter
3131
class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
3232
extends SparkListener with Logging {
3333

34+
private val writeTaskInfo = ShuffleTaskSummary(shuffleType = ShuffleType.WRITE)
35+
private val readTaskInfo = ShuffleTaskSummary(shuffleType = ShuffleType.READ)
36+
3437
private val aggregatedShuffleReadTimes = new ShuffleReadTimes()
3538
private val aggregatedShuffleWriteTimes = new ShuffleWriteTimes()
3639
private val aggregatedShuffleWriteMetric = new ConcurrentHashMap[String, AggregatedShuffleWriteMetric]
@@ -70,6 +73,8 @@ class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
7073
kvstore.write(
7174
AggregatedShuffleReadTimesUIData(aggregatedShuffleReadTimes)
7275
)
76+
kvstore.write(writeTaskInfo)
77+
kvstore.write(readTaskInfo)
7378
}
7479
}
7580

@@ -122,6 +127,14 @@ class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
122127
}
123128
aggregatedShuffleWriteTimes.inc(event.getWriteTimes)
124129
totalUncompressedShuffleBytes.addAndGet(event.getUncompressedByteSize)
130+
131+
if (event.isShuffleWriteFailed) {
132+
writeTaskInfo.failedTaskNumber += 1
133+
if (event.getTaskAttemptNumber > writeTaskInfo.failedTaskMaxAttemptNumber) {
134+
writeTaskInfo.failedTaskMaxAttemptNumber = event.getTaskAttemptNumber
135+
}
136+
writeTaskInfo.failureReasons.add(event.getFailureReason)
137+
}
125138
}
126139

127140
private def onTaskShuffleReadInfo(event: TaskShuffleReadInfoEvent): Unit = {
@@ -143,6 +156,13 @@ class UniffleListener(conf: SparkConf, kvstore: ElementTrackingStore)
143156
agg_metric.hadoopDurationMillis += rmetric.getHadoopDurationMillis
144157
}
145158
aggregatedShuffleReadTimes.merge(event.getShuffleReadTimes)
159+
160+
if (event.isShuffleReadFailed) {
161+
readTaskInfo.failedTaskNumber += 1
162+
if (event.getTaskAttemptNumber > readTaskInfo.failedTaskMaxAttemptNumber) {
163+
readTaskInfo.failedTaskMaxAttemptNumber = event.getTaskAttemptNumber
164+
}
165+
readTaskInfo.failureReasons.add(event.getFailureReason)
146166
}
147167

148168
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {

client-spark/extension/src/main/scala/org/apache/spark/UniffleStatusStore.scala

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,22 @@ import org.apache.uniffle.common.ShuffleReadTimes
2626

2727
import java.util.concurrent.ConcurrentHashMap
2828
import scala.collection.JavaConverters.asScalaIteratorConverter
29+
import scala.collection.mutable
2930

3031
class UniffleStatusStore(store: KVStore) {
3132
private def viewToSeq[T](view: KVStoreView[T]): Seq[T] = {
3233
Utils.tryWithResource(view.closeableIterator())(iter => iter.asScala.toList)
3334
}
3435

36+
def shuffleTaskSummary(shuffleType: ShuffleType): ShuffleTaskSummary = {
37+
val kClass = classOf[ShuffleTaskSummary]
38+
try {
39+
store.read(kClass, s"${kClass.getName}_$shuffleType")
40+
} catch {
41+
case _: NoSuchElementException => new ShuffleTaskSummary(shuffleType = shuffleType)
42+
}
43+
}
44+
3545
def uniffleProperties(): UniffleProperties = {
3646
val kClass = classOf[UniffleProperties]
3747
try {
@@ -184,4 +194,19 @@ case class ReassignInfoUIData(event: TaskReassignInfoEvent) {
184194
@JsonIgnore
185195
@KVIndex
186196
def id: String = classOf[ReassignInfoUIData].getName()
197+
}
198+
199+
sealed abstract class ShuffleType private ()
200+
object ShuffleType {
201+
val READ: ShuffleType = new ShuffleType {}
202+
val WRITE: ShuffleType = new ShuffleType {}
203+
}
204+
205+
case class ShuffleTaskSummary(shuffleType: ShuffleType,
206+
var failureReasons: mutable.HashSet[String] = new mutable.HashSet[String](),
207+
var failedTaskNumber: Long = -1,
208+
var failedTaskMaxAttemptNumber: Long = -1) {
209+
@JsonIgnore
210+
@KVIndex
211+
def id: String = s"${classOf[ShuffleTaskSummary].getName}_${shuffleType.toString}"
187212
}

client-spark/extension/src/main/scala/org/apache/spark/ui/ShufflePage.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ package org.apache.spark.ui
2020
import org.apache.spark.internal.Logging
2121
import org.apache.spark.shuffle.events.ShuffleWriteTimes
2222
import org.apache.spark.util.Utils
23-
import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric, AggregatedShuffleWriteMetric, AggregatedTaskInfoUIData}
23+
import org.apache.spark.{AggregatedShuffleMetric, AggregatedShuffleReadMetric, AggregatedShuffleWriteMetric, AggregatedTaskInfoUIData, ShuffleType}
2424

2525
import java.util.concurrent.ConcurrentHashMap
2626
import javax.servlet.http.HttpServletRequest
@@ -139,6 +139,10 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
139139
// reassign info
140140
val reassignInfo = runtimeStatusStore.reassignInfo().event
141141

142+
// task failure summary
143+
val writeSummary = runtimeStatusStore.shuffleTaskSummary(shuffleType = ShuffleType.WRITE)
144+
val readSummary = runtimeStatusStore.shuffleTaskSummary(shuffleType = ShuffleType.READ)
145+
142146
// render build info
143147
val buildInfo = runtimeStatusStore.buildInfo()
144148
val buildInfoTableUI = UIUtils.listingTable(
@@ -344,6 +348,14 @@ class ShufflePage(parent: ShuffleTab) extends WebUIPage("") with Logging {
344348
</a>
345349
partitionSplit={reassignInfo.isReassignTriggeredOnPartitionSplit}, blockSentFailure={reassignInfo.isReassignTriggeredOnBlockSendFailure}, stageRetry={reassignInfo.isReassignTriggeredOnStageRetry}
346350
</li>
351+
352+
<li>
353+
<a>
354+
<strong>Shuffle Task Failure Summary (failure write/read):</strong>
355+
</a>
356+
{writeSummary.failedTaskNumber > 0} / {readSummary.failedTaskNumber > 0}
357+
</li>
358+
347359
</ul>
348360
</div>
349361

client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -414,7 +414,8 @@ private void postShuffleReadMetricsToDriver() {
414414
x.getValue().getHadoopReadLocalFileBytes()))),
415415
isShuffleReadFailed,
416416
shuffleReadReason,
417-
shuffleReadTimes));
417+
shuffleReadTimes,
418+
context.attemptNumber()));
418419
if (response != null && response.getStatusCode() != StatusCode.SUCCESS) {
419420
LOG.error("Errors on reporting shuffle read metrics to driver");
420421
}

client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -980,7 +980,8 @@ public Option<MapStatus> stop(boolean success) {
980980
writeTimes,
981981
isShuffleWriteFailed,
982982
shuffleWriteFailureReason,
983-
bufferManager.getUncompressedDataLen()));
983+
bufferManager.getUncompressedDataLen(),
984+
taskContext.attemptNumber()));
984985
if (response.getStatusCode() != StatusCode.SUCCESS) {
985986
LOG.error(
986987
"Errors on reporting shuffle write metrics to driver. status_code: {}",

internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleReadMetricRequest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ public class RssReportShuffleReadMetricRequest {
3232
private boolean isShuffleReadFailed;
3333
private Optional<String> shuffleReadReason;
3434
private ShuffleReadTimes shuffleReadTimes;
35+
private long taskAttemptNumber;
3536

3637
public RssReportShuffleReadMetricRequest(
3738
int stageId,
@@ -40,14 +41,16 @@ public RssReportShuffleReadMetricRequest(
4041
Map<String, TaskShuffleReadMetric> metrics,
4142
boolean isShuffleReadFailed,
4243
Optional<String> shuffleReadReason,
43-
ShuffleReadTimes shuffleReadTimes) {
44+
ShuffleReadTimes shuffleReadTimes,
45+
long taskAttemptNumber) {
4446
this.stageId = stageId;
4547
this.shuffleId = shuffleId;
4648
this.taskId = taskId;
4749
this.metrics = metrics;
4850
this.isShuffleReadFailed = isShuffleReadFailed;
4951
this.shuffleReadReason = shuffleReadReason;
5052
this.shuffleReadTimes = shuffleReadTimes;
53+
this.taskAttemptNumber = taskAttemptNumber;
5154
}
5255

5356
public RssProtos.ReportShuffleReadMetricRequest toProto() {
@@ -61,6 +64,7 @@ public RssProtos.ReportShuffleReadMetricRequest toProto() {
6164
.setIsTaskReadFailed(request.isShuffleReadFailed)
6265
.setShuffleReadFailureReason(request.shuffleReadReason.orElse(""))
6366
.setShuffleReadTimes(shuffleReadTimes.toProto())
67+
.setTaskAttemptNumber(taskAttemptNumber)
6468
.putAllMetrics(
6569
request.metrics.entrySet().stream()
6670
.collect(

internal-client/src/main/java/org/apache/uniffle/client/request/RssReportShuffleWriteMetricRequest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ public class RssReportShuffleWriteMetricRequest {
2727
private int stageId;
2828
private int shuffleId;
2929
private long taskId;
30+
private long taskAttemptNumber;
3031
private Map<String, TaskShuffleWriteMetric> metrics;
3132
private TaskShuffleWriteTimes writeTimes;
3233

@@ -43,7 +44,8 @@ public RssReportShuffleWriteMetricRequest(
4344
TaskShuffleWriteTimes writeTimes,
4445
boolean isShuffleWriteFailed,
4546
Optional<String> shuffleWriteFailureReason,
46-
long uncompressedByteSize) {
47+
long uncompressedByteSize,
48+
long taskAttemptNumber) {
4749
this.stageId = stageId;
4850
this.shuffleId = shuffleId;
4951
this.taskId = taskId;
@@ -52,6 +54,7 @@ public RssReportShuffleWriteMetricRequest(
5254
this.isShuffleWriteFailed = isShuffleWriteFailed;
5355
this.shuffleWriteFailureReason = shuffleWriteFailureReason;
5456
this.uncompressedByteSize = uncompressedByteSize;
57+
this.taskAttemptNumber = taskAttemptNumber;
5558
}
5659

5760
public RssProtos.ReportShuffleWriteMetricRequest toProto() {
@@ -66,6 +69,7 @@ public RssProtos.ReportShuffleWriteMetricRequest toProto() {
6669
.setIsTaskWriteFailed(isShuffleWriteFailed)
6770
.setShuffleWriteFailureReason(shuffleWriteFailureReason.orElse(""))
6871
.setUncompressedByteSize(request.uncompressedByteSize)
72+
.setTaskAttemptNumber(taskAttemptNumber)
6973
.putAllMetrics(
7074
request.metrics.entrySet().stream()
7175
.collect(

0 commit comments

Comments
 (0)