Skip to content

Commit 26fe6fe

Browse files
authored
[Fix-17780][TaskPlugin] Fix shell output log might loss due to incorrect usage of log buffer (#17790)
1 parent 4d72bc4 commit 26fe6fe

File tree

20 files changed

+94
-330
lines changed

20 files changed

+94
-330
lines changed

docs/docs/en/guide/upgrade/incompatible.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,4 +37,5 @@ This document records the incompatible updates between each version. You need to
3737
* Renamed the publicKey field to privateKey in the SSH connection parameters under the datasource configuration. ([#17666])(https://github.com/apache/dolphinscheduler/pull/17666)
3838
* Add table t_ds_serial_command. ([#17531])(https://github.com/apache/dolphinscheduler/pull/17531)
3939
* Remove the default value of `python-gateway.auth-token` at `api-server/application.yaml`. ([#17801])(https://github.com/apache/dolphinscheduler/pull/17801)
40+
* Refactor the task plugins which use ShellCommandExecutor ([#17790])(https://github.com/apache/dolphinscheduler/pull/17790)
4041

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

Lines changed: 27 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package org.apache.dolphinscheduler.plugin.task.api;
1919

20-
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
2120
import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS;
2221
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
2322
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_HARD_KILL;
@@ -42,12 +41,10 @@
4241
import java.lang.reflect.Field;
4342
import java.util.HashMap;
4443
import java.util.Map;
45-
import java.util.concurrent.ExecutionException;
44+
import java.util.Optional;
45+
import java.util.concurrent.CompletableFuture;
4646
import java.util.concurrent.ExecutorService;
47-
import java.util.concurrent.Future;
48-
import java.util.concurrent.LinkedBlockingQueue;
4947
import java.util.concurrent.TimeUnit;
50-
import java.util.function.Consumer;
5148

5249
import lombok.extern.slf4j.Slf4j;
5350
import io.fabric8.kubernetes.client.dsl.LogWatch;
@@ -59,41 +56,12 @@
5956
public abstract class AbstractCommandExecutor {
6057

6158
protected volatile Map<String, String> taskOutputParams = new HashMap<>();
62-
/**
63-
* process
64-
*/
6559
private Process process;
6660

67-
/**
68-
* log handler
69-
*/
70-
protected Consumer<LinkedBlockingQueue<String>> logHandler;
71-
72-
/**
73-
* log list
74-
*/
75-
protected LinkedBlockingQueue<String> logBuffer;
76-
77-
protected boolean processLogOutputIsSuccess = false;
78-
79-
protected boolean podLogOutputIsFinished = false;
80-
81-
/**
82-
* taskRequest
83-
*/
8461
protected TaskExecutionContext taskRequest;
8562

86-
protected Future<?> taskOutputFuture;
87-
88-
protected Future<?> podLogOutputFuture;
89-
90-
public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
91-
TaskExecutionContext taskRequest) {
92-
this.logHandler = logHandler;
63+
public AbstractCommandExecutor(TaskExecutionContext taskRequest) {
9364
this.taskRequest = taskRequest;
94-
this.logBuffer = new LinkedBlockingQueue<>();
95-
this.logBuffer.add(EMPTY_STRING);
96-
9765
}
9866

9967
// todo: We need to build the IShellActuator in outer class, since different task may have specific logic to build
@@ -135,10 +103,10 @@ public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
135103
process = iShellInterceptor.execute();
136104

137105
// parse process output
138-
parseProcessOutput(this.process);
106+
final CompletableFuture<Void> collectProcessLogFuture = collectProcessLog(this.process);
139107

140108
// collect pod log
141-
collectPodLogIfNeeded();
109+
final Optional<CompletableFuture<?>> collectPodLogFuture = collectPodLogIfNeeded();
142110

143111
int processId = getProcessId(this.process);
144112

@@ -164,24 +132,14 @@ public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
164132
TaskExecutionStatus kubernetesStatus =
165133
ProcessUtils.getApplicationStatus(taskRequest.getK8sTaskExecutionContext(), taskRequest.getTaskAppId());
166134

167-
if (taskOutputFuture != null) {
168-
try {
169-
// Wait the task log process finished.
170-
taskOutputFuture.get();
171-
} catch (ExecutionException e) {
172-
log.error("Handle task log error", e);
173-
}
174-
}
135+
// Wait the task log process finished.
136+
collectProcessLogFuture.join();
175137

176-
if (podLogOutputFuture != null) {
177-
try {
178-
// Wait kubernetes pod log collection finished
179-
podLogOutputFuture.get();
180-
// delete pod after successful execution and log collection
181-
ProcessUtils.cancelApplication(taskRequest);
182-
} catch (ExecutionException e) {
183-
log.error("Handle pod log error", e);
184-
}
138+
if (collectPodLogFuture.isPresent()) {
139+
// Wait kubernetes pod log collection finished
140+
collectPodLogFuture.get().join();
141+
// delete pod after successful execution and log collection
142+
ProcessUtils.cancelApplication(taskRequest);
185143
}
186144

187145
// if SHELL task exit
@@ -227,16 +185,15 @@ public void cancelApplication() throws InterruptedException {
227185
ProcessUtils.cancelApplication(taskRequest);
228186
}
229187

230-
private void collectPodLogIfNeeded() {
188+
private Optional<CompletableFuture<?>> collectPodLogIfNeeded() {
231189
if (null == taskRequest.getK8sTaskExecutionContext()) {
232-
podLogOutputIsFinished = true;
233-
return;
190+
return Optional.empty();
234191
}
235192

236193
ExecutorService collectPodLogExecutorService = ThreadUtils
237194
.newSingleDaemonScheduledExecutorService("CollectPodLogOutput-thread-" + taskRequest.getTaskName());
238195

239-
podLogOutputFuture = collectPodLogExecutorService.submit(() -> {
196+
final CompletableFuture<Void> collectPodLogFuture = CompletableFuture.runAsync(() -> {
240197
// wait for launching (driver) pod
241198
ThreadUtils.sleep(SLEEP_TIME_MILLIS * 5L);
242199
try (
@@ -248,67 +205,43 @@ private void collectPodLogIfNeeded() {
248205
String line;
249206
try (BufferedReader reader = new BufferedReader(new InputStreamReader(watcher.getOutput()))) {
250207
while ((line = reader.readLine()) != null) {
251-
logBuffer.add(String.format("[K8S-pod-log-%s]: %s", taskRequest.getTaskName(), line));
208+
log.info("[K8S-pod-log-{}]: {}", taskRequest.getTaskName(), line);
252209
}
253210
}
254211
}
255212
} catch (Exception e) {
213+
log.error("Collect pod log error", e);
256214
throw new RuntimeException(e);
257-
} finally {
258-
podLogOutputIsFinished = true;
259215
}
260-
261-
});
216+
}, collectPodLogExecutorService);
262217

263218
collectPodLogExecutorService.shutdown();
219+
return Optional.of(collectPodLogFuture);
264220
}
265221

266-
private void parseProcessOutput(Process process) {
222+
private CompletableFuture<Void> collectProcessLog(Process process) {
267223
// todo: remove this this thread pool.
268-
ExecutorService getOutputLogService = ThreadUtils
269-
.newSingleDaemonScheduledExecutorService("ResolveOutputLog-thread-" + taskRequest.getTaskName());
270-
getOutputLogService.execute(() -> {
224+
final ExecutorService collectProcessLogService = ThreadUtils.newSingleDaemonScheduledExecutorService(
225+
"ResolveOutputLog-thread-" + taskRequest.getTaskName());
226+
final CompletableFuture<Void> collectProcessLogFuture = CompletableFuture.runAsync(() -> {
271227
TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
272228
try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
273229
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
274230
String line;
275231
while ((line = inReader.readLine()) != null) {
276-
logBuffer.add(line);
232+
log.info(" -> {}", line);
277233
taskOutputParameterParser.appendParseLog(line);
278234
}
279-
processLogOutputIsSuccess = true;
280235
} catch (Exception e) {
281236
log.error("Parse var pool error", e);
282-
processLogOutputIsSuccess = true;
283237
} finally {
284238
LogUtils.removeTaskInstanceLogFullPathMDC();
285239
}
286240
taskOutputParams = taskOutputParameterParser.getTaskOutputParams();
287-
});
288-
289-
getOutputLogService.shutdown();
241+
}, collectProcessLogService);
290242

291-
ExecutorService parseProcessOutputExecutorService = ThreadUtils
292-
.newSingleDaemonScheduledExecutorService("TaskInstanceLogOutput-thread-" + taskRequest.getTaskName());
293-
taskOutputFuture = parseProcessOutputExecutorService.submit(() -> {
294-
try {
295-
LogUtils.setTaskInstanceLogFullPathMDC(taskRequest.getLogPath());
296-
while (logBuffer.size() > 1 || !processLogOutputIsSuccess || !podLogOutputIsFinished) {
297-
if (logBuffer.size() > 1) {
298-
logHandler.accept(logBuffer);
299-
logBuffer.clear();
300-
logBuffer.add(EMPTY_STRING);
301-
} else {
302-
Thread.sleep(TaskConstants.DEFAULT_LOG_FLUSH_INTERVAL);
303-
}
304-
}
305-
} catch (Exception e) {
306-
log.error("Output task log error", e);
307-
} finally {
308-
LogUtils.removeTaskInstanceLogFullPathMDC();
309-
}
310-
});
311-
parseProcessOutputExecutorService.shutdown();
243+
collectProcessLogService.shutdown();
244+
return collectProcessLogFuture;
312245
}
313246

314247
/**

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractTask.java

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import org.apache.dolphinscheduler.plugin.task.api.parameters.AbstractParameters;
2424

2525
import java.util.Map;
26-
import java.util.StringJoiner;
27-
import java.util.concurrent.LinkedBlockingQueue;
2826
import java.util.regex.Matcher;
2927

3028
import lombok.Getter;
@@ -149,20 +147,6 @@ public TaskExecutionStatus getExitStatus() {
149147
return TaskExecutionStatus.FAILURE;
150148
}
151149

152-
/**
153-
* log handle
154-
*
155-
* @param logs log list
156-
*/
157-
public void logHandle(LinkedBlockingQueue<String> logs) {
158-
159-
StringJoiner joiner = new StringJoiner("\n\t");
160-
while (!logs.isEmpty()) {
161-
joiner.add(logs.poll());
162-
}
163-
log.info(" -> {}", joiner);
164-
}
165-
166150
/**
167151
* regular expressions match the contents between two specified strings
168152
*

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractYarnTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,11 @@
3434
@Slf4j
3535
public abstract class AbstractYarnTask extends AbstractRemoteTask {
3636

37-
private ShellCommandExecutor shellCommandExecutor;
37+
private final ShellCommandExecutor shellCommandExecutor;
3838

3939
public AbstractYarnTask(TaskExecutionContext taskRequest) {
4040
super(taskRequest);
41-
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskRequest);
41+
this.shellCommandExecutor = new ShellCommandExecutor(taskRequest);
4242
}
4343

4444
// todo split handle to submit and track

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/ShellCommandExecutor.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,10 @@
1717

1818
package org.apache.dolphinscheduler.plugin.task.api;
1919

20-
import java.util.concurrent.LinkedBlockingQueue;
21-
import java.util.function.Consumer;
22-
2320
public class ShellCommandExecutor extends AbstractCommandExecutor {
2421

25-
public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
26-
TaskExecutionContext taskRequest) {
27-
super(logHandler, taskRequest);
22+
public ShellCommandExecutor(TaskExecutionContext taskRequest) {
23+
super(taskRequest);
2824
}
2925

3026
}

dolphinscheduler-task-plugin/dolphinscheduler-task-chunjun/src/main/java/org/apache/dolphinscheduler/plugin/task/chunjun/ChunJunTask.java

Lines changed: 6 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -60,36 +60,22 @@ public class ChunJunTask extends AbstractTask {
6060
*/
6161
private static final String CHUNJUN_DIST_DIR = "${CHUNJUN_HOME}/chunjun-dist";
6262

63-
/**
64-
* chunJun parameters
65-
*/
6663
private ChunJunParameters chunJunParameters;
6764

68-
/**
69-
* shell command executor
70-
*/
71-
private ShellCommandExecutor shellCommandExecutor;
72-
73-
/**
74-
* taskExecutionContext
75-
*/
76-
private TaskExecutionContext taskExecutionContext;
65+
private final ShellCommandExecutor shellCommandExecutor;
7766

7867
public ChunJunTask(TaskExecutionContext taskExecutionContext) {
7968
super(taskExecutionContext);
80-
this.taskExecutionContext = taskExecutionContext;
81-
82-
this.shellCommandExecutor = new ShellCommandExecutor(this::logHandle, taskExecutionContext);
69+
this.shellCommandExecutor = new ShellCommandExecutor(taskExecutionContext);
8370
}
8471

8572
/**
8673
* init chunjun config
8774
*/
8875
@Override
8976
public void init() {
90-
chunJunParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ChunJunParameters.class);
91-
log.info("Initialize chunjun task params {}",
92-
JSONUtils.toPrettyJsonString(taskExecutionContext.getTaskParams()));
77+
chunJunParameters = JSONUtils.parseObject(taskRequest.getTaskParams(), ChunJunParameters.class);
78+
log.info("Initialize chunjun task params {}", JSONUtils.toPrettyJsonString(taskRequest.getTaskParams()));
9379

9480
if (!chunJunParameters.checkParameters()) {
9581
throw new RuntimeException("chunjun task params is not valid");
@@ -100,7 +86,7 @@ public void init() {
10086
@Override
10187
public void handle(TaskCallBack taskCallBack) throws TaskException {
10288
try {
103-
Map<String, Property> paramsMap = taskExecutionContext.getPrepareParamsMap();
89+
Map<String, Property> paramsMap = taskRequest.getPrepareParamsMap();
10490

10591
IShellInterceptorBuilder<?, ?> shellActuatorBuilder = ShellInterceptorBuilderFactory.newBuilder()
10692
.properties(ParameterUtils.convert(paramsMap))
@@ -133,9 +119,7 @@ public void handle(TaskCallBack taskCallBack) throws TaskException {
133119
*/
134120
private String buildChunJunJsonFile(Map<String, Property> paramsMap) throws Exception {
135121
// generate json
136-
String fileName = String.format("%s/%s_job.json",
137-
taskExecutionContext.getExecutePath(),
138-
taskExecutionContext.getTaskAppId());
122+
String fileName = String.format("%s/%s_job.json", taskRequest.getExecutePath(), taskRequest.getTaskAppId());
139123

140124
String json = null;
141125

0 commit comments

Comments
 (0)