
[Improvement][Task] Optimization of log processing for RemoteShellTask #17788

@JorringHsiao

Search before asking

  • I had searched in the issues and found no similar feature requirement.

Description

risk

With the current log-processing logic, if the remotely executed script writes a large amount of log output in a short burst (between two polls), the server builds one correspondingly large string for it.
Moreover, if many such tasks run at the same time and all have to handle large volumes of logs, creating so many large strings causes frequent GC and can even lead to an OOM.

source code

public class RemoteExecutor implements AutoCloseable {

    // ...

    public void track(String taskId) throws Exception {
        int logN = 0;
        String pid;
        log.info("Remote shell task log:");
        TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
        do {
            pid = getTaskPid(taskId);
            String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN + 1, getRemoteShellHome(), taskId);
            // 👇 tail -n +XXX XXX.log
            String logLine = runRemote(trackCommand);
            if (StringUtils.isEmpty(logLine)) {
                Thread.sleep(TRACK_INTERVAL);
            } else {
                // 👇 split() allocates an array plus one temporary String per line just to count the lines.
                logN += logLine.split("\n").length;
                log.info(logLine);
                taskOutputParameterParser.appendParseLog(logLine);
            }
        } while (StringUtils.isNotEmpty(pid));
        taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
    }

    public String runRemote(String command) throws IOException {
        try (
                ChannelExec channel = getSession().createExecChannel(command);
                // 👇 a large amount of log output in one poll --> one large byte array
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                ByteArrayOutputStream err = new ByteArrayOutputStream()) {

            channel.setOut(System.out);
            channel.setOut(out);
            channel.setErr(err);
            channel.open();
            channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0);
            channel.close();
            Integer exitStatus = channel.getExitStatus();
            if (exitStatus == null || exitStatus != 0) {
                throw new TaskException(
                        "Remote shell task error, exitStatus: " + exitStatus + " error message: " + err);
            }
            // 👇 toString() copies the buffer into a second large object; with many concurrent tasks these large objects multiply
            return out.toString();
        }
    }
}
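If only the line counting needs to change, a minimal sketch is to count '\n' characters directly instead of calling `split("\n")`. `LineCounter` is a hypothetical helper, not an existing DolphinScheduler class, and it only removes the per-line substrings and the array; the full string returned by `out.toString()` is still built, which is what the streaming proposal in the next section addresses. One behavioral difference is worth noting: `String.split` discards trailing empty strings, so `"a\n\n".split("\n").length` is 1 even though the output holds two lines ("a" and an empty line), while counting newlines gives 2.

```java
/**
 * Hypothetical helper (not part of DolphinScheduler): counts the lines in a chunk of
 * `tail` output without allocating the String[] and substrings that split("\n") creates.
 */
final class LineCounter {

    private LineCounter() {
    }

    /** Number of lines in text, counting a final line that lacks a trailing '\n'. */
    static int countLines(String text) {
        if (text == null || text.isEmpty()) {
            return 0;
        }
        int lines = 0;
        for (int i = 0; i < text.length(); i++) {
            if (text.charAt(i) == '\n') {
                lines++;
            }
        }
        // A trailing partial line (no terminating '\n') still counts as one line.
        if (text.charAt(text.length() - 1) != '\n') {
            lines++;
        }
        return lines;
    }
}
```

In `track` this would replace `logN += logLine.split("\n").length;` with `logN += LineCounter.countLines(logLine);`.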

a simple example of the improvement

remote --> client --> PipedOutputStream --> PipedInputStream --> consumer reads lines
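This data flow relies on `java.io` piped streams, which have two properties the proposal depends on: the writer and the reader must run on different threads (from a single thread, a write blocks forever once the pipe's buffer, 1024 bytes by default, is full), and `readLine()` returns `null` only after the writing side has been closed. The standalone sketch below (`PipedLineDemo` is a hypothetical name) shows both with a plain producer thread standing in for the SSH channel. For `runRemote`, the second property means `handlerOut.accept(in)` returns only if the SSH client closes the stream passed to `channel.setOut(...)` when the remote command ends; that is an assumption to verify against the SSH library in use (the `createExecChannel`/`ClientChannelEvent` calls suggest Apache MINA SSHD).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical demo of the PipedOutputStream -> PipedInputStream -> readLine() flow. */
final class PipedLineDemo {

    private PipedLineDemo() {
    }

    static List<String> readLinesThroughPipe(String producerOutput) {
        List<String> lines = new ArrayList<>();
        try (PipedInputStream in = new PipedInputStream();
                PipedOutputStream out = new PipedOutputStream(in)) {
            // The writer runs on its own thread (standing in for the SSH client's I/O thread);
            // closing its end of the pipe is what lets readLine() return null below.
            Thread producer = new Thread(() -> {
                try (OutputStream sink = out) {
                    sink.write(producerOutput.getBytes(StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, "pipe-producer");
            producer.start();

            // The reader holds at most one line at a time, however large the total output is.
            BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
            producer.join();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return lines;
    }
}
```

The demo collects lines into a list only so the result can be checked; in `track` each line would instead go straight to `log.info(...)` and the parser, as in the example that follows.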

public void runRemote(String command, Consumer<InputStream> handlerOut) throws IOException {
    try (
            ChannelExec channel = getSession().createExecChannel(command);
            PipedInputStream in = new PipedInputStream();       // 👈
            PipedOutputStream out = new PipedOutputStream(in);  // 👈
            ByteArrayOutputStream err = new ByteArrayOutputStream()) {

        channel.setOut(out);
        channel.setErr(err);
        channel.open();
        handlerOut.accept(in);  // 👈 read lines while the channel is still writing them
        channel.waitFor(EnumSet.of(ClientChannelEvent.CLOSED), 0);
        channel.close();
        Integer exitStatus = channel.getExitStatus();
        if (exitStatus == null || exitStatus != 0) {
            throw new TaskException(
                    "Remote shell task error, exitStatus: " + exitStatus + " error message: " + err);
        }
    }
}

public void track(String taskId) throws Exception {
    AtomicInteger logN = new AtomicInteger();
    String pid;
    log.info("Remote shell task log:");
    TaskOutputParameterParser taskOutputParameterParser = new TaskOutputParameterParser();
    do {
        pid = getTaskPid(taskId);
        int lastLogN = logN.get();  // 👈 remember the count to detect a poll with no new lines
        String trackCommand = String.format(COMMAND.TRACK_COMMAND, logN.get() + 1, getRemoteShellHome(), taskId);
        // 👇 stream the output line by line instead of building one large string
        runRemote(trackCommand, in -> {
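            // Decode explicitly (assumed UTF-8) rather than relying on the platform default charset
            try (InputStreamReader inReader = new InputStreamReader(in, StandardCharsets.UTF_8);
                    BufferedReader reader = new BufferedReader(inReader)) {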
                String logLine;
                while ((logLine = reader.readLine()) != null) {
                    logN.incrementAndGet();
                    log.info(logLine);
                    taskOutputParameterParser.appendParseLog(logLine);
                }
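            } catch (IOException e) {
                // Consumer cannot throw checked exceptions; rethrow instead of swallowing the error
                throw new UncheckedIOException(e);
            }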
        });
        if (lastLogN == logN.get()) {
            Thread.sleep(TRACK_INTERVAL);
        }
    } while (StringUtils.isNotEmpty(pid));
    taskOutputParams.putAll(taskOutputParameterParser.getTaskOutputParams());
}
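An alternative that avoids the pipe and its thread hand-off is to give the channel an `OutputStream` that splits lines itself as bytes arrive. `LineSplittingOutputStream` below is a hypothetical class, not part of DolphinScheduler or the SSH library; it buffers only the current line, decodes it as UTF-8 (an assumption about the remote log encoding), and hands it to a callback. A single very long line without a newline would still be buffered in full.

```java
import java.io.ByteArrayOutputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/**
 * Hypothetical OutputStream (not an existing DolphinScheduler or SSH-library class) that
 * passes each complete line to a callback as soon as its '\n' arrives, so only the
 * current line is held in memory instead of the whole `tail` output.
 */
final class LineSplittingOutputStream extends OutputStream {

    private final ByteArrayOutputStream current = new ByteArrayOutputStream(256);
    private final Consumer<String> onLine;
    private int lineCount;

    LineSplittingOutputStream(Consumer<String> onLine) {
        this.onLine = onLine;
    }

    @Override
    public synchronized void write(int b) {
        if (b == '\n') {
            emit();
        } else {
            current.write(b);
        }
    }

    @Override
    public synchronized void write(byte[] buf, int off, int len) {
        for (int i = off; i < off + len; i++) {
            write(buf[i]);
        }
    }

    /** Emits a trailing line that was not terminated by '\n'. */
    @Override
    public synchronized void close() {
        if (current.size() > 0) {
            emit();
        }
    }

    /** Number of lines emitted so far. */
    synchronized int lineCount() {
        return lineCount;
    }

    private void emit() {
        byte[] bytes = current.toByteArray();
        int len = bytes.length;
        // Drop a trailing '\r' so CRLF output matches BufferedReader.readLine().
        if (len > 0 && bytes[len - 1] == '\r') {
            len--;
        }
        // Decoding whole lines (assumed UTF-8) keeps multi-byte characters intact even
        // when they arrive split across write() calls.
        String line = new String(bytes, 0, len, StandardCharsets.UTF_8);
        current.reset();
        lineCount++;
        onLine.accept(line);
    }
}
```

Wired into `runRemote`, this would look roughly like `channel.setOut(new LineSplittingOutputStream(line -> { log.info(line); taskOutputParameterParser.appendParseLog(line); }))`, with the stream's `close()` called once the channel has closed (so a final unterminated line is not lost) and `lineCount()` added to `logN` after each poll. The callback would run on whichever thread the SSH library uses to deliver channel output, so no extra reader thread or pipe buffer is involved.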

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

Labels: backend, improvement
