Skip to content

Commit d061087

Browse files
committed
Fix channel crash when publishing to a new stream
The following scenario led to a channel crash: 1. Publish to a non-existing stream: `perf-test -y 0 -p -e amq.default -t direct -k stream` 2. Declare the stream: `rabbitmqadmin declare queue name=stream queue_type=stream` There is no pid yet, so we got a function_clause with `none` ``` {function_clause, [{osiris_writer,write, [none,<0.877.0>,<<"<0.877.0>_-65ZKFz18ll5lau0phi7CsQ">>,1, [[0,"Sp",[192,6,5,"B@@ac"]], [0,"Sr", [193,38,4, [[[163,10,<<"x-exchange">>],[161,0,<<>>]], [[163,13,<<"x-routing-key">>],[161,6,<<"stream">>]]]]], [0,"Su",[160,12,[<<0,19,252,1,0,0,98,171,20,16,108,167>>]]]]], [{file,"src/osiris_writer.erl"},{line,158}]}, {rabbit_stream_queue,deliver0,4, [{file,"rabbit_stream_queue.erl"},{line,540}]}, {rabbit_stream_queue,'-deliver/3-fun-0-',4, [{file,"rabbit_stream_queue.erl"},{line,526}]}, {lists,foldl,3,[{file,"lists.erl"},{line,2146}]}, {rabbit_queue_type,'-deliver0/4-fun-5-',5, [{file,"rabbit_queue_type.erl"},{line,707}]}, {maps,fold_1,4,[{file,"maps.erl"},{line,860}]}, {rabbit_queue_type,deliver0,4, [{file,"rabbit_queue_type.erl"},{line,704}]}, {rabbit_queue_type,deliver,4, [{file,"rabbit_queue_type.erl"},{line,662}]}]} ```
1 parent f413880 commit d061087

File tree

1 file changed

+24
-2
lines changed

1 file changed

+24
-2
lines changed

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -926,10 +926,32 @@ readers(QName) ->
926926
{node(), 0}
927927
end.
928928

929+
get_writer_pid(Q) ->
930+
case amqqueue:get_pid(Q) of
931+
none ->
932+
%% the stream is still starting; wait up to 5 seconds
933+
get_writer_pid(Q, 50);
934+
Pid ->
935+
Pid
936+
end.
937+
938+
get_writer_pid(_Q, 0) ->
939+
stream_not_found;
940+
get_writer_pid(Q, N) ->
941+
#{name := StreamId} = amqqueue:get_type_state(Q),
942+
case rabbit_stream_coordinator:writer_pid(StreamId) of
943+
{ok, Pid} ->
944+
Pid;
945+
_ ->
946+
timer:sleep(100),
947+
get_writer_pid(Q, N - 1)
948+
end.
949+
950+
929951
init(Q) when ?is_amqqueue(Q) ->
930-
Leader = amqqueue:get_pid(Q),
931-
QName = amqqueue:get_name(Q),
932952
#{name := StreamId} = amqqueue:get_type_state(Q),
953+
Leader = get_writer_pid(Q),
954+
QName = amqqueue:get_name(Q),
933955
%% tell us about leader changes so we can fail over
934956
case rabbit_stream_coordinator:register_listener(Q) of
935957
{ok, ok, _} ->

0 commit comments

Comments
 (0)