@@ -2883,25 +2883,30 @@ convert(Meta, 7, To, State) ->
28832883 convert (Meta , 8 , To , convert_v7_to_v8 (Meta , State )).
28842884
28852885smallest_raft_index (#? STATE {messages = Messages ,
2886- dlx = #? DLX {discards = Discards }} = State ) ->
2887- SmallestDlxRaIdx = lqueue :fold (fun (? TUPLE (_ , Msg ), Acc ) ->
2888- min (get_msg_idx (Msg ), Acc )
2889- end , undefined , Discards ),
2890- SmallestMsgsRaIdx = rabbit_fifo_pq :get_lowest_index (Messages ),
2891- % % scan consumers and returns queue here instead
2892- smallest_checked_out (State , min (SmallestDlxRaIdx , SmallestMsgsRaIdx )).
2893-
2894- smallest_checked_out (#? STATE {returns = Returns ,
2895- consumers = Consumers }, Min ) ->
2896- SmallestSoFar = lqueue :fold (fun (Msg , Acc ) ->
2897- min (get_msg_idx (Msg ), Acc )
2898- end , Min , Returns ),
2899- maps :fold (fun (_Cid , # consumer {checked_out = Ch }, Acc0 ) ->
2900- maps :fold (
2901- fun (_MsgId , Msg , Acc ) ->
2902- min (get_msg_idx (Msg ), Acc )
2903- end , Acc0 , Ch )
2904- end , SmallestSoFar , Consumers ).
2886+ returns = Returns ,
2887+ consumers = Consumers ,
2888+ dlx = #? DLX {consumer = DlxConsumer ,
2889+ discards = Discards }}) ->
2890+ Min0 = rabbit_fifo_pq :get_lowest_index (Messages ),
2891+ Min1 = lqueue :fold (fun (Msg , Acc ) ->
2892+ min (get_msg_idx (Msg ), Acc )
2893+ end , Min0 , Returns ),
2894+ Min2 = maps :fold (fun (_Cid , # consumer {checked_out = Ch }, Acc0 ) ->
2895+ maps :fold (fun (_MsgId , Msg , Acc ) ->
2896+ min (get_msg_idx (Msg ), Acc )
2897+ end , Acc0 , Ch )
2898+ end , Min1 , Consumers ),
2899+ Min = lqueue :fold (fun (? TUPLE (_Reason , Msg ), Acc ) ->
2900+ min (get_msg_idx (Msg ), Acc )
2901+ end , Min2 , Discards ),
2902+ case DlxConsumer of
2903+ undefined ->
2904+ Min ;
2905+ # dlx_consumer {checked_out = Checked } ->
2906+ maps :fold (fun (_MsgId , ? TUPLE (_Reason , Msg ), Acc ) ->
2907+ min (get_msg_idx (Msg ), Acc )
2908+ end , Min , Checked )
2909+ end .
29052910
29062911make_requeue (ConsumerKey , Notify , [{MsgId , Idx , Header , Msg }], Acc ) ->
29072912 lists :reverse ([{append ,
0 commit comments