Skip to content

Commit dcebf26

Browse files
authored
[Improvement-17695][ProcedureTask] Support cancel procedure task (#17696)
1 parent 550884b commit dcebf26

File tree

1 file changed

+28
-7
lines changed
  • dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure

1 file changed

+28
-7
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-procedure/src/main/java/org/apache/dolphinscheduler/plugin/task/procedure/ProcedureTask.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import java.sql.CallableStatement;
4444
import java.sql.Connection;
4545
import java.sql.SQLException;
46+
import java.sql.Statement;
4647
import java.sql.Types;
4748
import java.util.HashMap;
4849
import java.util.Map;
@@ -60,6 +61,8 @@ public class ProcedureTask extends AbstractTask {
6061

6162
private final ProcedureTaskExecutionContext procedureTaskExecutionContext;
6263

64+
private volatile Statement sessionStatement;
65+
6366
/**
6467
* constructor
6568
*
@@ -105,30 +108,48 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
105108
}
106109
String proceduerSql = formatSql(sqlParamsMap, paramsMap);
107110
// call method
108-
try (CallableStatement stmt = connection.prepareCall(proceduerSql)) {
111+
try (CallableStatement tmpStatement = connection.prepareCall(proceduerSql)) {
112+
sessionStatement = tmpStatement;
109113
// set timeout
110-
setTimeout(stmt);
114+
setTimeout(tmpStatement);
111115

112116
// outParameterMap
113-
Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, sqlParamsMap, paramsMap);
117+
Map<Integer, Property> outParameterMap = getOutParameterMap(tmpStatement, sqlParamsMap, paramsMap);
114118

115-
stmt.executeUpdate();
119+
tmpStatement.executeUpdate();
116120

117121
// print the output parameters to the log
118-
printOutParameter(stmt, outParameterMap);
122+
printOutParameter(tmpStatement, outParameterMap);
119123

120124
setExitStatusCode(EXIT_CODE_SUCCESS);
121125
}
122126
} catch (Exception e) {
127+
if (exitStatusCode == TaskConstants.EXIT_CODE_KILL) {
128+
log.info("This procedure task has been killed");
129+
return;
130+
}
123131
setExitStatusCode(EXIT_CODE_FAILURE);
124-
log.error("procedure task error", e);
132+
log.error("Failed to execute this procedure task", e);
125133
throw new TaskException("Execute procedure task failed", e);
126134
}
127135
}
128136

129137
@Override
130138
public void cancel() throws TaskException {
131-
139+
if (sessionStatement != null) {
140+
try {
141+
log.info("Try to cancel this procedure task");
142+
sessionStatement.cancel();
143+
setExitStatusCode(TaskConstants.EXIT_CODE_KILL);
144+
log.info("This procedure task was canceled");
145+
} catch (Exception ex) {
146+
log.warn("Failed to cancel this procedure task", ex);
147+
throw new TaskException("Cancel this procedure task failed", ex);
148+
}
149+
} else {
150+
log.info(
151+
"Attempted to cancel this procedure task, but no active statement exists. Possible reasons: task not started, already completed, or canceled.");
152+
}
132153
}
133154

134155
private String formatSql(Map<Integer, Property> sqlParamsMap, Map<String, Property> paramsMap) {

0 commit comments

Comments
 (0)