diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 415d56abb600..876d33d739a4 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -406,7 +406,7 @@ stream_queue_arguments(ArgumentsAcc, Arguments) stream_queue_arguments(ArgumentsAcc, #{<<"max-length-bytes">> := Value} = Arguments) -> stream_queue_arguments([{<<"x-max-length-bytes">>, long, - binary_to_integer(Value)}] + rabbit_data_coercion:to_integer(Value)}] ++ ArgumentsAcc, maps:remove(<<"max-length-bytes">>, Arguments)); stream_queue_arguments(ArgumentsAcc, @@ -418,14 +418,14 @@ stream_queue_arguments(ArgumentsAcc, #{<<"stream-max-segment-size-bytes">> := Value} = Arguments) -> stream_queue_arguments([{<<"x-stream-max-segment-size-bytes">>, long, - binary_to_integer(Value)}] + rabbit_data_coercion:to_integer(Value)}] ++ ArgumentsAcc, maps:remove(<<"stream-max-segment-size-bytes">>, Arguments)); stream_queue_arguments(ArgumentsAcc, #{<<"initial-cluster-size">> := Value} = Arguments) -> stream_queue_arguments([{<<"x-initial-cluster-size">>, long, - binary_to_integer(Value)}] + rabbit_data_coercion:to_integer(Value)}] ++ ArgumentsAcc, maps:remove(<<"initial-cluster-size">>, Arguments)); stream_queue_arguments(ArgumentsAcc, @@ -437,7 +437,7 @@ stream_queue_arguments(ArgumentsAcc, stream_queue_arguments(ArgumentsAcc, #{<<"stream-filter-size-bytes">> := Value} = Arguments) -> stream_queue_arguments([{<<"x-stream-filter-size-bytes">>, long, - binary_to_integer(Value)}] + rabbit_data_coercion:to_integer(Value)}] ++ ArgumentsAcc, maps:remove(<<"stream-filter-size-bytes">>, Arguments)); stream_queue_arguments(ArgumentsAcc, _Arguments) -> diff --git a/deps/rabbitmq_stream_management/priv/www/js/stream.js b/deps/rabbitmq_stream_management/priv/www/js/stream.js index 753eb68c9d11..9f615459276b 100644 --- a/deps/rabbitmq_stream_management/priv/www/js/stream.js +++ b/deps/rabbitmq_stream_management/priv/www/js/stream.js @@ -10,7 +10,15 @@ dispatcher_add(function(sammy) { 'consumers': '/stream/connections/' + vhost + '/' + name + '/consumers', 'publishers': '/stream/connections/' + vhost + '/' + name + '/publishers'}, 'streamConnection', '#/stream/connections'); - }); + }); + sammy.get('#/stream/super-streams', function() { + render({'vhosts': '/vhosts'}, 'superStreams', '#/stream/super-streams') + }); + sammy.put('#/stream/super-streams', function() { + put_cast_params(this, '/stream/super-streams/:vhost/:name', + ['name', 'pattern', 'policy'], ['priority'], []); + location.href = "/#/queues"; + }); // not exactly dispatcher stuff, but we have to make sure this is called before // HTTP requests are made in case of refresh of the queue page QUEUE_EXTRA_CONTENT_REQUESTS.push(function(vhost, queue) { @@ -33,6 +41,7 @@ dispatcher_add(function(sammy) { }); NAVIGATION['Stream Connections'] = ['#/stream/connections', "monitoring"]; +NAVIGATION['Super Streams'] = ['#/stream/super-streams', "management"]; var ALL_STREAM_CONNECTION_COLUMNS = {'Overview': [['user', 'User name', true], diff --git a/deps/rabbitmq_stream_management/priv/www/js/tmpl/superStreams.ejs b/deps/rabbitmq_stream_management/priv/www/js/tmpl/superStreams.ejs new file mode 100644 index 000000000000..5934c8d79191 --- /dev/null +++ b/deps/rabbitmq_stream_management/priv/www/js/tmpl/superStreams.ejs @@ -0,0 +1,70 @@ +

Super Streams

+ +<% if (ac.canAccessVhosts()) { %> +
+

Add a new super stream

+
+
+ +<% if (display.vhosts) { %> + + + + +<% } else { %> + +<% } %> + + + + + + + + + + + + +
+ +
*
+ + +
+ + *
+
+ +
+
+ + + + + + +
+ +
+
+
+<% } %> diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_super_stream_mgmt.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_super_stream_mgmt.erl new file mode 100644 index 000000000000..2301e9d5e0a5 --- /dev/null +++ b/deps/rabbitmq_stream_management/src/rabbit_stream_super_stream_mgmt.erl @@ -0,0 +1,165 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% + +-module(rabbit_stream_super_stream_mgmt). + +-behaviour(rabbit_mgmt_extension). + +-export([dispatcher/0, + web_ui/0]). +-export([init/2, + content_types_accepted/2, + is_authorized/2, + resource_exists/2, + allowed_methods/2, + accept_content/2]). +-export([variances/2]). + +-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +-define(DEFAULT_RPC_TIMEOUT, 30_000). + +dispatcher() -> + [{"/stream/super-streams/:vhost/:name", ?MODULE, []}]. + +web_ui() -> + []. + +%%-------------------------------------------------------------------- + +init(Req, _State) -> + {cowboy_rest, + rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), + #context{}}. + +variances(Req, Context) -> + {[<<"accept-encoding">>, <<"origin">>], Req, Context}. + +content_types_accepted(ReqData, Context) -> + {[{{<<"application">>, <<"json">>, '*'}, accept_content}], ReqData, Context}. + +allowed_methods(ReqData, Context) -> + {[<<"PUT">>, <<"OPTIONS">>], ReqData, Context}. + +resource_exists(ReqData, Context) -> + %% just checking that the vhost requested exists + {case rabbit_mgmt_util:all_or_one_vhost(ReqData, fun (_) -> [] end) of + vhost_not_found -> false; + _ -> true + end, ReqData, Context}. + +is_authorized(ReqData, Context) -> + rabbit_mgmt_util:is_authorized_vhost(ReqData, Context). + +accept_content(ReqData0, #context{user = #user{username = ActingUser}} = Context) -> + %% TODO validate arguments? + VHost = rabbit_mgmt_util:id(vhost, ReqData0), + Name = rabbit_mgmt_util:id(name, ReqData0), + rabbit_mgmt_util:with_decode( + [], ReqData0, Context, + fun([], BodyMap, ReqData) -> + PartitionsBin = maps:get(partitions, BodyMap, undefined), + BindingKeysStr = maps:get('binding-keys', BodyMap, undefined), + case validate_partitions_or_binding_keys(PartitionsBin, BindingKeysStr, ReqData, Context) of + ok -> + Arguments = maps:get(arguments, BodyMap, #{}), + Node = get_node(BodyMap), + case PartitionsBin of + undefined -> + BindingKeys = binding_keys(BindingKeysStr), + Streams = streams_from_binding_keys(Name, BindingKeys), + create_super_stream(Node, VHost, Name, Streams, + Arguments, BindingKeys, ActingUser, + ReqData, Context); + _ -> + case validate_partitions(PartitionsBin, ReqData, Context) of + Partitions when is_integer(Partitions) -> + Streams = streams_from_partitions(Name, Partitions), + RoutingKeys = routing_keys(Partitions), + create_super_stream(Node, VHost, Name, Streams, + Arguments, RoutingKeys, ActingUser, + ReqData, Context); + Error -> + Error + end + end; + Error -> + Error + end + end). + +%%------------------------------------------------------------------- +get_node(Props) -> + case maps:get(<<"node">>, Props, undefined) of + undefined -> node(); + N -> rabbit_nodes:make( + binary_to_list(N)) + end. + +binding_keys(BindingKeysStr) -> + [rabbit_data_coercion:to_binary( + string:strip(K)) + || K + <- string:tokens( + rabbit_data_coercion:to_list(BindingKeysStr), ",")]. + +routing_keys(Partitions) -> + [integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)]. + +streams_from_binding_keys(Name, BindingKeys) -> + [list_to_binary(binary_to_list(Name) + ++ "-" + ++ binary_to_list(K)) + || K <- BindingKeys]. + +streams_from_partitions(Name, Partitions) -> + [list_to_binary(binary_to_list(Name) + ++ "-" + ++ integer_to_list(K)) + || K <- lists:seq(0, Partitions - 1)]. + +create_super_stream(NodeName, VHost, SuperStream, Streams, Arguments, + RoutingKeys, ActingUser, ReqData, Context) -> + case rabbit_misc:rpc_call(NodeName, + rabbit_stream_manager, + create_super_stream, + [VHost, + SuperStream, + Streams, + Arguments, + RoutingKeys, + ActingUser], + ?DEFAULT_RPC_TIMEOUT) of + ok -> + {true, ReqData, Context}; + {error, Reason} -> + rabbit_mgmt_util:bad_request(io_lib:format("~p", [Reason]), + ReqData, Context) + end. + +validate_partitions_or_binding_keys(undefined, undefined, ReqData, Context) -> + rabbit_mgmt_util:bad_request("Must specify partitions or binding keys", ReqData, Context); +validate_partitions_or_binding_keys(_, undefined, _, _) -> + ok; +validate_partitions_or_binding_keys(undefined, _, _, _) -> + ok; +validate_partitions_or_binding_keys(_, _, ReqData, Context) -> + rabbit_mgmt_util:bad_request("Specify partitions or binding keys, not both", ReqData, Context). + +validate_partitions(PartitionsBin, ReqData, Context) -> + try + case rabbit_data_coercion:to_integer(PartitionsBin) of + Int when Int < 1 -> + rabbit_mgmt_util:bad_request("The partition number must be greater than 0", ReqData, Context); + Int -> + Int + end + catch + _:_ -> + rabbit_mgmt_util:bad_request("The partitions must be a number", ReqData, Context) + end. diff --git a/deps/rabbitmq_stream_management/test/http_SUITE.erl b/deps/rabbitmq_stream_management/test/http_SUITE.erl index baa95a5c375a..b8cf83f02203 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE.erl +++ b/deps/rabbitmq_stream_management/test/http_SUITE.erl @@ -10,13 +10,20 @@ -include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl"). +-import(rabbit_mgmt_test_util, [ + http_put/4 + ]). + -compile(export_all). all() -> [{group, non_parallel_tests}]. groups() -> - [{non_parallel_tests, [], [stream_management]}]. + [{non_parallel_tests, [], [ + stream_management, + create_super_stream + ]}]. %% ------------------------------------------------------------------- %% Testsuite setup/teardown. @@ -27,6 +34,7 @@ init_per_suite(Config) -> true -> {skip, "suite is not mixed versions compatible"}; _ -> + inets:start(), rabbit_ct_helpers:log_environment(), Config1 = rabbit_ct_helpers:set_config(Config, @@ -108,6 +116,30 @@ stream_management(Config) -> {"MANAGEMENT_PORT=~b", [ManagementPortNode]}]), {ok, _} = MakeResult. +create_super_stream(Config) -> + http_put(Config, "/stream/super-streams/%2F/carrots", #{partitions => 3, + 'binding-keys' => "streamA"}, + ?BAD_REQUEST), + http_put(Config, "/stream/super-streams/%2F/carrots", #{partitions => "this is not a partition"}, + ?BAD_REQUEST), + http_put(Config, "/stream/super-streams/%2F/carrots", #{partitions => 3}, + {group, '2xx'}), + http_put(Config, "/stream/super-streams/%2F/cucumber", #{'binding-keys' => "fresh-cucumber"}, + {group, '2xx'}), + http_put(Config, "/stream/super-streams/%2F/aubergine", + #{partitions => 3, + arguments => #{'max-length-bytes' => 1000000, + 'max-age' => <<"1h">>, + 'stream-max-segment-size' => 500, + 'initial-cluster-size' => 2, + 'queue-leader-locator' => <<"client-local">>}}, + {group, '2xx'}), + http_put(Config, "/stream/super-streams/%2F/watermelon", + #{partitions => 3, + arguments => #{'queue-leader-locator' => <<"remote">>}}, + ?BAD_REQUEST), + ok. + get_stream_port(Config) -> rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream).