Skip to content

Commit a996b4a

Browse files
committed
[Fix #987] Refactor shell impl
Signed-off-by: fjtirado <[email protected]>
1 parent 51d85a9 commit a996b4a

File tree

1 file changed

+109
-108
lines changed

1 file changed

+109
-108
lines changed

impl/core/src/main/java/io/serverlessworkflow/impl/executors/RunShellExecutor.java

Lines changed: 109 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -17,141 +17,142 @@
1717

1818
import io.serverlessworkflow.api.types.RunShell;
1919
import io.serverlessworkflow.api.types.RunTaskConfiguration;
20+
import io.serverlessworkflow.api.types.RunTaskConfiguration.ProcessReturnType;
2021
import io.serverlessworkflow.api.types.Shell;
2122
import io.serverlessworkflow.impl.TaskContext;
22-
import io.serverlessworkflow.impl.WorkflowApplication;
2323
import io.serverlessworkflow.impl.WorkflowContext;
2424
import io.serverlessworkflow.impl.WorkflowDefinition;
25-
import io.serverlessworkflow.impl.WorkflowError;
26-
import io.serverlessworkflow.impl.WorkflowException;
2725
import io.serverlessworkflow.impl.WorkflowModel;
2826
import io.serverlessworkflow.impl.WorkflowModelFactory;
2927
import io.serverlessworkflow.impl.WorkflowUtils;
30-
import io.serverlessworkflow.impl.expressions.ExpressionUtils;
28+
import io.serverlessworkflow.impl.WorkflowValueResolver;
3129
import java.io.IOException;
30+
import java.io.UncheckedIOException;
3231
import java.nio.charset.StandardCharsets;
32+
import java.util.LinkedHashMap;
3333
import java.util.Map;
34+
import java.util.Optional;
3435
import java.util.concurrent.CompletableFuture;
36+
import java.util.stream.Collectors;
3537

3638
public class RunShellExecutor implements RunnableTask<RunShell> {
3739

38-
private ShellResultSupplier shellResultSupplier;
39-
private ProcessBuilderSupplier processBuilderSupplier;
40+
private WorkflowValueResolver<String> shellCommand;
41+
private Map<WorkflowValueResolver<String>, Optional<WorkflowValueResolver<String>>>
42+
shellArguments;
43+
private Optional<WorkflowValueResolver<Map<String, Object>>> shellEnv;
44+
private Optional<ProcessReturnType> returnType;
4045

41-
@FunctionalInterface
42-
private interface ShellResultSupplier {
43-
WorkflowModel apply(
44-
TaskContext taskContext, WorkflowModel input, ProcessBuilder processBuilder);
45-
}
46+
@Override
47+
public CompletableFuture<WorkflowModel> apply(
48+
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel model) {
49+
StringBuilder commandBuilder =
50+
new StringBuilder(shellCommand.apply(workflowContext, taskContext, model));
51+
for (var entry : shellArguments.entrySet()) {
52+
commandBuilder.append(" ").append(entry.getKey().apply(workflowContext, taskContext, model));
53+
entry
54+
.getValue()
55+
.ifPresent(
56+
v -> commandBuilder.append("=").append(v.apply(workflowContext, taskContext, model)));
57+
}
4658

47-
@FunctionalInterface
48-
private interface ProcessBuilderSupplier {
49-
ProcessBuilder apply(WorkflowContext workflowContext, TaskContext taskContext);
59+
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
60+
shellEnv.ifPresent(
61+
map -> {
62+
for (Map.Entry<String, Object> entry :
63+
map.apply(workflowContext, taskContext, model).entrySet()) {
64+
builder.environment().put(entry.getKey(), (String) entry.getValue());
65+
}
66+
});
67+
68+
return returnType
69+
.map(
70+
type ->
71+
CompletableFuture.supplyAsync(
72+
() ->
73+
buildResultFromProcess(
74+
workflowContext.definition().application().modelFactory(),
75+
uncheckedStart(builder),
76+
type)
77+
.orElse(model),
78+
workflowContext.definition().application().executorService()))
79+
.orElseGet(
80+
() -> {
81+
workflowContext
82+
.definition()
83+
.application()
84+
.executorService()
85+
.submit(() -> uncheckedStart(builder));
86+
return CompletableFuture.completedFuture(model);
87+
});
5088
}
5189

52-
@Override
53-
public CompletableFuture<WorkflowModel> apply(
54-
WorkflowContext workflowContext, TaskContext taskContext, WorkflowModel input) {
55-
ProcessBuilder processBuilder = this.processBuilderSupplier.apply(workflowContext, taskContext);
56-
return CompletableFuture.supplyAsync(
57-
() -> this.shellResultSupplier.apply(taskContext, input, processBuilder));
90+
private Process uncheckedStart(ProcessBuilder builder) {
91+
try {
92+
return builder.start();
93+
} catch (IOException e) {
94+
throw new UncheckedIOException(e);
95+
}
5896
}
5997

6098
@Override
6199
public void init(RunShell taskConfiguration, WorkflowDefinition definition) {
62100
Shell shell = taskConfiguration.getShell();
63-
final String shellCommand = shell.getCommand();
64-
65-
if (shellCommand == null || shellCommand.isBlank()) {
101+
if (!WorkflowUtils.isValid(taskConfiguration.getShell().getCommand())) {
66102
throw new IllegalStateException("Missing shell command in RunShell task configuration");
67103
}
68-
this.processBuilderSupplier =
69-
(workflowContext, taskContext) -> {
70-
WorkflowApplication application = definition.application();
71-
72-
StringBuilder commandBuilder =
73-
new StringBuilder(
74-
ExpressionUtils.isExpr(shellCommand)
75-
? WorkflowUtils.buildStringFilter(application, shellCommand)
76-
.apply(workflowContext, taskContext, taskContext.input())
77-
: shellCommand);
78-
79-
if (shell.getArguments() != null
80-
&& shell.getArguments().getAdditionalProperties() != null) {
81-
for (Map.Entry<String, Object> entry :
82-
shell.getArguments().getAdditionalProperties().entrySet()) {
83-
commandBuilder
84-
.append(" ")
85-
.append(
86-
ExpressionUtils.isExpr(entry.getKey())
87-
? WorkflowUtils.buildStringFilter(application, entry.getKey())
88-
.apply(workflowContext, taskContext, taskContext.input())
89-
: entry.getKey());
90-
if (entry.getValue() != null) {
91-
92-
commandBuilder
93-
.append("=")
94-
.append(
95-
ExpressionUtils.isExpr(entry.getValue())
96-
? WorkflowUtils.buildStringFilter(
97-
application, entry.getValue().toString())
98-
.apply(workflowContext, taskContext, taskContext.input())
99-
: entry.getValue().toString());
100-
}
101-
}
102-
}
103-
104-
// TODO: support Windows cmd.exe
105-
ProcessBuilder builder = new ProcessBuilder("sh", "-c", commandBuilder.toString());
106-
if (shell.getEnvironment() != null
107-
&& shell.getEnvironment().getAdditionalProperties() != null) {
108-
for (Map.Entry<String, Object> entry :
109-
shell.getEnvironment().getAdditionalProperties().entrySet()) {
110-
String value =
111-
ExpressionUtils.isExpr(entry.getValue())
112-
? WorkflowUtils.buildStringFilter(application, entry.getValue().toString())
113-
.apply(workflowContext, taskContext, taskContext.input())
114-
: entry.getValue().toString();
115-
116-
// configure environments
117-
builder.environment().put(entry.getKey(), value);
118-
}
119-
}
120-
return builder;
121-
};
122-
123-
this.shellResultSupplier =
124-
(taskContext, input, processBuilder) -> {
125-
try {
126-
Process process = processBuilder.start();
127-
return taskConfiguration.isAwait()
128-
? buildResultFromProcess(taskConfiguration, definition, process)
129-
: input;
130-
} catch (IOException | InterruptedException e) {
131-
throw new WorkflowException(WorkflowError.runtime(taskContext, e).build(), e);
132-
}
133-
};
104+
shellCommand =
105+
WorkflowUtils.buildStringFilter(
106+
definition.application(), taskConfiguration.getShell().getCommand());
107+
108+
shellArguments =
109+
shell.getArguments() != null && shell.getArguments().getAdditionalProperties() != null
110+
? shell.getArguments().getAdditionalProperties().entrySet().stream()
111+
.collect(
112+
Collectors.toMap(
113+
e -> WorkflowUtils.buildStringFilter(definition.application(), e.getKey()),
114+
e ->
115+
e.getValue() != null
116+
? Optional.of(
117+
WorkflowUtils.buildStringFilter(
118+
definition.application(), e.getValue().toString()))
119+
: Optional.empty(),
120+
(x, y) -> y,
121+
LinkedHashMap::new))
122+
: Map.of();
123+
124+
shellEnv =
125+
shell.getEnvironment() != null && shell.getEnvironment().getAdditionalProperties() != null
126+
? Optional.of(
127+
WorkflowUtils.buildMapResolver(
128+
definition.application(), shell.getEnvironment().getAdditionalProperties()))
129+
: Optional.empty();
130+
131+
returnType =
132+
taskConfiguration.isAwait() ? Optional.of(taskConfiguration.getReturn()) : Optional.empty();
134133
}
135134

136-
/**
137-
* Builds the WorkflowModel result from the executed process. It waits for the process to finish
138-
* and captures the exit code, stdout, and stderr based on the task configuration.
139-
*/
140-
private WorkflowModel buildResultFromProcess(
141-
RunShell taskConfiguration, WorkflowDefinition definition, Process process)
142-
throws IOException, InterruptedException {
143-
int exitCode = process.waitFor();
144-
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
145-
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
146-
147-
WorkflowModelFactory modelFactory = definition.application().modelFactory();
148-
return switch (taskConfiguration.getReturn()) {
149-
case ALL -> modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
150-
case NONE -> modelFactory.fromNull();
151-
case CODE -> modelFactory.from(exitCode);
152-
case STDOUT -> modelFactory.from(stdout.trim());
153-
case STDERR -> modelFactory.from(stderr.trim());
154-
};
135+
private static Optional<WorkflowModel> buildResultFromProcess(
136+
WorkflowModelFactory modelFactory, Process process, ProcessReturnType type) {
137+
try {
138+
int exitCode = process.waitFor();
139+
String stdout = new String(process.getInputStream().readAllBytes(), StandardCharsets.UTF_8);
140+
String stderr = new String(process.getErrorStream().readAllBytes(), StandardCharsets.UTF_8);
141+
return Optional.of(
142+
switch (type) {
143+
case ALL ->
144+
modelFactory.fromAny(new ProcessResult(exitCode, stdout.trim(), stderr.trim()));
145+
case NONE -> modelFactory.fromNull();
146+
case CODE -> modelFactory.from(exitCode);
147+
case STDOUT -> modelFactory.from(stdout.trim());
148+
case STDERR -> modelFactory.from(stderr.trim());
149+
});
150+
} catch (IOException e) {
151+
throw new UncheckedIOException(e);
152+
} catch (InterruptedException e) {
153+
Thread.currentThread().interrupt();
154+
return Optional.empty();
155+
}
155156
}
156157

157158
@Override

0 commit comments

Comments
 (0)