diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 51257fe64a90..26b7f44b2bab 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -18,19 +18,12 @@ -feature(maybe_expr, enable). --behaviour(gen_server). - -include_lib("rabbit_common/include/rabbit_framing.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit/include/amqqueue.hrl"). %% API --export([init/1, - handle_call/3, - handle_cast/2, - handle_info/2]). --export([start_link/1, - create/4, +-export([create/4, delete/3, create_super_stream/6, delete_super_stream/3, @@ -42,27 +35,53 @@ partitions/2, partition_index/3]). --record(state, {configuration}). - -start_link(Conf) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Conf], []). - -init([Conf]) -> - {ok, #state{configuration = Conf}}. - -spec create(binary(), binary(), #{binary() => binary()}, binary()) -> - {ok, map()} | - {error, reference_already_exists} | - {error, internal_error} | - {error, validation_failed}. + {ok, map()} | + {error, reference_already_exists} | + {error, internal_error} | + {error, validation_failed}. create(VirtualHost, Reference, Arguments, Username) -> - gen_server:call(?MODULE, - {create, VirtualHost, Reference, Arguments, Username}). + StreamQueueArguments = stream_queue_arguments(Arguments), + maybe + ok ?= validate_stream_queue_arguments(StreamQueueArguments), + do_create_stream(VirtualHost, Reference, StreamQueueArguments, Username) + else + error -> + {error, validation_failed}; + {error, _} = Err -> + Err + end. -spec delete(binary(), binary(), binary()) -> - {ok, deleted} | {error, reference_not_found}. + {ok, deleted} | {error, reference_not_found}. delete(VirtualHost, Reference, Username) -> - gen_server:call(?MODULE, {delete, VirtualHost, Reference, Username}). + Name = + #resource{virtual_host = VirtualHost, + kind = queue, + name = Reference}, + rabbit_log:debug("Trying to delete stream ~tp", [Reference]), + case rabbit_amqqueue:lookup(Name) of + {ok, Q} -> + rabbit_log:debug("Found queue record ~tp, checking if it is a stream", + [Reference]), + case is_stream_queue(Q) of + true -> + rabbit_log:debug("Queue record ~tp is a stream, trying to delete it", + [Reference]), + {ok, _} = + rabbit_stream_queue:delete(Q, false, false, Username), + rabbit_log:debug("Stream ~tp deleted", [Reference]), + {ok, deleted}; + _ -> + rabbit_log:debug("Queue record ~tp is NOT a stream, returning error", + [Reference]), + {error, reference_not_found} + end; + {error, not_found} -> + rabbit_log:debug("Stream ~tp not found, cannot delete it", + [Reference]), + {error, reference_not_found} + end. -spec create_super_stream(binary(), binary(), @@ -70,77 +89,319 @@ delete(VirtualHost, Reference, Username) -> #{binary() => binary()}, [binary()], binary()) -> - ok | {error, term()}. + ok | {error, term()}. create_super_stream(VirtualHost, Name, Partitions, Arguments, BindingKeys, Username) -> - gen_server:call(?MODULE, - {create_super_stream, - VirtualHost, - Name, - Partitions, - Arguments, - BindingKeys, - Username}). + case validate_super_stream_creation(VirtualHost, Name, Partitions, BindingKeys) of + {error, Reason} -> + {error, Reason}; + ok -> + case declare_super_stream_exchange(VirtualHost, Name, Username) of + ok -> + RollbackOperations = + [fun() -> + delete_super_stream_exchange(VirtualHost, Name, + Username) + end], + QueueCreationsResult = + lists:foldl(fun (Partition, {ok, RollbackOps}) -> + Args = + default_super_stream_arguments(Arguments), + case create(VirtualHost, + Partition, + Args, + Username) + of + {ok, _} -> + {ok, + [fun() -> + delete(VirtualHost, + Partition, + Username) + end] + ++ RollbackOps}; + {error, Reason} -> + {{error, Reason}, + RollbackOps} + end; + (_, + {{error, _Reason}, _RollbackOps} = + Acc) -> + Acc + end, + {ok, RollbackOperations}, Partitions), + case QueueCreationsResult of + {ok, RollbackOps} -> + BindingsResult = + add_super_stream_bindings(VirtualHost, + Name, + Partitions, + BindingKeys, + Username), + case BindingsResult of + ok -> + ok; + Error -> + _ = [Fun() || Fun <- RollbackOps], + Error + end; + {{error, Reason}, RollbackOps} -> + _ = [Fun() || Fun <- RollbackOps], + {error, Reason} + end; + {error, Msg} -> + {error, Msg} + end + end. -spec delete_super_stream(binary(), binary(), binary()) -> - ok | {error, term()}. -delete_super_stream(VirtualHost, Name, Username) -> - gen_server:call(?MODULE, - {delete_super_stream, VirtualHost, Name, Username}). + ok | {error, term()}. +delete_super_stream(VirtualHost, SuperStream, Username) -> + case super_stream_partitions(VirtualHost, SuperStream) of + {ok, Partitions} -> + case delete_super_stream_exchange(VirtualHost, SuperStream, + Username) + of + ok -> + ok; + {error, Error} -> + rabbit_log:warning("Error while deleting super stream exchange ~tp, " + "~tp", + [SuperStream, Error]), + ok + end, + [begin + case delete(VirtualHost, Stream, Username) of + {ok, deleted} -> + ok; + {error, Err} -> + rabbit_log:warning("Error while delete partition ~tp of super stream " + "~tp, ~tp", + [Stream, SuperStream, Err]), + ok + end + end + || Stream <- Partitions], + ok; + {error, Error} -> + {error, Error} + end. -spec lookup_leader(binary(), binary()) -> - {ok, pid()} | {error, not_available} | - {error, not_found}. + {ok, pid()} | {error, not_available} | + {error, not_found}. lookup_leader(VirtualHost, Stream) -> - gen_server:call(?MODULE, {lookup_leader, VirtualHost, Stream}). + case lookup_stream(VirtualHost, Stream) of + {ok, Q} -> + LeaderPid = amqqueue:get_pid(Q), + case process_alive(LeaderPid) of + true -> + {ok, LeaderPid}; + false -> + case leader_from_members(Q) of + {ok, Pid} -> + {ok, Pid}; + _ -> + {error, not_available} + end + end; + R -> + R + end. -spec lookup_local_member(binary(), binary()) -> - {ok, pid()} | {error, not_found} | - {error, not_available}. + {ok, pid()} | {error, not_found} | + {error, not_available}. lookup_local_member(VirtualHost, Stream) -> - gen_server:call(?MODULE, {lookup_local_member, VirtualHost, Stream}). + case lookup_stream(VirtualHost, Stream) of + {ok, Q} -> + #{name := StreamName} = amqqueue:get_type_state(Q), + % FIXME check if pid is alive in case of stale information + case rabbit_stream_coordinator:local_pid(StreamName) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {error, timeout} -> + {error, not_available}; + _ -> + {error, not_available} + end; + R -> + R + end. -spec lookup_member(binary(), binary()) -> - {ok, pid()} | {error, not_found} | - {error, not_available}. + {ok, pid()} | {error, not_found} | + {error, not_available}. lookup_member(VirtualHost, Stream) -> - gen_server:call(?MODULE, {lookup_member, VirtualHost, Stream}). + case lookup_stream(VirtualHost, Stream) of + {ok, Q} -> + #{name := StreamName} = amqqueue:get_type_state(Q), + % FIXME check if pid is alive in case of stale information + case rabbit_stream_coordinator:local_pid(StreamName) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + _ -> + case rabbit_stream_coordinator:members(StreamName) of + {ok, Members} -> + case lists:search(fun ({undefined, _Role}) -> + false; + ({P, _Role}) + when is_pid(P) -> + process_alive(P); + (_) -> + false + end, + maps:values(Members)) + of + {value, {Pid, _Role}} -> + {ok, Pid}; + _ -> + {error, not_available} + end; + _ -> + {error, not_available} + end + end; + R -> + R + end. -spec topology(binary(), binary()) -> - {ok, - #{leader_node => undefined | pid(), - replica_nodes => [pid()]}} | - {error, stream_not_found} | {error, stream_not_available}. + {ok, + #{leader_node => undefined | pid(), + replica_nodes => [pid()]}} | + {error, stream_not_found} | {error, stream_not_available}. topology(VirtualHost, Stream) -> - gen_server:call(?MODULE, {topology, VirtualHost, Stream}). + case lookup_stream(VirtualHost, Stream) of + {ok, Q} -> + QState = amqqueue:get_type_state(Q), + #{name := StreamName} = QState, + case rabbit_stream_coordinator:members(StreamName) of + {ok, Members} -> + {ok, + maps:fold(fun (_Node, {undefined, _Role}, Acc) -> + Acc; + (LeaderNode, {_Pid, writer}, Acc) -> + Acc#{leader_node => LeaderNode}; + (ReplicaNode, {_Pid, replica}, Acc) -> + #{replica_nodes := ReplicaNodes} = + Acc, + Acc#{replica_nodes => + ReplicaNodes + ++ [ReplicaNode]}; + (_Node, _, Acc) -> + Acc + end, + #{leader_node => undefined, + replica_nodes => []}, + Members)}; + Err -> + rabbit_log:info("Error locating ~tp stream members: ~tp", + [StreamName, Err]), + {error, stream_not_available} + end; + {error, not_found} -> + {error, stream_not_found}; + {error, not_available} -> + {error, stream_not_available} + end. -spec route(binary(), binary(), binary()) -> - {ok, [binary()] | no_route} | {error, stream_not_found}. + {ok, [binary()] | no_route} | {error, stream_not_found}. route(RoutingKey, VirtualHost, SuperStream) -> - gen_server:call(?MODULE, - {route, RoutingKey, VirtualHost, SuperStream}). + ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), + try + Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + Content = #content{properties = #'P_basic'{}}, + {ok, DummyMsg} = mc_amqpl:message(ExchangeName, + RoutingKey, + Content), + case rabbit_exchange:route(Exchange, DummyMsg) of + [] -> + {ok, no_route}; + Routes -> + {ok, + [Stream + || #resource{name = Stream} = R <- Routes, + is_resource_stream_queue(R)]} + end + catch + exit:Error -> + rabbit_log:warning("Error while looking up exchange ~tp, ~tp", + [rabbit_misc:rs(ExchangeName), Error]), + {error, stream_not_found} + end. -spec partitions(binary(), binary()) -> - {ok, [binary()]} | {error, stream_not_found}. + {ok, [binary()]} | {error, stream_not_found}. partitions(VirtualHost, SuperStream) -> - gen_server:call(?MODULE, {partitions, VirtualHost, SuperStream}). + super_stream_partitions(VirtualHost, SuperStream). -spec partition_index(binary(), binary(), binary()) -> - {ok, integer()} | {error, stream_not_found}. + {ok, integer()} | {error, stream_not_found}. partition_index(VirtualHost, SuperStream, Stream) -> - gen_server:call(?MODULE, - {partition_index, VirtualHost, SuperStream, Stream}). + ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), + rabbit_log:debug("Looking for partition index of stream ~tp in " + "super stream ~tp (virtual host ~tp)", + [Stream, SuperStream, VirtualHost]), + try + _ = rabbit_exchange:lookup_or_die(ExchangeName), + UnorderedBindings = + _ = [Binding + || Binding = #binding{destination = #resource{name = Q} = D} + <- rabbit_binding:list_for_source(ExchangeName), + is_resource_stream_queue(D), Q == Stream], + OrderedBindings = + rabbit_stream_utils:sort_partitions(UnorderedBindings), + rabbit_log:debug("Bindings: ~tp", [OrderedBindings]), + case OrderedBindings of + [] -> + {error, stream_not_found}; + Bindings -> + Binding = lists:nth(1, Bindings), + #binding{args = Args} = Binding, + case rabbit_misc:table_lookup(Args, + <<"x-stream-partition-order">>) + of + {_, Order} -> + Index = rabbit_data_coercion:to_integer(Order), + {ok, Index}; + _ -> + Pattern = <<"-">>, + Size = byte_size(Pattern), + case string:find(Stream, Pattern, trailing) of + nomatch -> + {ok, -1}; + <> -> + try + Index = binary_to_integer(Rest), + {ok, Index} + catch + error:_ -> + {ok, -1} + end; + _ -> + {ok, -1} + end + end + end + catch + exit:Error -> + rabbit_log:error("Error while looking up exchange ~tp, ~tp", + [ExchangeName, Error]), + {error, stream_not_found} + end. stream_queue_arguments(Arguments) -> stream_queue_arguments([{<<"x-queue-type">>, longstr, <<"stream">>}], Arguments). stream_queue_arguments(ArgumentsAcc, Arguments) - when map_size(Arguments) =:= 0 -> + when map_size(Arguments) =:= 0 -> ArgumentsAcc; stream_queue_arguments(ArgumentsAcc, #{<<"max-length-bytes">> := Value} = Arguments) -> @@ -155,7 +416,7 @@ stream_queue_arguments(ArgumentsAcc, maps:remove(<<"max-age">>, Arguments)); stream_queue_arguments(ArgumentsAcc, #{<<"stream-max-segment-size-bytes">> := Value} = - Arguments) -> + Arguments) -> stream_queue_arguments([{<<"x-stream-max-segment-size-bytes">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc, @@ -187,7 +448,7 @@ validate_stream_queue_arguments([]) -> validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSize} | _]) - when ClusterSize =< 0 -> + when ClusterSize =< 0 -> error; validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} @@ -203,7 +464,7 @@ validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, validate_stream_queue_arguments([{<<"x-stream-filter-size-bytes">>, long, FilterSize} | _]) - when FilterSize < 16 orelse FilterSize > 255 -> + when FilterSize < 16 orelse FilterSize > 255 -> error; validate_stream_queue_arguments([_ | T]) -> validate_stream_queue_arguments(T). @@ -216,323 +477,6 @@ default_super_stream_arguments(Arguments) -> Arguments#{<<"queue-leader-locator">> => <<"balanced">>} end. -handle_call({create, VirtualHost, Reference, Arguments, Username}, - _From, State) -> - {reply, create_stream(VirtualHost, Reference, Arguments, Username), - State}; -handle_call({delete, VirtualHost, Reference, Username}, _From, - State) -> - {reply, delete_stream(VirtualHost, Reference, Username), State}; -handle_call({create_super_stream, - VirtualHost, - Name, - Partitions, - Arguments, - BindingKeys, - Username}, - _From, State) -> - case validate_super_stream_creation(VirtualHost, Name, Partitions, BindingKeys) of - {error, Reason} -> - {reply, {error, Reason}, State}; - ok -> - case declare_super_stream_exchange(VirtualHost, Name, Username) of - ok -> - RollbackOperations = - [fun() -> - delete_super_stream_exchange(VirtualHost, Name, - Username) - end], - QueueCreationsResult = - lists:foldl(fun (Partition, {ok, RollbackOps}) -> - Args = - default_super_stream_arguments(Arguments), - case create_stream(VirtualHost, - Partition, - Args, - Username) - of - {ok, _} -> - {ok, - [fun() -> - delete_stream(VirtualHost, - Partition, - Username) - end] - ++ RollbackOps}; - {error, Reason} -> - {{error, Reason}, - RollbackOps} - end; - (_, - {{error, _Reason}, _RollbackOps} = - Acc) -> - Acc - end, - {ok, RollbackOperations}, Partitions), - case QueueCreationsResult of - {ok, RollbackOps} -> - BindingsResult = - add_super_stream_bindings(VirtualHost, - Name, - Partitions, - BindingKeys, - Username), - case BindingsResult of - ok -> - {reply, ok, State}; - Error -> - _ = [Fun() || Fun <- RollbackOps], - {reply, Error, State} - end; - {{error, Reason}, RollbackOps} -> - _ = [Fun() || Fun <- RollbackOps], - {reply, {error, Reason}, State} - end; - {error, Msg} -> - {reply, {error, Msg}, State} - end - end; -handle_call({delete_super_stream, VirtualHost, SuperStream, Username}, - _From, State) -> - case super_stream_partitions(VirtualHost, SuperStream) of - {ok, Partitions} -> - case delete_super_stream_exchange(VirtualHost, SuperStream, - Username) - of - ok -> - ok; - {error, Error} -> - rabbit_log:warning("Error while deleting super stream exchange ~tp, " - "~tp", - [SuperStream, Error]), - ok - end, - [begin - case delete_stream(VirtualHost, Stream, Username) of - {ok, deleted} -> - ok; - {error, Err} -> - rabbit_log:warning("Error while delete partition ~tp of super stream " - "~tp, ~tp", - [Stream, SuperStream, Err]), - ok - end - end - || Stream <- Partitions], - {reply, ok, State}; - {error, Error} -> - {reply, {error, Error}, State} - end; -handle_call({lookup_leader, VirtualHost, Stream}, _From, State) -> - Res = case lookup_stream(VirtualHost, Stream) of - {ok, Q} -> - LeaderPid = amqqueue:get_pid(Q), - case process_alive(LeaderPid) of - true -> - {ok, LeaderPid}; - false -> - case leader_from_members(Q) of - {ok, Pid} -> - {ok, Pid}; - _ -> - {error, not_available} - end - end; - R -> - R - end, - {reply, Res, State}; -handle_call({lookup_local_member, VirtualHost, Stream}, _From, - State) -> - Res = case lookup_stream(VirtualHost, Stream) of - {ok, Q} -> - #{name := StreamName} = amqqueue:get_type_state(Q), - % FIXME check if pid is alive in case of stale information - case rabbit_stream_coordinator:local_pid(StreamName) of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - {error, timeout} -> - {error, not_available}; - _ -> - {error, not_available} - end; - R -> - R - end, - {reply, Res, State}; -handle_call({lookup_member, VirtualHost, Stream}, _From, State) -> - Res = case lookup_stream(VirtualHost, Stream) of - {ok, Q} -> - #{name := StreamName} = amqqueue:get_type_state(Q), - % FIXME check if pid is alive in case of stale information - case rabbit_stream_coordinator:local_pid(StreamName) of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - _ -> - case rabbit_stream_coordinator:members(StreamName) of - {ok, Members} -> - case lists:search(fun ({undefined, _Role}) -> - false; - ({P, _Role}) - when is_pid(P) -> - process_alive(P); - (_) -> - false - end, - maps:values(Members)) - of - {value, {Pid, _Role}} -> - {ok, Pid}; - _ -> - {error, not_available} - end; - _ -> - {error, not_available} - end - end; - R -> - R - end, - {reply, Res, State}; -handle_call({topology, VirtualHost, Stream}, _From, State) -> - Res = case lookup_stream(VirtualHost, Stream) of - {ok, Q} -> - QState = amqqueue:get_type_state(Q), - #{name := StreamName} = QState, - case rabbit_stream_coordinator:members(StreamName) of - {ok, Members} -> - {ok, - maps:fold(fun (_Node, {undefined, _Role}, Acc) -> - Acc; - (LeaderNode, {_Pid, writer}, Acc) -> - Acc#{leader_node => LeaderNode}; - (ReplicaNode, {_Pid, replica}, Acc) -> - #{replica_nodes := ReplicaNodes} = - Acc, - Acc#{replica_nodes => - ReplicaNodes - ++ [ReplicaNode]}; - (_Node, _, Acc) -> - Acc - end, - #{leader_node => undefined, - replica_nodes => []}, - Members)}; - Err -> - rabbit_log:info("Error locating ~tp stream members: ~tp", - [StreamName, Err]), - {error, stream_not_available} - end; - {error, not_found} -> - {error, stream_not_found}; - {error, not_available} -> - {error, stream_not_available} - end, - {reply, Res, State}; -handle_call({route, RoutingKey, VirtualHost, SuperStream}, _From, - State) -> - ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), - Res = try - Exchange = rabbit_exchange:lookup_or_die(ExchangeName), - Content = #content{properties = #'P_basic'{}}, - {ok, DummyMsg} = mc_amqpl:message(ExchangeName, - RoutingKey, - Content), - case rabbit_exchange:route(Exchange, DummyMsg) of - [] -> - {ok, no_route}; - Routes -> - {ok, - [Stream - || #resource{name = Stream} = R <- Routes, - is_resource_stream_queue(R)]} - end - catch - exit:Error -> - rabbit_log:warning("Error while looking up exchange ~tp, ~tp", - [rabbit_misc:rs(ExchangeName), Error]), - {error, stream_not_found} - end, - {reply, Res, State}; -handle_call({partitions, VirtualHost, SuperStream}, _From, State) -> - Res = super_stream_partitions(VirtualHost, SuperStream), - {reply, Res, State}; -handle_call({partition_index, VirtualHost, SuperStream, Stream}, - _From, State) -> - ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), - rabbit_log:debug("Looking for partition index of stream ~tp in " - "super stream ~tp (virtual host ~tp)", - [Stream, SuperStream, VirtualHost]), - Res = try - _ = rabbit_exchange:lookup_or_die(ExchangeName), - UnorderedBindings = - _ = [Binding - || Binding = #binding{destination = #resource{name = Q} = D} - <- rabbit_binding:list_for_source(ExchangeName), - is_resource_stream_queue(D), Q == Stream], - OrderedBindings = - rabbit_stream_utils:sort_partitions(UnorderedBindings), - rabbit_log:debug("Bindings: ~tp", [OrderedBindings]), - case OrderedBindings of - [] -> - {error, stream_not_found}; - Bindings -> - Binding = lists:nth(1, Bindings), - #binding{args = Args} = Binding, - case rabbit_misc:table_lookup(Args, - <<"x-stream-partition-order">>) - of - {_, Order} -> - Index = rabbit_data_coercion:to_integer(Order), - {ok, Index}; - _ -> - Pattern = <<"-">>, - Size = byte_size(Pattern), - case string:find(Stream, Pattern, trailing) of - nomatch -> - {ok, -1}; - <> -> - try - Index = binary_to_integer(Rest), - {ok, Index} - catch - error:_ -> - {ok, -1} - end; - _ -> - {ok, -1} - end - end - end - catch - exit:Error -> - rabbit_log:error("Error while looking up exchange ~tp, ~tp", - [ExchangeName, Error]), - {error, stream_not_found} - end, - {reply, Res, State}; -handle_call(which_children, _From, State) -> - {reply, [], State}. - -handle_cast(_, State) -> - {noreply, State}. - -handle_info(Info, State) -> - rabbit_log:info("Received info ~tp", [Info]), - {noreply, State}. - -create_stream(VirtualHost, Reference, Arguments, Username) -> - StreamQueueArguments = stream_queue_arguments(Arguments), - maybe - ok ?= validate_stream_queue_arguments(StreamQueueArguments), - do_create_stream(VirtualHost, Reference, StreamQueueArguments, Username) - else - error -> - {error, validation_failed}; - {error, _} = Err -> - Err - end. - do_create_stream(VirtualHost, Reference, StreamQueueArguments, Username) -> Name = #resource{virtual_host = VirtualHost, kind = queue, @@ -614,49 +558,20 @@ do_create_stream(VirtualHost, Reference, StreamQueueArguments, Username) -> end end. -delete_stream(VirtualHost, Reference, Username) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Reference}, - rabbit_log:debug("Trying to delete stream ~tp", [Reference]), - case rabbit_amqqueue:lookup(Name) of - {ok, Q} -> - rabbit_log:debug("Found queue record ~tp, checking if it is a stream", - [Reference]), - case is_stream_queue(Q) of - true -> - rabbit_log:debug("Queue record ~tp is a stream, trying to delete it", - [Reference]), - {ok, _} = - rabbit_stream_queue:delete(Q, false, false, Username), - rabbit_log:debug("Stream ~tp deleted", [Reference]), - {ok, deleted}; - _ -> - rabbit_log:debug("Queue record ~tp is NOT a stream, returning error", - [Reference]), - {error, reference_not_found} - end; - {error, not_found} -> - rabbit_log:debug("Stream ~tp not found, cannot delete it", - [Reference]), - {error, reference_not_found} - end. - super_stream_partitions(VirtualHost, SuperStream) -> ExchangeName = rabbit_misc:r(VirtualHost, exchange, SuperStream), try _ = rabbit_exchange:lookup_or_die(ExchangeName), UnorderedBindings = - [Binding - || Binding = #binding{destination = D} - <- rabbit_binding:list_for_source(ExchangeName), - is_resource_stream_queue(D)], + [Binding + || Binding = #binding{destination = D} + <- rabbit_binding:list_for_source(ExchangeName), + is_resource_stream_queue(D)], OrderedBindings = - rabbit_stream_utils:sort_partitions(UnorderedBindings), + rabbit_stream_utils:sort_partitions(UnorderedBindings), {ok, lists:foldl(fun (#binding{destination = - #resource{kind = queue, name = Q}}, + #resource{kind = queue, name = Q}}, Acc) -> Acc ++ [Q]; (_Binding, Acc) -> @@ -672,7 +587,7 @@ super_stream_partitions(VirtualHost, SuperStream) -> validate_super_stream_creation(_VirtualHost, _Name, Partitions, BindingKeys) when length(Partitions) =/= length(BindingKeys) -> - {error, {validation_failed, "There must be the same number of partitions and binding keys"}}; + {error, {validation_failed, "There must be the same number of partitions and binding keys"}}; validate_super_stream_creation(VirtualHost, Name, Partitions, _BindingKeys) -> maybe ok ?= validate_super_stream_partitions(Partitions), @@ -763,10 +678,10 @@ declare_super_stream_exchange(VirtualHost, Name, Username) -> case rabbit_stream_utils:enforce_correct_name(Name) of {ok, CorrectName} -> Args = - rabbit_misc:set_table_value([], - <<"x-super-stream">>, - bool, - true), + rabbit_misc:set_table_value([], + <<"x-super-stream">>, + bool, + true), CheckedType = rabbit_exchange:check_type(<<"direct">>), ExchangeName = rabbit_misc:r(VirtualHost, exchange, CorrectName), XResult = case rabbit_exchange:lookup(ExchangeName) of @@ -785,12 +700,12 @@ declare_super_stream_exchange(VirtualHost, Name, Username) -> {ok, X} -> try ok = - rabbit_exchange:assert_equivalence(X, - CheckedType, - true, - false, - false, - Args) + rabbit_exchange:assert_equivalence(X, + CheckedType, + true, + false, + false, + Args) catch exit:ExitError -> % likely to be a problem of inequivalent args on an existing stream @@ -813,23 +728,23 @@ add_super_stream_bindings(VirtualHost, Username) -> PartitionsBindingKeys = lists:zip(Partitions, BindingKeys), BindingsResult = - lists:foldl(fun ({Partition, BindingKey}, {ok, Order}) -> - case add_super_stream_binding(VirtualHost, - Name, - Partition, - BindingKey, - Order, - Username) - of - ok -> - {ok, Order + 1}; - {error, Reason} -> - {{error, Reason}, 0} - end; - (_, {{error, _Reason}, _Order} = Acc) -> - Acc - end, - {ok, 0}, PartitionsBindingKeys), + lists:foldl(fun ({Partition, BindingKey}, {ok, Order}) -> + case add_super_stream_binding(VirtualHost, + Name, + Partition, + BindingKey, + Order, + Username) + of + ok -> + {ok, Order + 1}; + {error, Reason} -> + {{error, Reason}, 0} + end; + (_, {{error, _Reason}, _Order} = Acc) -> + Acc + end, + {ok, 0}, PartitionsBindingKeys), case BindingsResult of {ok, _} -> ok; @@ -844,17 +759,17 @@ add_super_stream_binding(VirtualHost, Order, Username) -> {ok, ExchangeNameBin} = - rabbit_stream_utils:enforce_correct_name(SuperStream), + rabbit_stream_utils:enforce_correct_name(SuperStream), {ok, QueueNameBin} = - rabbit_stream_utils:enforce_correct_name(Partition), + rabbit_stream_utils:enforce_correct_name(Partition), ExchangeName = rabbit_misc:r(VirtualHost, exchange, ExchangeNameBin), QueueName = rabbit_misc:r(VirtualHost, queue, QueueNameBin), Pid = self(), Arguments = - rabbit_misc:set_table_value([], - <<"x-stream-partition-order">>, - long, - Order), + rabbit_misc:set_table_value([], + <<"x-stream-partition-order">>, + long, + Order), case rabbit_binding:add(#binding{source = ExchangeName, destination = QueueName, key = BindingKey, @@ -906,10 +821,9 @@ delete_super_stream_exchange(VirtualHost, Name, Username) -> end. lookup_stream(VirtualHost, Stream) -> - Name = - #resource{virtual_host = VirtualHost, - kind = queue, - name = Stream}, + Name = #resource{virtual_host = VirtualHost, + kind = queue, + name = Stream}, case rabbit_amqqueue:lookup(Name) of {ok, Q} -> case is_stream_queue(Q) of diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 054657cfccec..51a3673d4057 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -2819,18 +2819,7 @@ handle_frame_post_auth(Transport, CorrelationId, ?RESPONSE_CODE_STREAM_DOES_NOT_EXIST), increase_protocol_counter(?STREAM_DOES_NOT_EXIST), - {Connection, State}; - {error, Error} -> - rabbit_log:warning("Error while trying to delete super stream ~tp: ~tp", - [SuperStream, Error]), - response(Transport, - Connection, - delete_super_stream, - CorrelationId, - ?RESPONSE_CODE_PRECONDITION_FAILED), - increase_protocol_counter(?PRECONDITION_FAILED), {Connection, State} - end; error -> response(Transport, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_sup.erl b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl index f94f7165be7f..3fe5e25f7ef5 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_sup.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_sup.erl @@ -47,9 +47,6 @@ init([]) -> SslListeners0} end, - Nodes = rabbit_nodes:list_members(), - OsirisConf = #{nodes => Nodes}, - ServerConfiguration = #{initial_credits => application:get_env(rabbitmq_stream, initial_credits, @@ -65,11 +62,6 @@ init([]) -> application:get_env(rabbitmq_stream, heartbeat, ?DEFAULT_HEARTBEAT)}, - StreamManager = - #{id => rabbit_stream_manager, - type => worker, - start => {rabbit_stream_manager, start_link, [OsirisConf]}}, - MetricsGc = #{id => rabbit_stream_metrics_gc, type => worker, @@ -77,7 +69,7 @@ init([]) -> {ok, {{one_for_all, 10, 10}, - [StreamManager, MetricsGc] + [MetricsGc] ++ listener_specs(fun tcp_listener_spec/1, [SocketOpts, ServerConfiguration, NumTcpAcceptors], Listeners)