From 55dc45cee7c99c0e720a2b25a1925a23ce3f6944 Mon Sep 17 00:00:00 2001 From: Michal Kuratczyk Date: Fri, 20 Dec 2024 08:56:25 +0100 Subject: [PATCH] Fix channel crash when publishing to a new stream (#12969) The following scenario led to a channel crash: 1. Publish to a non-existing stream: `perf-test -y 0 -p -e amq.default -t direct -k stream` 2. Declare the stream: `rabbitmqadmin declare queue name=stream queue_type=stream` There is no pid yet, so we got a function_clause with `none` ``` {function_clause, [{osiris_writer,write, [none,<0.877.0>,<<"<0.877.0>_-65ZKFz18ll5lau0phi7CsQ">>,1, [[0,"Sp",[192,6,5,"B@@AC"]], [0,"Sr", [193,38,4, [[[163,10,<<"x-exchange">>],[161,0,<<>>]], [[163,13,<<"x-routing-key">>],[161,6,<<"stream">>]]]]], [0,"Su",[160,12,[<<0,19,252,1,0,0,98,171,20,16,108,167>>]]]]], [{file,"src/osiris_writer.erl"},{line,158}]}, {rabbit_stream_queue,deliver0,4, [{file,"rabbit_stream_queue.erl"},{line,540}]}, {rabbit_stream_queue,'-deliver/3-fun-0-',4, [{file,"rabbit_stream_queue.erl"},{line,526}]}, {lists,foldl,3,[{file,"lists.erl"},{line,2146}]}, {rabbit_queue_type,'-deliver0/4-fun-5-',5, [{file,"rabbit_queue_type.erl"},{line,707}]}, {maps,fold_1,4,[{file,"maps.erl"},{line,860}]}, {rabbit_queue_type,deliver0,4, [{file,"rabbit_queue_type.erl"},{line,704}]}, {rabbit_queue_type,deliver,4, [{file,"rabbit_queue_type.erl"},{line,662}]}]} ``` Co-authored-by: Karl Nilsson (cherry picked from commit 68de3fdb77826152e15528d01854d6e835a58c8c) --- deps/rabbit/src/rabbit_stream_queue.erl | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index a7aa3a5a18cc..0fab693cf8f2 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -923,8 +923,31 @@ readers(QName) -> {node(), 0} end. +get_writer_pid(Q) -> + case amqqueue:get_pid(Q) of + none -> + %% the stream is still starting; wait up to 5 seconds + %% and ask the coordinator as it has the Pid sooner + #{name := StreamId} = amqqueue:get_type_state(Q), + get_writer_pid(StreamId, 50); + Pid -> + Pid + end. + +get_writer_pid(_StreamId, 0) -> + stream_not_found; +get_writer_pid(StreamId, N) -> + case rabbit_stream_coordinator:writer_pid(StreamId) of + {ok, Pid} -> + Pid; + _ -> + timer:sleep(100), + get_writer_pid(StreamId, N - 1) + end. + + init(Q) when ?is_amqqueue(Q) -> - Leader = amqqueue:get_pid(Q), + Leader = get_writer_pid(Q), QName = amqqueue:get_name(Q), #{name := StreamId} = amqqueue:get_type_state(Q), %% tell us about leader changes so we can fail over