Skip to content

Commit 5c5b44c

Browse files
authored
[Improvement-17788] [TaskPlugin] Optimization of log processing for RemoteShellTask (#17800)
1 parent 5cac989 commit 5c5b44c

File tree

2 files changed

+154
-17
lines changed
  • dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src

2 files changed

+154
-17
lines changed

dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/main/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutor.java

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,19 @@
3434
import org.apache.sshd.sftp.client.SftpClientFactory;
3535
import org.apache.sshd.sftp.client.fs.SftpFileSystem;
3636

37+
import java.io.BufferedReader;
3738
import java.io.ByteArrayOutputStream;
3839
import java.io.IOException;
40+
import java.io.InputStream;
41+
import java.io.InputStreamReader;
42+
import java.nio.charset.StandardCharsets;
3943
import java.nio.file.Files;
4044
import java.nio.file.Path;
4145
import java.nio.file.Paths;
4246
import java.util.EnumSet;
4347
import java.util.HashMap;
4448
import java.util.Map;
49+
import java.util.function.Consumer;
4550

4651
import lombok.SneakyThrows;
4752
import lombok.extern.slf4j.Slf4j;
@@ -109,13 +114,15 @@ public void track(String taskId) throws Exception {
109114
do {
110115
pid = getTaskPid(taskId);
111116
String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN + 1, getRemoteShellHome(), taskId);
112-
String logLine = runRemote(trackCommand);
113-
if (StringUtils.isEmpty(logLine)) {
114-
Thread.sleep(TRACK_INTERVAL);
117+
int readLines = runRemoteAndProcessLines(trackCommand, line -> {
118+
log.info(line);
119+
taskOutputParameterParser.appendParseLog(line);
120+
});
121+
if (readLines > 0) {
122+
logN += readLines;
123+
115124
} else {
116-
logN += logLine.split("\n").length;
117-
log.info(logLine);
118-
taskOutputParameterParser.appendParseLog(logLine);
125+
Thread.sleep(TRACK_INTERVAL);
119126
}
120127
} while (StringUtils.isNotEmpty(pid));
121128
taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
@@ -220,23 +227,33 @@ public void uploadScript(String taskId, String localFile) throws IOException {
220227
}
221228

222229
public String runRemote(String command) throws IOException {
230+
StringBuilder out = new StringBuilder();
231+
runRemoteAndProcessLines(command, line -> out.append(line).append(System.lineSeparator()));
232+
return out.toString();
233+
}
234+
235+
private int runRemoteAndProcessLines(String command, Consumer<String> lineConsumer) throws IOException {
223236
try (
224237
ChannelExec channel = getSession().createExecChannel(command);
225-
ByteArrayOutputStream out = new ByteArrayOutputStream();
226238
ByteArrayOutputStream err = new ByteArrayOutputStream()) {
227-
228-
channel.setOut(System.out);
229-
channel.setOut(out);
239+
InputStream out = channel.getInvertedOut();
230240
channel.setErr(err);
231241
channel.open();
242+
int readLines = 0;
243+
try (BufferedReader reader = new BufferedReader(new InputStreamReader(out, StandardCharsets.UTF_8))) {
244+
String line;
245+
while ((line = reader.readLine()) != null) {
246+
readLines++;
247+
lineConsumer.accept(line);
248+
}
249+
}
232250
channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0);
233-
channel.close();
234251
Integer exitStatus = channel.getExitStatus();
235252
if (exitStatus == null || exitStatus != 0) {
236253
throw new TaskException(
237254
"Remote shell task error, exitStatus: " + exitStatus + " error message: " + err);
238255
}
239-
return out.toString();
256+
return readLines;
240257
}
241258
}
242259

dolphinscheduler-task-plugin/dolphinscheduler-task-remoteshell/src/test/java/org/apache/dolphinscheduler/plugin/task/remoteshell/RemoteExecutorTest.java

Lines changed: 125 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,22 +32,28 @@
3232
import org.apache.dolphinscheduler.plugin.datasource.ssh.param.SSHDataSourceProcessor;
3333
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
3434

35+
import org.apache.commons.io.IOUtils;
36+
import org.apache.commons.io.input.NullInputStream;
37+
import org.apache.commons.lang3.SystemUtils;
3538
import org.apache.sshd.client.channel.ChannelExec;
39+
import org.apache.sshd.client.channel.ClientChannelEvent;
3640
import org.apache.sshd.client.session.ClientSession;
3741

42+
import java.io.ByteArrayInputStream;
3843
import java.io.IOException;
44+
import java.io.InputStream;
45+
import java.nio.charset.StandardCharsets;
46+
import java.util.EnumSet;
3947

4048
import org.junit.jupiter.api.AfterEach;
4149
import org.junit.jupiter.api.Assertions;
4250
import org.junit.jupiter.api.BeforeEach;
43-
import org.junit.jupiter.api.Disabled;
4451
import org.junit.jupiter.api.Test;
4552
import org.junit.jupiter.api.extension.ExtendWith;
4653
import org.mockito.MockedStatic;
4754
import org.mockito.Mockito;
4855
import org.mockito.junit.jupiter.MockitoExtension;
4956

50-
@Disabled
5157
@ExtendWith(MockitoExtension.class)
5258
public class RemoteExecutorTest {
5359

@@ -83,9 +89,19 @@ void testRunRemote() throws IOException {
8389
when(clientSession.auth().verify().isSuccess()).thenReturn(true);
8490
when(clientSession.createExecChannel(Mockito.anyString())).thenReturn(channel);
8591
when(channel.getExitStatus()).thenReturn(1);
92+
when(channel.getInvertedOut()).thenReturn(new NullInputStream());
8693
Assertions.assertThrows(TaskException.class, () -> remoteExecutor.runRemote("ls -l"));
94+
95+
// Mock the streaming runRemote to simulate log output
96+
String output = "total 26392\n" +
97+
"dr-xr-xr-x. 6 root root 3072 Aug 15 2023 boot\n" +
98+
"drwxr-xr-x 18 root root 3120 Sep 23 2023 dev\n" +
99+
"drwxr-xr-x. 91 root root 4096 Sep 23 2023 etc\n";
100+
InputStream inputStream = IOUtils.toInputStream(output, StandardCharsets.UTF_8);
101+
when(channel.getInvertedOut()).thenReturn(inputStream);
87102
when(channel.getExitStatus()).thenReturn(0);
88-
Assertions.assertDoesNotThrow(() -> remoteExecutor.runRemote("ls -l"));
103+
String actualOut = Assertions.assertDoesNotThrow(() -> remoteExecutor.runRemote("ls -l"));
104+
Assertions.assertEquals(output, actualOut);
89105
}
90106

91107
@Test
@@ -141,11 +157,20 @@ void testGetTaskExitCode() throws IOException {
141157
void getAllRemotePidStr() throws IOException {
142158

143159
RemoteExecutor remoteExecutor = spy(new RemoteExecutor(sshConnectionParam));
144-
doReturn("bash(9527)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
160+
// Mock pstree output based on OS
161+
if (SystemUtils.IS_OS_MAC) {
162+
doReturn("-+= 9527 root\n \\-+= 9528 root").when(remoteExecutor).runRemote(anyString());
163+
} else {
164+
doReturn("bash(9527)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
165+
}
145166
String allPidStr = remoteExecutor.getAllRemotePidStr("9527");
146167
Assertions.assertEquals("9527 9528", allPidStr);
147168

148-
doReturn("systemd(1)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
169+
if (SystemUtils.IS_OS_MAC) {
170+
doReturn("-+= 1 root\n \\-+= 9528 root").when(remoteExecutor).runRemote(anyString());
171+
} else {
172+
doReturn("systemd(1)───sleep(9528)").when(remoteExecutor).runRemote(anyString());
173+
}
149174
allPidStr = remoteExecutor.getAllRemotePidStr("9527");
150175
Assertions.assertEquals("9527", allPidStr);
151176

@@ -154,4 +179,99 @@ void getAllRemotePidStr() throws IOException {
154179
Assertions.assertEquals("9527", allPidStr);
155180

156181
}
182+
183+
@Test
184+
void testTrack() throws Exception {
185+
RemoteExecutor remoteExecutor = spy(new RemoteExecutor(sshConnectionParam));
186+
String taskId = "1234";
187+
ChannelExec channel = Mockito.mock(ChannelExec.class, RETURNS_DEEP_STUBS);
188+
189+
// Mock getTaskPid to control the loop, return a valid pid 2 times, then return empty
190+
doReturn("9527")
191+
.doReturn("9527")
192+
.doReturn("").when(remoteExecutor).getTaskPid(taskId);
193+
when(clientSession.auth().verify().isSuccess()).thenReturn(true);
194+
when(clientSession.createExecChannel(anyString())).thenReturn(channel);
195+
196+
// Mock the streaming runRemote to simulate log output
197+
String logContent = "some log line 1\n"
198+
+ "echo \"${setValue(my_prop=my_value)}\"\n"
199+
+ "some log line 2\n";
200+
InputStream inputStream = IOUtils.toInputStream(logContent, StandardCharsets.UTF_8);
201+
when(channel.getInvertedOut()).thenReturn(inputStream);
202+
when(channel.getExitStatus()).thenReturn(0);
203+
204+
remoteExecutor.track(taskId);
205+
206+
// Verify that the output parameter was parsed and stored
207+
Assertions.assertEquals(1, remoteExecutor.getTaskOutputParams().size());
208+
Assertions.assertEquals("my_value", remoteExecutor.getTaskOutputParams().get("my_prop"));
209+
}
210+
211+
@Test
212+
void testRunRemoteWithEmptyOutput() throws Exception {
213+
// Test empty output scenario (readLines = 0)
214+
RemoteExecutor remoteExecutor = spy(new RemoteExecutor(sshConnectionParam));
215+
ChannelExec channel = Mockito.mock(ChannelExec.class, RETURNS_DEEP_STUBS);
216+
217+
when(clientSession.auth().verify().isSuccess()).thenReturn(true);
218+
when(clientSession.createExecChannel(anyString())).thenReturn(channel);
219+
when(channel.getInvertedOut()).thenReturn(new ByteArrayInputStream(new byte[0]));
220+
when(channel.getExitStatus()).thenReturn(0);
221+
when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
222+
.thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
223+
224+
String result = Assertions.assertDoesNotThrow(() -> remoteExecutor.runRemote("echo"));
225+
Assertions.assertEquals("", result);
226+
}
227+
228+
@Test
229+
void testRunRemoteWithNonZeroExitStatus() throws Exception {
230+
// Test command failure scenario (exitStatus != 0)
231+
RemoteExecutor remoteExecutor = spy(new RemoteExecutor(sshConnectionParam));
232+
ChannelExec channel = Mockito.mock(ChannelExec.class, RETURNS_DEEP_STUBS);
233+
234+
when(clientSession.auth().verify().isSuccess()).thenReturn(true);
235+
when(clientSession.createExecChannel(anyString())).thenReturn(channel);
236+
when(channel.getInvertedOut()).thenReturn(IOUtils.toInputStream("error output", StandardCharsets.UTF_8));
237+
when(channel.getExitStatus()).thenReturn(1);
238+
when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
239+
.thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
240+
241+
Assertions.assertThrows(TaskException.class, () -> remoteExecutor.runRemote("failing_command"));
242+
}
243+
244+
@Test
245+
void testRunRemoteWithNullExitStatus() throws Exception {
246+
// Test null exitStatus scenario
247+
RemoteExecutor remoteExecutor = spy(new RemoteExecutor(sshConnectionParam));
248+
ChannelExec channel = Mockito.mock(ChannelExec.class, RETURNS_DEEP_STUBS);
249+
250+
when(clientSession.auth().verify().isSuccess()).thenReturn(true);
251+
when(clientSession.createExecChannel(anyString())).thenReturn(channel);
252+
when(channel.getInvertedOut()).thenReturn(IOUtils.toInputStream("some output", StandardCharsets.UTF_8));
253+
when(channel.getExitStatus()).thenReturn(null);
254+
when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
255+
.thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
256+
257+
Assertions.assertThrows(TaskException.class, () -> remoteExecutor.runRemote("command"));
258+
}
259+
260+
@Test
261+
void testTrackWithEmptyLogOutput() throws Exception {
262+
// Test track with empty log output (readLines = 0 scenario in track loop)
263+
RemoteExecutor remoteExecutor = spy(new RemoteExecutor(sshConnectionParam));
264+
String taskId = "1234";
265+
ChannelExec channel = Mockito.mock(ChannelExec.class, RETURNS_DEEP_STUBS);
266+
267+
doReturn("9527").doReturn("").when(remoteExecutor).getTaskPid(taskId);
268+
when(clientSession.auth().verify().isSuccess()).thenReturn(true);
269+
when(clientSession.createExecChannel(anyString())).thenReturn(channel);
270+
when(channel.getInvertedOut()).thenReturn(new ByteArrayInputStream(new byte[0]));
271+
when(channel.getExitStatus()).thenReturn(0);
272+
when(channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0))
273+
.thenReturn(EnumSet.of(ClientChannelEvent.CLOSED));
274+
275+
Assertions.assertDoesNotThrow(() -> remoteExecutor.track(taskId));
276+
}
157277
}

0 commit comments

Comments
 (0)