
Commit 04801bd

Force flushing segments when recovering a message after a non-clean shutdown.

If the broker shuts down before the queue process calls "rabbit_queue_index:flush", the queue's journal file may contain only the ?PUB and the ack of a message; on the next restart, the broker then crashes in recover_journal. To reproduce the issue:

1. Disable flush in rabbit_queue_index, just to reproduce the problem more easily.

2. Declare a durable queue and an exchange.

[root@controller2 erlang]# date;python msg_test.py create_queue 192.168.1.66 'queue=q11;durable=true'
Sat Sep 7 15:53:20 CST 2019
connent to 192.168.1.66 ok
2019-09-07 15:53:20.264 create_queue q11
2019-09-07 15:53:20.293 to close connection

[root@controller2 erlang]# date;python msg_test.py create_exchange 192.168.1.66 'exchange=ex11;exchange_type=topic'
Sat Sep 7 15:53:20 CST 2019
connent to 192.168.1.66 ok
2019-09-07 15:53:20.461 create_exchange ex11 topic
2019-09-07 15:53:20.463 to close connection

[root@controller2 erlang]# date;python msg_test.py bind 192.168.1.66 'exchange=ex11;queue=q11;routing_key=exq11'
Sat Sep 7 15:53:21 CST 2019
connent to 192.168.1.66 ok
2019-09-07 15:53:21.263 bind ex11 q11 r_key:exq11
2019-09-07 15:53:21.266 to close connection

3. Publish a persistent message.

[root@controller2 erlang]# date;python msg_test.py publish 192.168.1.66 'exchange=ex11;routing_key=exq11;msg="_unique_id:123";persistent=true;count=1'
Sat Sep 7 15:53:34 CST 2019
connent to 192.168.1.66 ok
2019-09-07 15:53:34.642 publish ex11 r_key:exq11, 0
2019-09-07 15:53:35.643 to close connection

4. Kill the broker. The system auto-restarts the broker, which calls recover_message with CleanShutdown = false.

[root@controller2 erlang]# date;pkill -9 beam
Sat Sep 7 15:54:15 CST 2019

5. Create a consumer and attach it to the queue. It receives and acks the last recovered message.

[root@controller2 erlang]# date;python msg_test.py consume 192.168.1.66 'queue=q11;auto_ack=false'
Sat Sep 7 15:55:47 CST 2019
connent to 192.168.1.66 ok
2019-09-07 15:55:47.529 [*] Waiting for messages from q11. To exit press CTRL+C
2019-09-07 15:55:47.529 [x] Received '_unique_id:0 aaaaaaa' ack:True delivery_tag:1 sleep:0
^C2019-09-07 15:56:12.692 BaseException
2019-09-07 15:56:12.693 to close connection

6. Now the queue's journal file contains only the ?PUB and the ack of the message. Restart the broker; a crash occurs when recover_journal is called.

[root@controller2 erlang]# date;systemctl restart rabbitmq-server
Sat Sep 7 15:56:24 CST 2019

7. The crash information in /var/log/rabbitmq/[email protected]:

** Reason for termination ==
** {{case_clause,{{true,<<10,5,11,69,10,165,64,237,79,106,53,20,96,22,129,137,0,0,0,0,0,0,0,0,0,0,0,113>>,<<131,104,6,100,0,13,98,97,115,105,99,95,109,101,115,115,97,103,101,104,4,100,0,8,114,101,115,111,117,114,99,101,109,0,0,0,1,47,100,0,8,101,120,99,104,97,110,103,101,109,0,0,0,4,101,120,49,49,108,0,0,0,1,109,0,0,0,5,101,120,113,49,49,106,104,7,100,0,7,99,111,110,116,101,110,116,97,60,100,0,4,110,111,110,101,109,0,0,0,3,16,0,2,100,0,25,114,97,98,98,105,116,95,102,114,97,109,105,110,103,95,97,109,113,112,95,48,95,57,95,49,108,0,0,0,1,109,0,0,0,113,95,117,110,105,113,117,101,95,105,100,58,48,32,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,97,106,104,4,100,0,14,99,111,110,116,101,110,116,95,116,114,97,99,101,100,100,0,5,102,97,108,115,101,104,3,100,0,9,117,110,100,101,102,105,110,101,100,100,0,9,117,110,100,101,102,105,110,101,100,100,0,9,117,110,100,101,102,105,110,101,100,110,6,0,178,202,182,10,109,1,109,0,0,0,16,10,5,11,69,10,165,64,237,79,106,53,20,96,22,129,137,100,0,4,116,114,117,101>>},no_del,no_ack}},
   [{rabbit_queue_index,action_to_entry,3,[{file,"src/rabbit_queue_index.erl"},{line,838}]},
    {rabbit_queue_index,add_to_journal,3,[{file,"src/rabbit_queue_index.erl"},{line,814}]},
    {rabbit_queue_index,add_to_journal,3,[{file,"src/rabbit_queue_index.erl"},{line,805}]},
    {rabbit_queue_index,parse_journal_entries,2,[{file,"src/rabbit_queue_index.erl"},{line,958}]},
    {rabbit_queue_index,recover_journal,1,[{file,"src/rabbit_queue_index.erl"},{line,932}]},
    {rabbit_queue_index,init_dirty,3,[{file,"src/rabbit_queue_index.erl"},{line,622}]},
    {rabbit_variable_queue,init,6,[{file,"src/rabbit_variable_queue.erl"},{line,570}]},
    {rabbit_priority_queue,init,3,[{file,"src/rabbit_priority_queue.erl"},{line,151}]}]}
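For context, the case_clause above comes from the journal-replay state machine in action_to_entry/3 (src/rabbit_queue_index.erl, line 838 in the trace). The following is a minimal sketch of its transition table, paraphrased from that module; the real function also threads the journal array through and distinguishes keep/set/reset results, so treat this as illustrative rather than the shipped code:

-module(qi_journal_sketch).
-export([action_to_entry_sketch/2]).

%% ?PUB is the module's pattern for a publish record {IsPersistent, Bin, MsgBin}.
-define(PUB, {_, _, _}).

%% A journal entry for one relative sequence number is a {Pub, Del, Ack} triple.
action_to_entry_sketch(Action, Entry) ->
    case Entry of
        undefined when Action == del -> {no_pub,    del, no_ack};
        undefined when Action == ack -> {no_pub, no_del,    ack};
        undefined                    -> {Action, no_del, no_ack}; %% the pub itself
        {Pub,    no_del, no_ack} when Action == del -> {Pub,    del, no_ack};
        {no_pub,    del, no_ack} when Action == ack -> {no_pub,    del,    ack};
        {?PUB,      del, no_ack} when Action == ack -> complete  %% pub+del+ack cancel out
        %% No clause handles {Pub, no_del, no_ack} with Action == ack: an ack
        %% directly following a ?PUB with no del in between. That is exactly
        %% the {{true,...},no_del,no_ack} term in the case_clause above.
    end.

On the first (dirty) restart, recovery adds the missing del only to the in-memory journal; because the dirty count stays below max_journal_entries, it is never flushed. The on-disk journal therefore ends up holding the ?PUB followed directly by the consumer's ack, and the second restart replays exactly the unhandled combination.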
1 parent dc2552b commit 04801bd


src/rabbit_queue_index.erl

Lines changed: 11 additions & 9 deletions
@@ -640,7 +640,8 @@ init_dirty(CleanShutdown, ContainsCheckFun, State) ->
                   {{Segment = #segment { unacked = UnackedCount }, Dirty},
                    UnackedBytes} =
                       recover_segment(ContainsCheckFun, CleanShutdown,
-                                      segment_find_or_new(Seg, Dir, Segments2)),
+                                      segment_find_or_new(Seg, Dir, Segments2),
+                                      State1#qistate.max_journal_entries),
                   {segment_store(Segment, Segments2),
                    CountAcc + UnackedCount,
                    BytesAcc + UnackedBytes, DirtyCount + Dirty}
@@ -664,7 +665,7 @@ terminate(State = #qistate { journal_handle = JournalHdl,
                       segments = undefined }}.
 
 recover_segment(ContainsCheckFun, CleanShutdown,
-                Segment = #segment { journal_entries = JEntries }) ->
+                Segment = #segment { journal_entries = JEntries }, MaxJournal) ->
     {SegEntries, UnackedCount} = load_segment(false, Segment),
     {SegEntries1, UnackedCountDelta} =
         segment_plus_journal(SegEntries, JEntries),
@@ -673,7 +674,7 @@ recover_segment(ContainsCheckFun, CleanShutdown,
                  {SegmentAndDirtyCount, Bytes}) ->
              {MsgOrId, MsgProps} = parse_pub_record_body(Bin, MsgBin),
              {recover_message(ContainsCheckFun(MsgOrId), CleanShutdown,
-                              Del, RelSeq, SegmentAndDirtyCount),
+                              Del, RelSeq, SegmentAndDirtyCount, MaxJournal),
               Bytes + case IsPersistent of
                           true  -> MsgProps#message_properties.size;
                           false -> 0
@@ -682,15 +683,16 @@ recover_segment(ContainsCheckFun, CleanShutdown,
       {{Segment #segment { unacked = UnackedCount + UnackedCountDelta }, 0}, 0},
       SegEntries1).
 
-recover_message( true,  true, _Del, _RelSeq, SegmentAndDirtyCount) ->
+recover_message( true,  true, _Del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
     SegmentAndDirtyCount;
-recover_message( true, false,  del, _RelSeq, SegmentAndDirtyCount) ->
+recover_message( true, false,  del, _RelSeq, SegmentAndDirtyCount, _MaxJournal) ->
     SegmentAndDirtyCount;
-recover_message( true, false, no_del,  RelSeq, {Segment, DirtyCount}) ->
-    {add_to_journal(RelSeq, del, Segment), DirtyCount + 1};
-recover_message(false,     _,    del,  RelSeq, {Segment, DirtyCount}) ->
+recover_message( true, false, no_del,  RelSeq, {Segment, _DirtyCount}, MaxJournal) ->
+    %% force to flush the segment
+    {add_to_journal(RelSeq, del, Segment), MaxJournal + 1};
+recover_message(false,     _,    del,  RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
     {add_to_journal(RelSeq, ack, Segment), DirtyCount + 1};
-recover_message(false,     _, no_del,  RelSeq, {Segment, DirtyCount}) ->
+recover_message(false,     _, no_del,  RelSeq, {Segment, DirtyCount}, _MaxJournal) ->
     {add_to_journal(RelSeq, ack,
                     add_to_journal(RelSeq, del, Segment)),
      DirtyCount + 2}.
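Why returning MaxJournal + 1 forces the flush: init_dirty sums the per-message dirty counts produced by recover_segment into the state's dirty_count, and the journal is only written back into the segment files once that count exceeds max_journal_entries. A sketch of the relevant guard, paraphrased from rabbit_queue_index (record fields as in #qistate; the shipped clause layout may differ slightly):

%% Flush the journal into the segment files only once enough dirty
%% (unflushed) entries have accumulated.
maybe_flush_journal(State = #qistate { dirty_count         = DCount,
                                       max_journal_entries = MaxJournal })
  when DCount > MaxJournal ->
    flush_journal(State);
maybe_flush_journal(State) ->
    State.

With the fix, the del that recovery adds for a surviving message always pushes dirty_count past MaxJournal, so the del reaches the segment file on disk before any consumer ack can be journalled, and a later replay of that ack finds the matching del.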
