diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index cb01bfadf6d..ad9bfc1c180 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -585,7 +585,7 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0, State0#?MODULE{streams = Streams0#{StreamId => Stream}, monitors = Monitors}, Reply, Effects) end; - Reply -> + {reply, Reply} -> return(Meta, State0, Reply, []) end; apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0, @@ -1372,10 +1372,14 @@ filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{id = StreamI ?LOG_WARNING( "~ts failed to delete replica on node ~ts for stream ~ts: refusing to delete the only replica", [?MODULE, Node, StreamId]), - {error, last_stream_member}; + {reply, {error, last_stream_member}}; false -> ok end; +filter_command(_Meta, {delete_stream, _StreamId, #{}}, undefined) -> + %% Attempting to delete a stream which does not exist. Reply 'ok' to the + %% caller so that this action is idempotent. + {reply, ok}; filter_command(_, _, _) -> ok. diff --git a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl index 6e12c8c313c..4c680971404 100644 --- a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl @@ -34,6 +34,7 @@ all_tests() -> add_replica, restart_stream, delete_stream, + delete_stream_idempotent, delete_replica_leader, delete_replica, delete_two_replicas, @@ -949,8 +950,29 @@ delete_stream(_) -> S7 = update_stream(meta(?LINE), {member_deleted, StreamId, #{node => N3}}, S6), ?assertEqual(undefined, S7), - %% idempotency test - _ = update_stream(Meta1, {delete_stream, StreamId, #{}}, S7), + ok. + +delete_stream_idempotent(_) -> + S0 = rabbit_stream_coordinator:init(#{machine_version => 5}), + StreamId = atom_to_list(?FUNCTION_NAME), + + TypeState = #{name => StreamId, + retention => [], + nodes => [node()]}, + Q = new_q(list_to_binary(StreamId), TypeState), + Cmd0 = {new_stream, StreamId, #{leader_node => node(), + retention => [], + queue => Q}}, + {S1, _, _} = apply_cmd(meta(?LINE), Cmd0, S0), + + Cmd1 = {delete_stream, StreamId, #{}}, + {S2, ok, []} = apply_cmd(meta(?LINE), Cmd1, S1), + + Cmd2 = {member_deleted, StreamId, #{node => node()}}, + {S3, '$ra_no_reply', []} = apply_cmd(meta(?LINE), Cmd2, S2), + + {S3, ok, []} = apply_cmd(meta(?LINE), Cmd1, S3), + ok. add_replica(_) ->