From e43cd5e542647c87faf769f2acc9c1bd68b017b1 Mon Sep 17 00:00:00 2001 From: tomyouyou Date: Wed, 25 Jun 2025 14:23:46 +0800 Subject: [PATCH] When a queue bq_init fails, its state is not updated. If a queue's bq_init fails, its 'backing_queue_state' is 'undefined', while its state in mnesia remains 'live' because 'terminate_shutdown' never invokes the state-updating callback when the backing queue state is 'undefined'. In fact, its state in mnesia should be updated to 'crashed' in the 'terminate' function. Here is an example: 2025-04-19 16:39:24.535 [error] <0.4827.0> supervisor: {<0.4827.0>,rabbit_amqqueue_sup}, errorContext: child_terminated, reason: {function_clause, [{rabbit_queue_index,segment_plus_journal1, [undefined,{no_pub,no_del,ack}], [{file,"rabbit_queue_index.erl"},{line,1344}]}, {rabbit_queue_index,'-segment_plus_journal/2-fun-0-',3, [{file,"rabbit_queue_index.erl"},{line,1331}]}, {array,sparse_foldl_3,7,[{file,"array.erl"},{line,1687}]}, {array,sparse_foldl_2,9,[{file,"array.erl"},{line,1681}]}, {rabbit_queue_index,segment_entries_foldr,3, [{file,"rabbit_queue_index.erl"},{line,1230}]}, {rabbit_queue_index,read_bounded_segment,5, [{file,"rabbit_queue_index.erl"},{line,1211}]}, {rabbit_queue_index,read,3, [{file,"rabbit_queue_index.erl"},{line,500}]}, {rabbit_variable_queue,maybe_deltas_to_betas,3, [{file,"rabbit_variable_queue.erl"},{line,2635}]}]}, offender: [{pid,<0.519784.0>}, {id,rabbit_amqqueue}, {mfargs, {rabbit_prequeue,start_link, [{amqqueue, {resource,<<"/">>,queue, <<"versioned_notifications.info">>}, false,false,none,[],none,[],[],[], [{vhost,<<"/">>}, {name,<<"ttl">>}, {pattern,<<".*">>}, {'apply-to',<<"queues">>}, {definition,[{<<"message-ttl">>,600000}]}, {priority,0}], undefined,[],[],live,0,[],<<"/">>, #{user => <<"guest">>, system_creation => 1744865495083934694, recover_on_declare => false, creator =>, {1744865646,"[2111:222::144]",54011, "nova-monitor"}}, rabbit_classic_queue,#{}}, {declare,new}, <0.4826.0>]}}, {restart_type,transient}, {significant,true}, {shutdown,600000}, {child_type,worker}], 2025-04-19 
16:39:25.619 [error] <0.534924.0> Restarting crashed queue 'versioned_notifications.info' in vhost '/'. 2025-04-19 16:39:25.633 [error] <0.534924.0> ** Generic server <0.534924.0> terminating, ** Last message in was {'$gen_cast', {init,{recovery,{no_barrier,non_clean_shutdown}}}}, ** When Server state == {q,{amqqueue, {resource,<<"/">>,queue, <<"versioned_notifications.info">>}, false,false,none,[],<0.534924.0>,[],[],[], [{vhost,<<"/">>}, {name,<<"ttl">>}, {pattern,<<".*">>}, {'apply-to',<<"queues">>}, {definition,[{<<"message-ttl">>,600000}]}, {priority,0}], undefined,[],[],live,0,[],<<"/">>, #{user => <<"guest">>, system_creation => 1744865495083934694, recover_on_declare => false, creator =>, {1744865646,"[2111:222::144]",54011, "nova-monitor"}}, rabbit_classic_queue,#{}}, none,false,undefined,undefined, {state, {queue,[],[],0}, {active,-576274256314339,1.0}, []}, undefined,undefined,undefined,undefined, {state,none,30000,undefined}, #{},undefined,undefined,undefined, {state,#{},delegate}, undefined,undefined,undefined,undefined, 'drop-head',0,0,running,false,0,undefined,<<"/">>, undefined,0,true,false,false,0,undefined,0,0,0,[]}, ** Reason for termination == , ** {badarg,[{erlang,binary_to_term, [<<131,104,6,119,13,98,97,115,105,99,95,109,101,115,115, 97,103,101,104,4,119,8,114,101,115,111,117,114,99,101, 109,0,0,0,1,47,119,8,101,120,99,104,97,110,103,101,109, 0,0,0,4,110,111,118,97,108,0,0,0,1,109,0,0,0,28,118, 101,114,115,105,111,110,101,100,95,110,111,116,105,102, 105,99,97,116,105,111,110,115,46,105,110,102,111,106, 104,7,119,7,99,111,110,116,101,110,116,97,60,119,4,110, 111,110,101,109,0,0,0,31,248,0,16,97,112,112,108,105, 99,97,116,105,111,110,47,106,115,111,110,5,117,116,102, 45,56,0,0,0,0,2,0,119,25,114,97,98,98,105,116,95,102, 114,97,109,105,110,103,95,97,109,113,112,95,48,95,57, 95,49,108,0,0,0,1,109,0,0 2025-04-19 16:39:27.264 [error] <0.4827.0> supervisor: {<0.4827.0>,rabbit_amqqueue_sup}, errorContext: shutdown, reason: 
reached_max_restart_intensity, offender: [{pid,<0.534938.0>}, {id,rabbit_amqqueue}, {mfargs, {rabbit_prequeue,start_link, [{amqqueue, {resource,<<"/">>,queue, <<"versioned_notifications.info">>}, false,false,none,[],none,[],[],[], [{vhost,<<"/">>}, {name,<<"ttl">>}, {pattern,<<".*">>}, {'apply-to',<<"queues">>}, {definition,[{<<"message-ttl">>,600000}]}, {priority,0}], undefined,[],[],live,0,[],<<"/">>, #{user => <<"guest">>, system_creation => 1744865495083934694, recover_on_declare => false, creator =>, {1744865646,"[2111:222::144]",54011, "nova-monitor"}}, rabbit_classic_queue,#{}}, {declare,new}, <0.4826.0>]}}, {restart_type,transient}, {significant,true}, {shutdown,600000}, {child_type,worker}], 2025-04-19 16:41:24.807 [error] <0.535419.0> Channel error on connection <0.535410.0> ([2111:222::116]:55611 -> [2111:222::13f]:5672, vhost: '/', user: 'guest'), channel 1:, operation queue.declare caused a channel exception not_found: failed to perform operation on queue 'versioned_notifications.info' in vhost '/' due to timeout --- deps/rabbit/src/rabbit_amqqueue_process.erl | 40 ++++++++++++++------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index 2e18d49010c3..6ccab4288644 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -294,8 +294,12 @@ terminate(shutdown = R, State = #q{backing_queue = BQ, q = Q0}) -> rabbit_core_metrics:queue_deleted(qname(State)), terminate_shutdown( fun (BQS) -> - _ = update_state(stopped, Q0), - BQ:terminate(R, BQS) + case BQS of + undefined -> undefined; + _ -> + _ = update_state(stopped, Q0), + BQ:terminate(R, BQS) + end end, State); terminate({shutdown, missing_owner = Reason}, {{reply_to, From}, #q{q = Q} = State}) -> %% if the owner was missing then there will be no queue, so don't emit stats @@ -304,7 +308,13 @@ terminate({shutdown, missing_owner = Reason}, 
{{reply_to, From}, #q{q = Q} = Sta State1; terminate({shutdown, _} = R, State = #q{backing_queue = BQ}) -> rabbit_core_metrics:queue_deleted(qname(State)), - terminate_shutdown(fun (BQS) -> BQ:terminate(R, BQS) end, State); + terminate_shutdown( + fun (BQS) -> + case BQS of + undefined -> undefined; + _ -> BQ:terminate(R, BQS) + end + end, State); terminate(normal, State = #q{status = {terminated_by, auto_delete}}) -> %% auto_delete case %% To increase performance we want to avoid a mnesia_sync:sync call @@ -338,8 +348,11 @@ terminate_delete(EmitStats, Reason0, ReplyTo, missing_owner -> normal; Any -> Any end, - Len = BQ:len(BQS), - BQS1 = BQ:delete_and_terminate(Reason, BQS), + {Len, BQS1} = + case BQS of + undefined -> {0, undefined}; + _ -> {BQ:len(BQS), BQ:delete_and_terminate(Reason, BQS)} + end, if EmitStats -> rabbit_event:if_enabled(State, #q.stats_timer, fun() -> emit_stats(State) end); true -> ok @@ -374,14 +387,15 @@ terminate_shutdown(Fun, #q{status = Status} = State) -> fun stop_rate_timer/1, fun stop_expiry_timer/1, fun stop_ttl_timer/1]), - case BQS of - undefined -> State1; - _ -> QName = qname(State), - notify_decorators(shutdown, State), - [emit_consumer_deleted(Ch, CTag, QName, ActingUser) || - {Ch, CTag, _, _, _, _, _, _} <- - rabbit_queue_consumers:all(Consumers)], - State1#q{backing_queue_state = Fun(BQS)} + try + QName = qname(State), + notify_decorators(shutdown, State), + [emit_consumer_deleted(Ch, CTag, QName, ActingUser) || + {Ch, CTag, _, _, _, _, _, _} <- + rabbit_queue_consumers:all(Consumers)], + State1#q{backing_queue_state = Fun(BQS)} + catch _:_ -> + State1 end. code_change(_OldVsn, State, _Extra) ->