Skip to content
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
ff865ab
[DSIP-23][TaskPlugin] EmrTask resource leak repair
Nov 25, 2025
e59d585
[Improvement-17724][TaskPlugin] EmrTask resource leak repair
Nov 25, 2025
78f4625
Merge branch 'DSIP-23-EmrTask' of github.com:niumy0701/dolphinschedul…
Nov 25, 2025
250a25b
[Improvement-17724][TaskPlugin] EmrTask resource leak repair
Nov 26, 2025
75174e2
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Nov 26, 2025
91140be
Merge branch 'dev' of github.com:niumy0701/dolphinscheduler into DSIP…
Nov 26, 2025
0b5bdfb
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Nov 27, 2025
ada7556
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Nov 27, 2025
d969b51
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Nov 28, 2025
639edf6
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Nov 28, 2025
a715e47
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Nov 28, 2025
6c98ca0
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 1, 2025
8152804
[Improvement-17724][TaskPlugin] EmrTask resource leak repair
Dec 2, 2025
0653b6c
[Improvement-17724][TaskPlugin] EmrTask resource leak repair
Dec 2, 2025
31b0a55
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 3, 2025
38d6d4d
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 4, 2025
921a2da
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 4, 2025
30e3336
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 5, 2025
6eb40c0
[Improvement-17724][TaskPlugin] EmrTask resource leak repair
Dec 8, 2025
d233b44
Merge branch 'dev' into DSIP-23-EmrTask
niumy0701 Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ public void trackApplicationStatus() throws TaskException {
final int exitStatusCode = calculateExitStatusCode(stepStatus);
setExitStatusCode(exitStatusCode);
log.info("emr task finished with step status : {}", stepStatus);

// shutdown emrclient
if (emrClient != null) {
emrClient.shutdown();
}
}
}

Expand Down Expand Up @@ -187,26 +192,37 @@ private StepStatus getStepStatus() {

@Override
public void cancelApplication() throws TaskException {
log.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}",
this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
CancelStepsRequest cancelStepsRequest = new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest);
try {
log.info("trying cancel emr step, taskId:{}, clusterId:{}, stepId:{}",
this.taskExecutionContext.getTaskInstanceId(), clusterId, stepId);
CancelStepsRequest cancelStepsRequest =
new CancelStepsRequest().withClusterId(clusterId).withStepIds(stepId);
CancelStepsResult cancelStepsResult = emrClient.cancelSteps(cancelStepsRequest);

if (cancelStepsResult == null) {
throw new EmrTaskException("cancel emr step failed");
}

if (cancelStepsResult == null) {
throw new EmrTaskException("cancel emr step failed");
}
CancelStepsInfo cancelEmrStepInfo = cancelStepsResult.getCancelStepsInfoList()
.stream()
.filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId))
.findFirst()
.orElseThrow(() -> new EmrTaskException("cancel emr step failed"));

CancelStepsInfo cancelEmrStepInfo = cancelStepsResult.getCancelStepsInfoList()
.stream()
.filter(cancelStepsInfo -> cancelStepsInfo.getStepId().equals(stepId))
.findFirst()
.orElseThrow(() -> new EmrTaskException("cancel emr step failed"));
if (CancelStepsRequestStatus.FAILED.toString().equals(cancelEmrStepInfo.getStatus())) {
throw new EmrTaskException("cancel emr step failed, message:" + cancelEmrStepInfo.getReason());
}

if (CancelStepsRequestStatus.FAILED.toString().equals(cancelEmrStepInfo.getStatus())) {
throw new EmrTaskException("cancel emr step failed, message:" + cancelEmrStepInfo.getReason());
log.info("the result of cancel emr step is:{}", cancelStepsResult);
} catch (EmrTaskException | SdkBaseException e) {
log.error("cancel emr step failed", e);
throw new TaskException("cancel emr step failed", e);
} finally {
// shutdown emrclient
if (emrClient != null) {
emrClient.shutdown();
}
}

log.info("the result of cancel emr step is:{}", cancelStepsResult);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public void trackApplicationStatus() throws TaskException {
final int exitStatusCode = calculateExitStatusCode(clusterStatus);
setExitStatusCode(exitStatusCode);
log.info("emr task finished with cluster status : {}", clusterStatus);

// shutdown emrclient
if (emrClient != null) {
emrClient.shutdown();
}
}
}

