Skip to content

Commit 6d36242

Browse files
committed
Make POS count failed task info and perform dedup on them
1 parent c9df88d commit 6d36242

File tree

2 files changed

+63
-5
lines changed

2 files changed

+63
-5
lines changed

presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkQueryExecutionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,7 @@ else if (preparedQuery.isExplainTypeValidate()) {
679679
planAndMore = queryPlanner.createQueryPlan(session, preparedQuery, warningCollector, variableAllocator, planNodeIdAllocator, sparkContext, sql);
680680
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkContext);
681681
CollectionAccumulator<SerializedTaskInfo> taskInfoCollector = new CollectionAccumulator<>();
682-
taskInfoCollector.register(sparkContext, Option.empty(), false);
682+
taskInfoCollector.register(sparkContext, Option.empty(), true);
683683
CollectionAccumulator<PrestoSparkShuffleStats> shuffleStatsCollector = new CollectionAccumulator<>();
684684
shuffleStatsCollector.register(sparkContext, Option.empty(), false);
685685
TempStorage tempStorage = tempStorageManager.getTempStorage(storageBasedBroadcastJoinStorage);

presto-spark-base/src/main/java/com/facebook/presto/spark/execution/AbstractPrestoSparkQueryExecution.java

Lines changed: 62 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@
2929
import com.facebook.presto.execution.QueryState;
3030
import com.facebook.presto.execution.QueryStateTimer;
3131
import com.facebook.presto.execution.StageInfo;
32+
import com.facebook.presto.execution.TaskId;
3233
import com.facebook.presto.execution.TaskInfo;
34+
import com.facebook.presto.execution.TaskState;
3335
import com.facebook.presto.execution.scheduler.ExecutionWriterTarget;
3436
import com.facebook.presto.execution.scheduler.StreamingPlanSection;
3537
import com.facebook.presto.execution.scheduler.StreamingSubPlan;
@@ -116,6 +118,7 @@
116118
import java.util.TreeMap;
117119
import java.util.concurrent.TimeoutException;
118120
import java.util.concurrent.atomic.AtomicReference;
121+
import java.util.stream.Collectors;
119122
import java.util.stream.IntStream;
120123

121124
import static com.facebook.airlift.units.DataSize.Unit.BYTE;
@@ -572,23 +575,78 @@ protected void validateStorageCapabilities(TempStorage tempStorage)
572575
}
573576
}
574577

578+
/**
579+
* Updates the taskInfoMap to ensure it stores the most relevant {@link TaskInfo} for each
580+
* logical task, identified by task ID (excluding attempt number).
581+
* <p>
582+
* This method ensures that, for each logical task, the map retains the latest successful
583+
* attempt if available, or otherwise the most recent attempt based on attempt number. Warnings
584+
* are logged in cases of unexpected duplicate or multiple successful attempts.
585+
*
586+
* @param taskInfoMap the map from logical task ID (taskId excluding attempt number) to
587+
* {@link TaskInfo}
588+
* @param taskInfo the {@link TaskInfo} to consider for updating the map
589+
*/
590+
private void updateTaskInfoMap(HashMap<String, TaskInfo> taskInfoMap, TaskInfo taskInfo)
591+
{
592+
TaskId newTaskId = taskInfo.getTaskId();
593+
String taskIdWithoutAttemptId = new StringBuilder()
594+
.append(newTaskId.getStageExecutionId().toString())
595+
.append(".")
596+
.append(newTaskId.getId())
597+
.toString();
598+
if (!taskInfoMap.containsKey(taskIdWithoutAttemptId)) {
599+
taskInfoMap.put(taskIdWithoutAttemptId, taskInfo);
600+
return;
601+
}
602+
603+
TaskInfo storedTaskInfo = taskInfoMap.get(taskIdWithoutAttemptId);
604+
TaskId storedTaskId = storedTaskInfo.getTaskId();
605+
TaskState storedTaskState = storedTaskInfo.getTaskStatus().getState();
606+
TaskState newTaskState = taskInfo.getTaskStatus().getState();
607+
if (storedTaskState == TaskState.FINISHED) {
608+
if (newTaskState == TaskState.FINISHED) {
609+
log.warn("Multiple attempts of the same task have succeeded %s vs %s",
610+
storedTaskId.toString(), newTaskId.toString());
611+
}
612+
// Successful one has been stored. Nothing needs to be done.
613+
return;
614+
}
615+
616+
int storedAttemptNumber = storedTaskId.getAttemptNumber();
617+
int newAttemptNumber = newTaskId.getAttemptNumber();
618+
if (newTaskState == TaskState.FINISHED || storedAttemptNumber < newAttemptNumber) {
619+
taskInfoMap.put(taskIdWithoutAttemptId, taskInfo);
620+
}
621+
if (storedAttemptNumber == newAttemptNumber) {
622+
log.warn("Received multiple identical TaskId %s vs %s",
623+
storedTaskId.toString(), newTaskId.toString());
624+
}
625+
}
626+
575627
protected void queryCompletedEvent(Optional<ExecutionFailureInfo> failureInfo, OptionalLong updateCount)
576628
{
577629
List<SerializedTaskInfo> serializedTaskInfos = taskInfoCollector.value();
578-
ImmutableList.Builder<TaskInfo> taskInfos = ImmutableList.builder();
630+
HashMap<String, TaskInfo> taskInfoMap = new HashMap<>();
579631
long totalSerializedTaskInfoSizeInBytes = 0;
580632
for (SerializedTaskInfo serializedTaskInfo : serializedTaskInfos) {
581633
byte[] bytes = serializedTaskInfo.getBytesAndClear();
582634
totalSerializedTaskInfoSizeInBytes += bytes.length;
583635
TaskInfo taskInfo = deserializeZstdCompressed(taskInfoCodec, bytes);
584-
taskInfos.add(taskInfo);
636+
updateTaskInfoMap(taskInfoMap, taskInfo);
585637
}
586638
taskInfoCollector.reset();
587639

588-
log.info("Total serialized task info size: %s", DataSize.succinctBytes(totalSerializedTaskInfoSizeInBytes));
640+
log.info("Total serialized task info count %s size: %s. Total deduped task info count %s",
641+
serializedTaskInfos.size(),
642+
DataSize.succinctBytes(totalSerializedTaskInfoSizeInBytes),
643+
taskInfoMap.size());
589644

590645
Optional<StageInfo> stageInfoOptional = getFinalFragmentedPlan().map(finalFragmentedPlan ->
591-
PrestoSparkQueryExecutionFactory.createStageInfo(session.getQueryId(), finalFragmentedPlan, taskInfos.build()));
646+
PrestoSparkQueryExecutionFactory.createStageInfo(
647+
session.getQueryId(),
648+
finalFragmentedPlan,
649+
taskInfoMap.values().stream().collect(Collectors.toList())));
592650
QueryState queryState = failureInfo.isPresent() ? FAILED : FINISHED;
593651

594652
QueryInfo queryInfo = PrestoSparkQueryExecutionFactory.createQueryInfo(

0 commit comments

Comments
 (0)