Skip to content
Closed
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
cdee83e
[DSIP-23][TaskPlugin] DmsTask resource leak repair
Nov 25, 2025
3394bb2
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Nov 25, 2025
ed00657
Merge branch 'DSIP-23-DmsTask' of github.com:niumy0701/dolphinschedul…
Nov 25, 2025
451bf9f
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Nov 26, 2025
80b1ee7
Merge branch 'dev' of github.com:niumy0701/dolphinscheduler into DSIP…
Nov 26, 2025
6de20c9
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 27, 2025
cd182de
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 27, 2025
a08701a
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 28, 2025
2929213
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 28, 2025
03c9b3f
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 28, 2025
8c3d2d7
Merge branch 'dev' of github.com:niumy0701/dolphinscheduler into DSIP…
Nov 29, 2025
7be50e7
Merge branch 'DSIP-23-DmsTask' of github.com:niumy0701/dolphinschedul…
Nov 29, 2025
56115d3
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Nov 29, 2025
f61aadc
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 1, 2025
4137abb
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 2, 2025
f905728
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 2, 2025
f259d12
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 3, 2025
1718eb2
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 4, 2025
5eac924
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 4, 2025
0b04ffe
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 5, 2025
ff93f77
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 8, 2025
e1af574
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 8, 2025
e8175f5
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 8, 2025
9c8b7f0
Revert "[Improvement-17723][TaskPlugin] DmsTask resource leak repair"
Dec 8, 2025
7809aa3
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 8, 2025
5a12213
Revert "[Improvement-17723][TaskPlugin] DmsTask resource leak repair"
Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,19 @@ public void stopReplicationTask() {
if (replicationTaskArn == null) {
return;
}
StopReplicationTaskRequest request = new StopReplicationTaskRequest()
.withReplicationTaskArn(replicationTaskArn);
client.stopReplicationTask(request);
awaitReplicationTaskStatus(STATUS.STOPPED);
try {
StopReplicationTaskRequest request = new StopReplicationTaskRequest()
.withReplicationTaskArn(replicationTaskArn);
client.stopReplicationTask(request);
awaitReplicationTaskStatus(STATUS.STOPPED);
} catch (Exception e) {
log.error("stopReplicationTask error: ", e);
} finally {
if (client != null) {
// shutdown client
client.shutdown();
}
}
}

public Boolean deleteReplicationTask() {
Expand Down Expand Up @@ -268,8 +277,8 @@ public String replaceFileParameters(String parameter) throws IOException {
}
if (parameter.startsWith("file://")) {
String filePath = parameter.substring(7);
try {
return IOUtils.toString(new FileInputStream(filePath), StandardCharsets.UTF_8);
try (FileInputStream fis = new FileInputStream(filePath)) {
return IOUtils.toString(fis, StandardCharsets.UTF_8);
} catch (IOException e) {
throw new IOException("Error reading file: " + filePath, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;

import org.apache.commons.beanutils.BeanUtils;
Expand Down Expand Up @@ -78,6 +80,42 @@ public void init() throws TaskException {
initDmsHook();
}

/**
* If appIds is empty, submit a new remote application; otherwise, just track application status.
*
* @param taskCallBack
* @throws TaskException
*/
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// if appIds is not empty, just track application status, avoid resubmitting remote task
if (StringUtils.isNotEmpty(taskRequest.getAppIds())) {
setAppIds(taskRequest.getAppIds());
trackApplicationStatus();
return;
}

// submit a remote application
submitApplication();

if (StringUtils.isNotEmpty(getAppIds())) {
taskRequest.setAppIds(getAppIds());
// callback to update remote application info
taskCallBack.updateRemoteApplicationInfo(taskRequest.getTaskInstanceId(),
new ApplicationInfo(getAppIds()));
}

// keep tracking application status
trackApplicationStatus();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AWS DMS task types will not be submitted to yarn or k8s for execution, so this logic should not be added.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AWS DMS task types will not be submitted to yarn or k8s for execution, so this logic should not be added.

DMSTask also executes the handle method. When rewriting the handle method, I added the logic to close resources

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two do not conflict or even have nothing to do with each other.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two do not conflict or even have nothing to do with each other.

Do you mean that DMS task does not involve resource leakage?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We've already safely close connection in cancelApplication

Copy link
Author

@niumy0701 niumy0701 Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We've already safely close connection in cancelApplication

Based on the previous discussion with @ruanwenjun , it is necessary to uniformly close resources in the handle method.
Shall we discuss and confirm again

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@niumy0701 For such issues, please first provide evidence of resource leakage.

Copy link
Author

@niumy0701 niumy0701 Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@niumy0701 For such issues, please first provide evidence of resource leakage.

okay,I am trying to simulate a resource leakage

} finally {
// shutdown dmsHook client
if (dmsHook.getClient() != null) {
dmsHook.getClient().shutdown();
}
}
}

@Override
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;

Expand Down Expand Up @@ -238,4 +243,63 @@ public void testAwaitReplicationTaskStatus() {
}
});
}

@Test
public void testReplaceFileParametersWithNull() throws IOException {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String parameter = null;
String result = dmsHook.replaceFileParameters(parameter);
Assertions.assertNull(result);
}
});
}

@Test
public void testReplaceFileParametersWithNormalString() throws IOException {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String parameter = "normal string";
String result = dmsHook.replaceFileParameters(parameter);
Assertions.assertEquals(parameter, result);
}
});
}

@Test
public void testReplaceFileParametersWithExistingFile() throws IOException {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
File tempFile = new File("tempFile.txt");
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String fileContent = "content of the file";
FileUtils.writeStringToFile(tempFile, fileContent, StandardCharsets.UTF_8);
String parameter = "file://" + tempFile.getAbsolutePath();
String result = dmsHook.replaceFileParameters(parameter);
Assertions.assertEquals(fileContent, result);
} finally {
tempFile.delete();
}
});
}

@Test
public void testReplaceFileParametersWithNonexistentFile() {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String parameter = "file://nonexistentfile.txt";
Assertions.assertThrows(IOException.class, () -> {
dmsHook.replaceFileParameters(parameter);
});
}
});
}

}