Expand Down Expand Up @@ -180,11 +185,21 @@ private ClusterStatus getClusterStatus() {

@Override
public void cancelApplication() throws TaskException {
log.info("trying terminate job flow, taskId:{}, clusterId:{}", this.taskExecutionContext.getTaskInstanceId(),
clusterId);
TerminateJobFlowsRequest terminateJobFlowsRequest = new TerminateJobFlowsRequest().withJobFlowIds(clusterId);
TerminateJobFlowsResult terminateJobFlowsResult = emrClient.terminateJobFlows(terminateJobFlowsRequest);
log.info("the result of terminate job flow is:{}", terminateJobFlowsResult);
try {
log.info("trying terminate job flow, taskId:{}, clusterId:{}", this.taskExecutionContext.getTaskInstanceId(),
clusterId);
TerminateJobFlowsRequest terminateJobFlowsRequest = new TerminateJobFlowsRequest().withJobFlowIds(clusterId);
TerminateJobFlowsResult terminateJobFlowsResult = emrClient.terminateJobFlows(terminateJobFlowsRequest);
log.info("the result of terminate job flow is:{}", terminateJobFlowsResult);
} catch (Exception e) {
log.error("emr job flow task cancel error: {}", e.getMessage(), e);
throw new TaskException("emr job flow task cancel error: " + e.getMessage(), e);
} finally {
// shutdown emrclient
if (emrClient != null) {
emrClient.shutdown();
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
Expand All @@ -44,6 +46,10 @@
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsResult;
import com.amazonaws.services.elasticmapreduce.model.AmazonElasticMapReduceException;
import com.amazonaws.services.elasticmapreduce.model.CancelStepsInfo;
import com.amazonaws.services.elasticmapreduce.model.CancelStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.CancelStepsRequestStatus;
import com.amazonaws.services.elasticmapreduce.model.CancelStepsResult;
import com.amazonaws.services.elasticmapreduce.model.DescribeStepResult;
import com.amazonaws.services.elasticmapreduce.model.Step;
import com.amazonaws.services.elasticmapreduce.model.StepState;
Expand Down Expand Up @@ -198,4 +204,34 @@ private EmrParameters buildErrorEmrTaskParameters() {

return emrParameters;
}

@Test
public void cancelApplication_CancelStepsResultIsNull_ShouldThrowException() {
Mockito.when(emrClient.cancelSteps(any(CancelStepsRequest.class))).thenReturn(null);

TaskException exception = Assertions.assertThrows(TaskException.class, () -> {
emrAddStepsTask.cancelApplication();
});

verify(emrClient, times(1)).shutdown();
}

@Test
public void cancelApplication_CancelStepsRequestFails_ShouldThrowException() {
CancelStepsInfo cancelStepsInfo = new CancelStepsInfo()
.withStepId("step-123")
.withStatus(CancelStepsRequestStatus.FAILED.toString())
.withReason("Test failure");

CancelStepsResult cancelStepsResult = new CancelStepsResult()
.withCancelStepsInfoList(cancelStepsInfo);

Mockito.when(emrClient.cancelSteps(any(CancelStepsRequest.class))).thenReturn(cancelStepsResult);

TaskException exception = Assertions.assertThrows(TaskException.class, () -> {
emrAddStepsTask.cancelApplication();
});

verify(emrClient, times(1)).shutdown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_SUCCESS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
Expand Down Expand Up @@ -49,6 +51,8 @@
import com.amazonaws.services.elasticmapreduce.model.ClusterStatus;
import com.amazonaws.services.elasticmapreduce.model.DescribeClusterResult;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.TerminateJobFlowsResult;

@ExtendWith(MockitoExtension.class)
public class EmrJobFlowTaskTest {
Expand Down Expand Up @@ -210,4 +214,17 @@ private String buildEmrTaskParameters() {

return JSONUtils.toJsonString(emrParameters);
}

@Test
public void testCancelApplication_Success() throws TaskException {
TerminateJobFlowsResult terminateJobFlowsResult = Mockito.mock(TerminateJobFlowsResult.class);
Mockito.when(emrClient.terminateJobFlows(any(TerminateJobFlowsRequest.class)))
.thenReturn(terminateJobFlowsResult);
Mockito.doNothing().when(emrClient).shutdown();

Assertions.assertDoesNotThrow(() -> emrJobFlowTask.cancelApplication());

verify(emrClient, times(1)).terminateJobFlows(any(TerminateJobFlowsRequest.class));
verify(emrClient, times(1)).shutdown();
}
}