Skip to content

Commit 2498bba

Browse files
committed
code optimization
1 parent 124d5c3 commit 2498bba

File tree

4 files changed

+2
-25
lines changed

4 files changed

+2
-25
lines changed

linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/protocol/task/ResponseTaskExecute.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -58,15 +58,6 @@ case class ResponseTaskStatus(execId: String, status: ExecutionNodeStatus)
5858
extends RetryableProtocol
5959
with RequestProtocol
6060

61-
class ResponseTaskStatusWithExecuteCodeIndex(
62-
execId: String,
63-
status: ExecutionNodeStatus,
64-
private var _errorIndex: Int = -1
65-
) extends ResponseTaskStatus(execId, status) {
66-
def errorIndex: Int = _errorIndex
67-
def errorIndex_=(value: Int): Unit = _errorIndex = value
68-
}
69-
7061
case class ResponseTaskResultSet(execId: String, output: String, alias: String)
7162
extends RetryableProtocol
7263
with RequestProtocol {

linkis-orchestrator/linkis-computation-orchestrator/src/main/scala/org/apache/linkis/orchestrator/computation/service/ComputationTaskExecutionReceiver.scala

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -93,21 +93,10 @@ class ComputationTaskExecutionReceiver extends TaskExecutionReceiver with Loggin
9393
.getByEngineConnAndTaskId(serviceInstance, taskStatus.execId)
9494
.foreach { codeExecutor =>
9595
OrchestratorLoggerUtils.setJobIdMDC(codeExecutor.getExecTask)
96-
97-
taskStatus match {
98-
case rts: ResponseTaskStatusWithExecuteCodeIndex =>
99-
logger.info(s"execute error with index: ${rts.errorIndex}")
100-
codeExecutor.getExecTask.updateParams(
101-
"execute.error.code.index",
102-
rts.errorIndex.toString
103-
)
104-
case _ =>
105-
}
10696
val event = TaskStatusEvent(codeExecutor.getExecTask, taskStatus.status)
10797
logger.info(
10898
s"From engineConn receive status info:$taskStatus, now post to listenerBus event: $event"
10999
)
110-
111100
codeExecutor.getExecTask.getPhysicalContext.broadcastSyncEvent(event)
112101
codeExecutor.getEngineConnExecutor.updateLastUpdateTime()
113102
isExist = true

linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/plans/physical/PhysicalContextImpl.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ class PhysicalContextImpl(private var rootTask: ExecTask, private var leafTasks:
7070
OrchestratorErrorCodeSummary.EXECUTION_ERROR_CODE,
7171
cause
7272
)
73+
// 标识失败代码索引,以便重试的时候只执行未执行代码
74+
failedResponse.errorIndex = this.rootTask.params.getOrElse("execute.error.code.index", "-1").toInt
7375
this.response = failedResponse
7476
syncListenerBus.postToAll(RootTaskResponseEvent(getRootTask, failedResponse))
7577
}

linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/strategy/async/AsyncExecTaskRunnerImpl.scala

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,11 +75,6 @@ class AsyncExecTaskRunnerImpl(override val task: ExecTask)
7575
case succeed: SucceedTaskResponse =>
7676
logger.info(s"Succeed to execute ExecTask(${task.getIDInfo})")
7777
transientStatus(ExecutionNodeStatus.Succeed)
78-
case failedTaskResponse: DefaultFailedTaskResponse =>
79-
logger.info(s"DefaultFailed to execute ExecTask(${task.getIDInfo})")
80-
failedTaskResponse.errorIndex =
81-
task.params.getOrElse("execute.error.code.index", "-1").toInt
82-
transientStatus(ExecutionNodeStatus.Failed)
8378
case failedTaskResponse: FailedTaskResponse =>
8479
logger.info(s"Failed to execute ExecTask(${task.getIDInfo})")
8580
transientStatus(ExecutionNodeStatus.Failed)

0 commit comments

Comments
 (0)