Skip to content

Commit 0fd13f7

Browse files
authored
[DXP-1336] Publishing with properties
1 parent 4506dc0 commit 0fd13f7

File tree

4 files changed

+37
-22
lines changed

4 files changed

+37
-22
lines changed

core/src/main/java/dev/streamx/cli/command/ingestion/IngestionMessageJsonFactory.java

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,6 @@ public class IngestionMessageJsonFactory {
1212

1313
private static final ObjectMapper mapper = new ObjectMapper();
1414

15-
/**
16-
* @param key of resource
17-
* @param action publish or unpublish
18-
* @param payloadContent to include as a payload in returned JsonNode
19-
* @param payloadType type matching registered ingestion API schema
20-
* @return JsonNode representation of {@link dev.streamx.clients.ingestion.publisher.Message}
21-
*/
22-
23-
public JsonNode from(
24-
String key,
25-
String action,
26-
JsonNode payloadContent,
27-
String payloadType
28-
) {
29-
return from(key, action, payloadContent, Map.of(), payloadType);
30-
}
31-
3215
/**
3316
* @param key of resource
3417
* @param action publish or unpublish

core/src/main/java/dev/streamx/cli/command/ingestion/publish/PublishCommand.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
import jakarta.inject.Inject;
1515
import java.util.Collection;
1616
import java.util.List;
17+
import java.util.Map;
1718
import java.util.Optional;
1819
import java.util.stream.Stream;
1920
import org.jetbrains.annotations.NotNull;
2021
import picocli.CommandLine.ArgGroup;
2122
import picocli.CommandLine.Command;
23+
import picocli.CommandLine.Option;
2224

2325
@Command(name = PublishCommand.COMMAND_NAME,
2426
mixinStandardHelpOptions = true,
@@ -34,6 +36,10 @@ public class PublishCommand extends BaseIngestionCommand {
3436
@ArgGroup(exclusive = false)
3537
PayloadArguments payloadArguments;
3638

39+
@Option(names = { "-p", "--property" },
40+
description = "Message property")
41+
Map<String, String> properties;
42+
3743
@Inject
3844
PayloadResolver payloadResolver;
3945

@@ -62,6 +68,7 @@ private JsonNode prepareIngestionMessage() {
6268
publishArguments.getKey(),
6369
COMMAND_NAME,
6470
payload,
71+
properties,
6572
payloadPropertyName
6673
);
6774
}

core/src/test/java/dev/streamx/cli/command/ingestion/IngestionArgumentsValidationTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,10 @@ public void shouldRejectIngestionWithMissingParameter(QuarkusMainLauncher launch
4949
// then
5050
expectError(result, """
5151
Missing required parameter for option '--string-fragment' (<string>)
52-
Usage: streamx publish [-hV] [[--ingestion-url=<restIngestionServiceUrl>]]
53-
(<channel> <key> [payloadFile]) [[[-s=<string> |
54-
-b=<binary> | -j=<json>]]...]
52+
Usage: streamx publish [-hV] [-p=<String=String>]...
53+
[[--ingestion-url=<restIngestionServiceUrl>]] (<channel>
54+
<key> [payloadFile]) [[[-s=<string> | -b=<binary> |
55+
-j=<json>]]...]
5556
5657
Try 'streamx publish --help' for more information.""");
5758
}

core/src/test/java/dev/streamx/cli/command/ingestion/publish/PublishPayloadCommandTest.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,26 @@ public void shouldPublishReplacedJsonPath(QuarkusMainLauncher launcher) {
120120
"{\"content\": {\"bytes\": \"<h1>Hello changed value!</h1>\"}}"))));
121121
}
122122

123+
@Test
124+
public void shouldPublishWithProperties(QuarkusMainLauncher launcher) {
125+
// when
126+
LaunchResult result = launcher.launch("publish",
127+
"--ingestion-url=" + getIngestionUrl(),
128+
"-s", "content.bytes=<h1>Hello changed value!</h1>",
129+
"-p", "sx:type=type/subtype",
130+
CHANNEL, KEY);
131+
132+
// then
133+
expectSuccess(result);
134+
wm.verify(postRequestedFor(urlEqualTo(
135+
getPublicationPath(CHANNEL)))
136+
.withRequestBody(
137+
equalToJson(
138+
buildResponseWith(
139+
"{\"content\": {\"bytes\": \"<h1>Hello changed value!</h1>\"}}",
140+
"{ \"sx:type\": \"type/subtype\" }"))));
141+
}
142+
123143
@Test
124144
public void shouldPublishReplacedWithObjectJsonPath(QuarkusMainLauncher launcher) {
125145
// when
@@ -253,18 +273,22 @@ public void shouldPublishReplacedJsonPathWithBinaryValue(QuarkusMainLauncher lau
253273
}
254274

255275
private String buildResponseWith(String content) {
276+
return buildResponseWith(content, "{ }");
277+
}
278+
279+
private String buildResponseWith(String content, String properties) {
256280
return
257281
"""
258282
{
259283
"key" : "index.html",
260284
"action" : "publish",
261285
"eventTime" : null,
262-
"properties" : { },
286+
"properties" : %s,
263287
"payload" : {
264288
"dev.streamx.blueprints.data.Page" : %s
265289
}
266290
}
267-
""".formatted(content);
291+
""".formatted(properties, content);
268292
}
269293

270294

0 commit comments

Comments
 (0)