Skip to content

Commit 4506dc0

Browse files
authored
[DXP-1804] streamx batch enhancements
1 parent 0e34777 commit 4506dc0

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1047
-58
lines changed

core/src/main/java/dev/streamx/cli/command/dev/DevCommand.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import dev.streamx.cli.BannerPrinter;
66
import dev.streamx.cli.VersionProvider;
7+
import dev.streamx.cli.command.dev.event.DevReady;
78
import dev.streamx.cli.command.meshprocessing.MeshConfig;
89
import dev.streamx.cli.command.meshprocessing.MeshManager;
910
import dev.streamx.cli.command.meshprocessing.MeshResolver;
@@ -14,6 +15,7 @@
1415
import dev.streamx.runner.container.PulsarContainer;
1516
import dev.streamx.runner.exception.ContainerStartupTimeoutException;
1617
import io.quarkus.runtime.Quarkus;
18+
import jakarta.enterprise.event.Event;
1719
import jakarta.inject.Inject;
1820
import java.io.IOException;
1921
import java.nio.file.Files;
@@ -66,6 +68,9 @@ public class DevCommand implements Runnable {
6668
@Inject
6769
DashboardRunner dashboardRunner;
6870

71+
@Inject
72+
Event<DevReady> devReadyEvent;
73+
6974
@Override
7075
public void run() {
7176
try {
@@ -91,6 +96,8 @@ public void run() {
9196
meshManager.start();
9297
}
9398

99+
devReadyEvent.fire(new DevReady());
100+
94101
Quarkus.waitForExit();
95102
} catch (ContainerStartupTimeoutException e) {
96103
throw DockerException.containerStartupFailed(
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package dev.streamx.cli.command.dev.event;
2+
3+
public class DevReady {
4+
5+
}

core/src/main/java/dev/streamx/cli/command/ingestion/IngestionMessageJsonFactory.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,14 @@ public JsonNode from(
5656
root.put("action", action);
5757
root.putNull("eventTime");
5858
root.set("properties", propertiesObject);
59-
ObjectNode payload = root.putObject("payload");
60-
payload.set(payloadType, payloadContent);
59+
60+
if (payloadContent != null) {
61+
ObjectNode payload = root.putObject("payload");
62+
payload.set(payloadType, payloadContent);
63+
} else {
64+
root.putNull("payload");
65+
}
66+
6167
return root;
6268
}
6369
}

core/src/main/java/dev/streamx/cli/command/ingestion/batch/BatchCommand.java

Lines changed: 70 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import dev.streamx.cli.command.ingestion.BaseIngestionCommand;
88
import dev.streamx.cli.command.ingestion.IngestionMessageJsonFactory;
99
import dev.streamx.cli.command.ingestion.batch.BatchIngestionArguments.ActionType;
10+
import dev.streamx.cli.command.ingestion.batch.exception.EventSourceDescriptorException;
11+
import dev.streamx.cli.command.ingestion.batch.exception.FileIngestionException;
1012
import dev.streamx.cli.command.ingestion.batch.resolver.BatchPayloadResolver;
1113
import dev.streamx.cli.command.ingestion.batch.resolver.substitutor.Substitutor;
1214
import dev.streamx.cli.command.ingestion.batch.walker.EventSourceFileTreeWalker;
@@ -19,14 +21,17 @@
1921
import jakarta.inject.Inject;
2022
import java.io.IOException;
2123
import java.nio.file.Files;
24+
import java.nio.file.NoSuchFileException;
2225
import java.nio.file.Path;
2326
import java.nio.file.Paths;
2427
import java.util.HashMap;
2528
import java.util.Map;
2629
import java.util.Map.Entry;
30+
import java.util.function.Supplier;
2731
import org.jetbrains.annotations.NotNull;
2832
import picocli.CommandLine.ArgGroup;
2933
import picocli.CommandLine.Command;
34+
import picocli.CommandLine.ParameterException;
3035

3136
@Command(name = BatchCommand.COMMAND_NAME,
3237
mixinStandardHelpOptions = true,
@@ -40,12 +45,16 @@ public class BatchCommand extends BaseIngestionCommand {
4045
private final Map<String, String> schemaTypesCache = new HashMap<>();
4146
@ArgGroup(exclusive = false, multiplicity = "1")
4247
BatchIngestionArguments batchIngestionArguments;
48+
4349
@Inject
4450
BatchPayloadResolver payloadResolver;
51+
4552
@Inject
4653
Substitutor substitutor;
54+
4755
@Inject
4856
IngestionMessageJsonFactory ingestionMessageJsonFactory;
57+
4958
private State state;
5059

5160
@Override
@@ -57,6 +66,13 @@ protected void doRun(StreamxClient client) throws StreamxClientException {
5766
Path startDir = Paths.get(batchIngestionArguments.getSourceDirectory());
5867

5968
try {
69+
if (!Files.exists(startDir)) {
70+
throw new RuntimeException("Directory '" + startDir + "' does not exists.");
71+
}
72+
if (!Files.isDirectory(startDir)) {
73+
throw new RuntimeException("Specified path '" + startDir + "' must be a directory.");
74+
}
75+
6076
Files.walkFileTree(startDir, new EventSourceFileTreeWalker((file, eventSource) -> {
6177
try {
6278
updateCommandState(file, eventSource);
@@ -67,63 +83,94 @@ protected void doRun(StreamxClient client) throws StreamxClientException {
6783
}
6884
perform(publisher);
6985
} catch (StreamxClientException ex) {
70-
// Wrap into IOException to match method signatures
71-
throw new IOException(ex);
86+
throw new FileIngestionException(file, ex);
7287
}
7388
}));
89+
} catch (FileIngestionException e) {
90+
throw new RuntimeException(
91+
ExceptionUtils.appendLogSuggestion(
92+
"Error performing batch publication while processing '" + e.getPath() + "' file.\n"
93+
+ "\n"
94+
+ "Details:\n"
95+
+ e.getCause().getMessage()), e);
96+
} catch (EventSourceDescriptorException e) {
97+
throw new RuntimeException(
98+
ExceptionUtils.appendLogSuggestion(
99+
"Invalid descriptor: '" + e.getPath() + "'.\n"
100+
+ "\n"
101+
+ "Details:\n"
102+
+ e.getCause().getMessage()), e);
103+
} catch (NoSuchFileException e) {
104+
throw new RuntimeException("File '" + e.getFile() + "' does not exists.");
74105
} catch (IOException e) {
75-
if (e.getCause() instanceof StreamxClientException) {
76-
// Rethrow original Exception to leverage generic publication error handling from super
77-
throw (StreamxClientException) e.getCause();
78-
} else {
79-
throw new RuntimeException(
80-
ExceptionUtils.appendLogSuggestion(
81-
"Error performing batch publication using '" + startDir + "' directory.\n"
82-
+ "\n"
83-
+ "Details:\n"
84-
+ e.getMessage()), e);
85-
}
106+
throw new RuntimeException(
107+
ExceptionUtils.appendLogSuggestion(
108+
"Error performing batch publication using '" + startDir + "' directory.\n"
109+
+ "\n"
110+
+ "Details:\n"
111+
+ e.getMessage()), e);
86112
}
87113
}
88114

89115
@Override
90116
protected void perform(Publisher<JsonNode> publisher) throws StreamxClientException {
91117
String schemaType = schemaTypesCache.computeIfAbsent(getChannel(),
92118
c -> getPayloadPropertyName());
119+
ActionType action = state.action();
93120
SuccessResult result = publisher.send(ingestionMessageJsonFactory.from(
94121
state.key(),
95-
state.action().toString(),
122+
action.toString(),
96123
state.message(),
97124
state.properties(),
98125
schemaType
99126
));
100127

101128
printf("Sent data %s message using batch to '%s' with key '%s' at %d%n",
102-
state.action(), state.channel(), state.key(),
129+
action, state.channel(), state.key(),
103130
result.getEventTime());
104131

105132
}
106133

107134
private void updateCommandState(Path file, EventSourceDescriptor eventSource) {
108135

109136
String relativePath = calculateRelativePath(file, eventSource);
110-
111137
Map<String, String> variables = substitutor.createSubstitutionVariables(
112138
file.toString(), eventSource.getChannel(), relativePath);
113139

114-
final String key = substitutor.substitute(variables, eventSource.getKey());
115-
final Map<String, String> properties = new HashMap<>();
116-
final JsonNode message = payloadResolver.createPayload(eventSource, variables);
140+
String key = substitutor.substitute(variables, eventSource.getKey());
141+
JsonNode message = executeHandlingException(
142+
() -> payloadResolver.createPayload(eventSource, variables),
143+
() -> "Could not resolve payload for file '" + file + "'"
144+
);
145+
146+
Map<String, String> properties = createProperties(eventSource, variables);
147+
148+
this.state = new State(
149+
eventSource.getChannel(), key, properties, message, batchIngestionArguments.getAction()
150+
);
151+
}
117152

153+
private @NotNull Map<String, String> createProperties(EventSourceDescriptor eventSource,
154+
Map<String, String> variables) {
155+
final Map<String, String> properties = new HashMap<>();
118156
if (eventSource.getProperties() != null) {
119157
for (Entry<String, String> entry : eventSource.getProperties().entrySet()) {
120158
properties.put(entry.getKey(), substitutor.substitute(variables, entry.getValue()));
121159
}
122160
}
161+
return properties;
162+
}
123163

124-
this.state = new State(
125-
eventSource.getChannel(), key, properties, message, batchIngestionArguments.getAction()
126-
);
164+
private <T> T executeHandlingException(Supplier<T> function,
165+
Supplier<String> messageSupplier) {
166+
try {
167+
return function.get();
168+
} catch (RuntimeException e) {
169+
throw new ParameterException(spec.commandLine(),
170+
messageSupplier.get() + "\n"
171+
+ "\n"
172+
+ "Details:\n" + e.getMessage());
173+
}
127174
}
128175

129176
@NotNull
@@ -143,6 +190,4 @@ private record State(String channel, String key, Map<String, String> properties,
143190
ActionType action) {
144191

145192
}
146-
147-
148-
}
193+
}

core/src/main/java/dev/streamx/cli/command/ingestion/batch/BatchIngestionArguments.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public enum ActionType {
99
}
1010

1111
@Parameters(index = "0",
12-
description = "Action to perform, either ${COMPLETION-CANDIDATES}", arity = "1")
12+
description = "Action to perform (publish, unpublish)", arity = "1")
1313
ActionType action;
1414

1515
@Parameters(index = "1", description = "Source directory for the batch publication", arity = "1")
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package dev.streamx.cli.command.ingestion.batch.exception;
2+
3+
import java.io.IOException;
4+
import java.nio.file.Path;
5+
6+
public class EventSourceDescriptorException extends IOException {
7+
8+
private final Path path;
9+
10+
public EventSourceDescriptorException(Path path, Throwable cause) {
11+
super(cause);
12+
this.path = path;
13+
}
14+
15+
public Path getPath() {
16+
return path;
17+
}
18+
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package dev.streamx.cli.command.ingestion.batch.exception;
2+
3+
import java.io.IOException;
4+
import java.nio.file.Path;
5+
6+
public class FileIngestionException extends IOException {
7+
8+
private final Path path;
9+
10+
public FileIngestionException(Path path, Throwable cause) {
11+
super(cause);
12+
this.path = path;
13+
}
14+
15+
public Path getPath() {
16+
return path;
17+
}
18+
}

core/src/main/java/dev/streamx/cli/command/ingestion/batch/resolver/BatchCompositePayloadResolver.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,4 @@ public JsonNode createPayload(EventSourceDescriptor currentDescriptor,
3030
payload = binaryResolverStep.resolve(payload, variables);
3131
return jsonResolverStep.resolve(payload, variables);
3232
}
33-
34-
3533
}

core/src/main/java/dev/streamx/cli/command/ingestion/batch/walker/EventSourceFileTreeWalker.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.fasterxml.jackson.databind.ObjectMapper;
44
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
55
import dev.streamx.cli.command.ingestion.batch.EventSourceDescriptor;
6+
import dev.streamx.cli.command.ingestion.batch.exception.EventSourceDescriptorException;
67
import dev.streamx.cli.util.FileUtils;
78
import io.quarkus.logging.Log;
89
import java.io.IOException;
@@ -13,7 +14,9 @@
1314
import java.nio.file.SimpleFileVisitor;
1415
import java.nio.file.attribute.BasicFileAttributes;
1516
import java.util.List;
17+
import java.util.Objects;
1618
import java.util.Stack;
19+
import org.jetbrains.annotations.NotNull;
1720

1821
public class EventSourceFileTreeWalker extends SimpleFileVisitor<Path> {
1922

@@ -30,9 +33,7 @@ public EventSourceFileTreeWalker(EventSourceProcessor processor) {
3033
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
3134
Path configFile = dir.resolve(EventSourceDescriptor.FILENAME);
3235
if (Files.exists(configFile) && Files.isRegularFile(configFile)) {
33-
EventSourceDescriptor descriptor = yamlMapper.readValue(configFile.toFile(),
34-
EventSourceDescriptor.class);
35-
descriptor.setSource(configFile);
36+
EventSourceDescriptor descriptor = parseDescriptor(configFile);
3637

3738
eventSourceStack.push(descriptor);
3839
Log.debugf("Found event source in %s: %s", dir, descriptor);
@@ -43,6 +44,25 @@ public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) th
4344
return FileVisitResult.CONTINUE;
4445
}
4546

47+
private @NotNull EventSourceDescriptor parseDescriptor(Path configFile)
48+
throws IOException {
49+
try {
50+
EventSourceDescriptor descriptor = yamlMapper.readValue(configFile.toFile(),
51+
EventSourceDescriptor.class);
52+
53+
Objects.requireNonNull(descriptor.getKey(),
54+
"Missing required 'key' property in config file '%s'.'".formatted(configFile));
55+
Objects.requireNonNull(descriptor.getChannel(),
56+
"Missing required 'channel' property in config file '%s'.'".formatted(configFile));
57+
Objects.requireNonNull(descriptor.getPayload(),
58+
"Missing required 'payload' property in config file '%s'.'".formatted(configFile));
59+
descriptor.setSource(configFile);
60+
return descriptor;
61+
} catch (IOException e) {
62+
throw new EventSourceDescriptorException(configFile, e);
63+
}
64+
}
65+
4666
@Override
4767
public FileVisitResult postVisitDirectory(Path dir, IOException exc) {
4868
Path configFile = dir.resolve(EventSourceDescriptor.FILENAME);

core/src/test/java/dev/streamx/cli/command/MeshStopper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public class MeshStopper {
1515
private static final ScheduledExecutorService SCHEDULER =
1616
Executors.newSingleThreadScheduledExecutor();
1717
private static final AtomicBoolean scheduled = new AtomicBoolean(false);
18+
1819
@Inject
1920
StreamxRunner streamxRunner;
2021

0 commit comments

Comments
 (0)