Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ff865ab
[DSIP-23][TaskPlugin] EmrTask resource leak repair
Nov 25, 2025
e59d585
[Improvement-17724][TaskPlugin] EmrTask resource leak repair
Nov 25, 2025
78f4625
Merge branch 'DSIP-23-EmrTask' of github.com:niumy0701/dolphinschedul…
Nov 25, 2025
250a25b
[Improvement-17724][TaskPlugin] EmrTask resource leak repair
Nov 26, 2025
75174e2
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Nov 26, 2025
91140be
Merge branch 'dev' of github.com:niumy0701/dolphinscheduler into DSIP…
Nov 26, 2025
0b5bdfb
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Nov 27, 2025
ada7556
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Nov 27, 2025
d969b51
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Nov 28, 2025
639edf6
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Nov 28, 2025
a715e47
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Nov 28, 2025
6c98ca0
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 1, 2025
8152804
[Improvement-17724][TaskPlugin] EmrTask resource leak repair
Dec 2, 2025
0653b6c
[Improvement-17724][TaskPlugin] EmrTask resource leak repair
Dec 2, 2025
31b0a55
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 3, 2025
38d6d4d
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 4, 2025
921a2da
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 4, 2025
30e3336
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 5, 2025
6eb40c0
[Improvement-17724][TaskPlugin] EmrTask resource leak repair
Dec 8, 2025
d233b44
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,14 @@
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;

import org.apache.commons.lang3.StringUtils;

import java.util.Map;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -86,6 +91,42 @@ public void init() {
emrClient = createEmrClient();
}

/**
* If appIds is empty, submit a new remote application; otherwise, just track application status.
*
* @param taskCallBack
* @throws TaskException
*/
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// if appIds is not empty, just track application status, avoid resubmitting remote task
if (StringUtils.isNotEmpty(taskRequest.getAppIds())) {
setAppIds(taskRequest.getAppIds());
trackApplicationStatus();
return;
}

// submit a remote application
submitApplication();

if (StringUtils.isNotEmpty(getAppIds())) {
taskRequest.setAppIds(getAppIds());
// callback to update remote application info
taskCallBack.updateRemoteApplicationInfo(taskRequest.getTaskInstanceId(),
new ApplicationInfo(getAppIds()));
}

// keep tracking application status
trackApplicationStatus();
} finally {
// shutdown emrclient
if (emrClient != null) {
emrClient.shutdown();
}
}
}

@Override
public AbstractParameters getParameters() {
return emrParameters;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@
@Slf4j
public class EmrAddStepsTask extends AbstractEmrTask {

private String stepId;
String stepId;

private final HashSet<String> waitingStateSet = Sets.newHashSet(
StepState.PENDING.toString(),
Expand Down Expand Up @@ -189,7 +189,8 @@ private StepStatus getStepStatus() {
public void cancelApplication() throws TaskException {
log.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}",
this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
CancelStepsRequest cancelStepsRequest =
new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest);

if (cancelStepsResult == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,11 @@ private ClusterStatus getClusterStatus() {

@Override
public void cancelApplication() throws TaskException {
log.info("trying terminate job flow, taskId:{}, clusterId:{}", this.taskExecutionContext.getTaskInstanceId(),
log.info("trying terminate job flow, taskId:{}, clusterId:{}",
this.taskExecutionContext.getTaskInstanceId(),
clusterId);
TerminateJobFlowsRequest terminateJobFlowsRequest = new TerminateJobFlowsRequest().withJobFlowIds(clusterId);
TerminateJobFlowsRequest terminateJobFlowsRequest =
new TerminateJobFlowsRequest().withJobFlowIds(clusterId);
TerminateJobFlowsResult terminateJobFlowsResult = emrClient.terminateJobFlows(terminateJobFlowsRequest);
log.info("the result of terminate job flow is:{}", terminateJobFlowsResult);
}
Expand Down
Loading