Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
cdee83e
[DSIP-23][TaskPlugin] DmsTask resource leak repair
Nov 25, 2025
3394bb2
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Nov 25, 2025
ed00657
Merge branch 'DSIP-23-DmsTask' of github.com:niumy0701/dolphinschedul…
Nov 25, 2025
451bf9f
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Nov 26, 2025
80b1ee7
Merge branch 'dev' of github.com:niumy0701/dolphinscheduler into DSIP…
Nov 26, 2025
6de20c9
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 27, 2025
cd182de
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 27, 2025
a08701a
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 28, 2025
2929213
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 28, 2025
03c9b3f
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Nov 28, 2025
8c3d2d7
Merge branch 'dev' of github.com:niumy0701/dolphinscheduler into DSIP…
Nov 29, 2025
7be50e7
Merge branch 'DSIP-23-DmsTask' of github.com:niumy0701/dolphinschedul…
Nov 29, 2025
56115d3
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Nov 29, 2025
f61aadc
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 1, 2025
4137abb
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 2, 2025
f905728
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 2, 2025
f259d12
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 3, 2025
1718eb2
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 4, 2025
5eac924
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 4, 2025
0b04ffe
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 5, 2025
ff93f77
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 8, 2025
e1af574
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 8, 2025
e8175f5
Merge branch 'dev' into DSIP-23-DmsTask
niumy0701 Dec 8, 2025
9c8b7f0
Revert "[Improvement-17723][TaskPlugin] DmsTask resource leak repair"
Dec 8, 2025
7809aa3
[Improvement-17723][TaskPlugin] DmsTask resource leak repair
Dec 8, 2025
5a12213
Revert "[Improvement-17723][TaskPlugin] DmsTask resource leak repair"
Dec 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ public String replaceFileParameters(String parameter) throws IOException {
}
if (parameter.startsWith("file://")) {
String filePath = parameter.substring(7);
try {
return IOUtils.toString(new FileInputStream(filePath), StandardCharsets.UTF_8);
try (FileInputStream fis = new FileInputStream(filePath)) {
return IOUtils.toString(fis, StandardCharsets.UTF_8);
} catch (IOException e) {
throw new IOException("Error reading file: " + filePath, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@

import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.plugin.task.api.AbstractRemoteTask;
import org.apache.dolphinscheduler.plugin.task.api.TaskCallBack;
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
import org.apache.dolphinscheduler.plugin.task.api.model.ApplicationInfo;
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;

import org.apache.commons.beanutils.BeanUtils;
Expand Down Expand Up @@ -78,6 +80,42 @@ public void init() throws TaskException {
initDmsHook();
}

/**
* If appIds is empty, submit a new remote application; otherwise, just track application status.
*
* @param taskCallBack
* @throws TaskException
*/
@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
// if appIds is not empty, just track application status, avoid resubmitting remote task
if (StringUtils.isNotEmpty(taskRequest.getAppIds())) {
setAppIds(taskRequest.getAppIds());
trackApplicationStatus();
return;
}

// submit a remote application
submitApplication();

if (StringUtils.isNotEmpty(getAppIds())) {
taskRequest.setAppIds(getAppIds());
// callback to update remote application info
taskCallBack.updateRemoteApplicationInfo(taskRequest.getTaskInstanceId(),
new ApplicationInfo(getAppIds()));
}

// keep tracking application status
trackApplicationStatus();
} finally {
// shutdown dmsHook client
if (dmsHook.getClient() != null) {
dmsHook.getClient().shutdown();
}
}
}

@Override
public List<String> getApplicationIds() throws TaskException {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;

import org.apache.commons.io.FileUtils;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;

Expand Down Expand Up @@ -238,4 +243,63 @@ public void testAwaitReplicationTaskStatus() {
}
});
}

@Test
public void testReplaceFileParametersWithNull() throws IOException {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String parameter = null;
String result = dmsHook.replaceFileParameters(parameter);
Assertions.assertNull(result);
}
});
}

@Test
public void testReplaceFileParametersWithNormalString() throws IOException {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String parameter = "normal string";
String result = dmsHook.replaceFileParameters(parameter);
Assertions.assertEquals(parameter, result);
}
});
}

@Test
public void testReplaceFileParametersWithExistingFile() throws IOException {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
File tempFile = new File("tempFile.txt");
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String fileContent = "content of the file";
FileUtils.writeStringToFile(tempFile, fileContent, StandardCharsets.UTF_8);
String parameter = "file://" + tempFile.getAbsolutePath();
String result = dmsHook.replaceFileParameters(parameter);
Assertions.assertEquals(fileContent, result);
} finally {
tempFile.delete();
}
});
}

@Test
public void testReplaceFileParametersWithNonexistentFile() {
Assertions.assertTimeout(Duration.ofMillis(60000), () -> {
try (MockedStatic<DmsHook> mockHook = Mockito.mockStatic(DmsHook.class)) {
mockHook.when(DmsHook::createClient).thenReturn(client);
DmsHook dmsHook = spy(new DmsHook());
String parameter = "file://nonexistentfile.txt";
Assertions.assertThrows(IOException.class, () -> {
dmsHook.replaceFileParameters(parameter);
});
}
});
}

}