Skip to content

Commit 04b9c50

Browse files
authored
Merge pull request #14884 from rabbitmq/md/idempotent-stream-delete
rabbit_stream_coordinator: Handle delete_stream idempotently
2 parents a82d9ab + ebe3471 commit 04b9c50

File tree

2 files changed

+30
-4
lines changed

2 files changed

+30
-4
lines changed

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
585585
State0#?MODULE{streams = Streams0#{StreamId => Stream},
586586
monitors = Monitors}, Reply, Effects)
587587
end;
588-
Reply ->
588+
{reply, Reply} ->
589589
return(Meta, State0, Reply, [])
590590
end;
591591
apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0,
@@ -1372,10 +1372,14 @@ filter_command(_Meta, {delete_replica, _, #{node := Node}}, #stream{id = StreamI
13721372
?LOG_WARNING(
13731373
"~ts failed to delete replica on node ~ts for stream ~ts: refusing to delete the only replica",
13741374
[?MODULE, Node, StreamId]),
1375-
{error, last_stream_member};
1375+
{reply, {error, last_stream_member}};
13761376
false ->
13771377
ok
13781378
end;
1379+
filter_command(_Meta, {delete_stream, _StreamId, #{}}, undefined) ->
1380+
%% Attempting to delete a stream which does not exist. Reply 'ok' to the
1381+
%% caller so that this action is idempotent.
1382+
{reply, ok};
13791383
filter_command(_, _, _) ->
13801384
ok.
13811385

deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ all_tests() ->
3434
add_replica,
3535
restart_stream,
3636
delete_stream,
37+
delete_stream_idempotent,
3738
delete_replica_leader,
3839
delete_replica,
3940
delete_two_replicas,
@@ -949,8 +950,29 @@ delete_stream(_) ->
949950
S7 = update_stream(meta(?LINE), {member_deleted, StreamId, #{node => N3}},
950951
S6),
951952
?assertEqual(undefined, S7),
952-
%% idempotency test
953-
_ = update_stream(Meta1, {delete_stream, StreamId, #{}}, S7),
953+
ok.
954+
955+
delete_stream_idempotent(_) ->
956+
S0 = rabbit_stream_coordinator:init(#{machine_version => 5}),
957+
StreamId = atom_to_list(?FUNCTION_NAME),
958+
959+
TypeState = #{name => StreamId,
960+
retention => [],
961+
nodes => [node()]},
962+
Q = new_q(list_to_binary(StreamId), TypeState),
963+
Cmd0 = {new_stream, StreamId, #{leader_node => node(),
964+
retention => [],
965+
queue => Q}},
966+
{S1, _, _} = apply_cmd(meta(?LINE), Cmd0, S0),
967+
968+
Cmd1 = {delete_stream, StreamId, #{}},
969+
{S2, ok, []} = apply_cmd(meta(?LINE), Cmd1, S1),
970+
971+
Cmd2 = {member_deleted, StreamId, #{node => node()}},
972+
{S3, '$ra_no_reply', []} = apply_cmd(meta(?LINE), Cmd2, S2),
973+
974+
{S3, ok, []} = apply_cmd(meta(?LINE), Cmd1, S3),
975+
954976
ok.
955977

956978
add_replica(_) ->

0 commit comments

Comments
 (0)