Skip to content

Commit 549b113

Browse files
New command group: streams
1 parent cbb95fb commit 549b113

File tree

4 files changed

+140
-0
lines changed

4 files changed

+140
-0
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* `nodes` is a new command group for operations on nodes
99
* `parameters` is a new command group for operations on [runtime parameters](https://www.rabbitmq.com/docs/parameters)
1010
* `queues` is a new command group for operations on queues
11+
* `streams` is a new command group for operations on streams
1112
* `users` is a new command group for operations on users
1213
* `vhosts` is a new command group for operations on virtual hosts
1314
* Command groups are now ordered alphabetically

src/cli.rs

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,12 @@ pub fn parser(pre_flight_settings: PreFlightSettings) -> Command {
236236
))
237237
.subcommand_value_name("shovels")
238238
.subcommands(shovel_subcommands(pre_flight_settings.clone()));
239+
let streams_group = Command::new("streams")
240+
.about("Operations on streams")
241+
.infer_subcommands(pre_flight_settings.infer_subcommands)
242+
.infer_long_args(pre_flight_settings.infer_long_options)
243+
.subcommand_value_name("stream")
244+
.subcommands(streams_subcommands(pre_flight_settings.clone()));
239245
let tanzu_group = Command::new("tanzu")
240246
.about("Tanzu RabbitMQ-specific commands")
241247
.infer_subcommands(pre_flight_settings.infer_subcommands)
@@ -285,6 +291,7 @@ pub fn parser(pre_flight_settings: PreFlightSettings) -> Command {
285291
rebalance_group,
286292
show_group,
287293
shovels_group,
294+
streams_group,
288295
tanzu_group,
289296
users_group,
290297
vhosts_group,
@@ -1306,6 +1313,68 @@ fn queues_subcommands(pre_flight_settings: PreFlightSettings) -> [Command; 5] {
13061313
.map(|cmd| cmd.infer_long_args(pre_flight_settings.infer_long_options))
13071314
}
13081315

1316+
fn streams_subcommands(pre_flight_settings: PreFlightSettings) -> [Command; 3] {
1317+
let declare_cmd = Command::new("declare")
1318+
.about("Declares a stream")
1319+
.after_help(color_print::cformat!(
1320+
"<bold>Doc guide:</bold>: {}",
1321+
STREAM_GUIDE_URL
1322+
))
1323+
.arg(Arg::new("name").long("name").required(true).help("name"))
1324+
.arg(
1325+
Arg::new("expiration")
1326+
.long("expiration")
1327+
.help("stream expiration, e.g. 12h for 12 hours, 7D for 7 days, or 1M for 1 month")
1328+
.required(true)
1329+
.value_parser(value_parser!(String)),
1330+
)
1331+
.arg(
1332+
Arg::new("max_length_bytes")
1333+
.long("max-length-bytes")
1334+
.help("maximum stream length in bytes")
1335+
.required(false)
1336+
.value_parser(value_parser!(u64)),
1337+
)
1338+
.arg(
1339+
Arg::new("max_segment_length_bytes")
1340+
.long("stream-max-segment-size-bytes")
1341+
.help("maximum stream segment file length in bytes")
1342+
.required(false)
1343+
.value_parser(value_parser!(u64)),
1344+
)
1345+
.arg(
1346+
Arg::new("arguments")
1347+
.long("arguments")
1348+
.help("additional exchange arguments")
1349+
.required(false)
1350+
.default_value("{}")
1351+
.value_parser(value_parser!(String)),
1352+
);
1353+
let idempotently_arg = Arg::new("idempotently")
1354+
.long("idempotently")
1355+
.value_parser(value_parser!(bool))
1356+
.action(ArgAction::SetTrue)
1357+
.help("do not consider 404 Not Found API responses to be errors")
1358+
.required(false);
1359+
let delete_cmd = Command::new("delete")
1360+
.about("Deletes a queue")
1361+
.arg(
1362+
Arg::new("name")
1363+
.long("name")
1364+
.help("queue name")
1365+
.required(true),
1366+
)
1367+
.arg(idempotently_arg.clone());
1368+
let list_cmd = Command::new("list")
1369+
.long_about("Lists streams and queues and")
1370+
.after_help(color_print::cformat!(
1371+
"<bold>Doc guide</bold>: {}",
1372+
STREAM_GUIDE_URL
1373+
));
1374+
[declare_cmd, delete_cmd, list_cmd]
1375+
.map(|cmd| cmd.infer_long_args(pre_flight_settings.infer_long_options))
1376+
}
1377+
13091378
fn parameters_subcommands(pre_flight_settings: PreFlightSettings) -> [Command; 3] {
13101379
let list_cmd = Command::new("list")
13111380
.arg(

src/main.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,18 @@ fn dispatch_common_subcommand(
730730
let result = commands::list_shovels(client);
731731
res_handler.tabular_result(result)
732732
}
733+
("streams", "declare") => {
734+
let result = commands::declare_stream(client, &vhost, second_level_args);
735+
res_handler.no_output_on_success(result);
736+
}
737+
("streams", "delete") => {
738+
let result = commands::delete_queue(client, &vhost, second_level_args);
739+
res_handler.delete_operation_result(result);
740+
}
741+
("streams", "list") => {
742+
let result = commands::list_queues(client, &vhost);
743+
res_handler.tabular_result(result)
744+
}
733745
("users", "connections") => {
734746
let result = commands::list_user_connections(client, second_level_args);
735747
res_handler.tabular_result(result)

tests/streams_tests.rs

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,3 +74,61 @@ fn list_streams() -> Result<(), Box<dyn std::error::Error>> {
7474

7575
Ok(())
7676
}
77+
78+
#[test]
79+
fn streams_list() -> Result<(), Box<dyn std::error::Error>> {
80+
let vh1 = "stream_vhost_3";
81+
let vh2 = "stream_vhost_4";
82+
let s1 = "new_stream1";
83+
let s2 = "new_stream2";
84+
85+
delete_vhost(vh1).expect("failed to delete a virtual host");
86+
delete_vhost(vh2).expect("failed to delete a virtual host");
87+
88+
// declare vhost 1
89+
run_succeeds(["vhosts", "declare", "--name", vh1]);
90+
91+
// declare vhost 2
92+
run_succeeds(["vhosts", "declare", "--name", vh2]);
93+
94+
// declare a new stream in vhost 1
95+
run_succeeds([
96+
"-V",
97+
vh1,
98+
"streams",
99+
"declare",
100+
"--name",
101+
s1,
102+
"--expiration",
103+
"2D",
104+
]);
105+
106+
// declare new stream in vhost 2
107+
run_succeeds([
108+
"-V",
109+
vh2,
110+
"streams",
111+
"declare",
112+
"--name",
113+
s2,
114+
"--expiration",
115+
"12h",
116+
]);
117+
118+
await_queue_metric_emission();
119+
120+
// list streams in vhost 1
121+
run_succeeds(["-V", vh1, "streams", "list"])
122+
.stdout(predicate::str::contains(s1).and(predicate::str::contains("random_stream").not()));
123+
124+
// delete the stream in vhost 1
125+
run_succeeds(["-V", vh1, "streams", "delete", "--name", s1]);
126+
127+
// list streams in vhost 1
128+
run_succeeds(["-V", vh1, "streams", "list"]).stdout(predicate::str::contains(s1).not());
129+
130+
delete_vhost(vh1).expect("failed to delete a virtual host");
131+
delete_vhost(vh2).expect("failed to delete a virtual host");
132+
133+
Ok(())
134+
}

0 commit comments

Comments
 (0)