diff --git a/deps/rabbit/src/rabbit_fifo.erl b/deps/rabbit/src/rabbit_fifo.erl index 0b7ce0a8c43a..14824e1193f2 100644 --- a/deps/rabbit/src/rabbit_fifo.erl +++ b/deps/rabbit/src/rabbit_fifo.erl @@ -416,7 +416,8 @@ apply(#{index := Index, {State, {dequeue, empty}, Effects} end end; -apply(#{index := _Idx} = Meta, +apply(#{index := _Idx, + machine_version := Vsn} = Meta, #checkout{spec = Spec, consumer_id = ConsumerId}, State0) when Spec == cancel orelse @@ -425,19 +426,19 @@ apply(#{index := _Idx} = Meta, {ok, ConsumerKey} -> {State1, Effects1} = activate_next_consumer( cancel_consumer(Meta, ConsumerKey, State0, [], - Spec)), + Spec), Vsn), Reply = {ok, consumer_cancel_info(ConsumerKey, State1)}, {State, _, Effects} = checkout(Meta, State0, State1, Effects1), {State, Reply, Effects}; error -> {State0, {error, consumer_not_found}, []} end; -apply(#{index := Idx} = Meta, +apply(#{index := Idx, + machine_version := Vsn} = Meta, #checkout{spec = Spec0, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, State0) -> - %% might be better to check machine_version - IsV4 = tuple_size(Spec0) == 2, + IsV4 = Vsn >= 4, %% normalise spec format Spec = case Spec0 of {_, _} -> @@ -461,7 +462,7 @@ apply(#{index := Idx} = Meta, end, {Consumer, State1} = update_consumer(Meta, ConsumerKey, ConsumerId, ConsumerMeta, Spec, Priority, State0), - {State2, Effs} = activate_next_consumer(State1, []), + {State2, Effs} = activate_next_consumer(State1, [], Vsn), #consumer{checked_out = Checked, credit = Credit, delivery_count = DeliveryCount, @@ -472,7 +473,7 @@ apply(#{index := Idx} = Meta, credit => Credit, key => ConsumerKey, delivery_count => DeliveryCount, - is_active => is_active(ConsumerKey, State2), + is_active => is_active(ConsumerKey, State2, Vsn), num_checked_out => map_size(Checked)}}, checkout(Meta, State0, State2, [{monitor, process, Pid} | Effs], Reply); apply(#{index := Index}, #purge{}, @@ -553,7 +554,7 @@ apply(#{machine_version := Vsn, %% select a new consumer from the waiting queue and run a checkout State2 = State1#?STATE{waiting_consumers = WaitingConsumers}, - {State, Effects1} = activate_next_consumer(State2, Effects0), + {State, Effects1} = activate_next_consumer(State2, Effects0, Vsn), %% mark any enquers as suspected Enqs = maps:map(fun(P, E) when node(P) =:= Node -> @@ -602,8 +603,9 @@ apply(#{machine_version := Vsn, Effects = [{monitor, node, Node} | Effects1], checkout(Meta, State0, State#?STATE{enqueuers = Enqs, last_active = Ts}, Effects); -apply(Meta, {down, Pid, _Info}, State0) -> - {State1, Effects1} = activate_next_consumer(handle_down(Meta, Pid, State0)), +apply(#{machine_version := Vsn} = Meta, {down, Pid, _Info}, State0) -> + {State1, Effects1} = activate_next_consumer(handle_down(Meta, Pid, State0), + Vsn), checkout(Meta, State0, State1, Effects1); apply(#{machine_version := Vsn} = Meta, {nodeup, Node}, @@ -639,7 +641,7 @@ apply(#{machine_version := Vsn} = Meta, Waiting = update_waiting_consumer_status(Node, State1, up), State2 = State1#?STATE{enqueuers = Enqs1, waiting_consumers = Waiting}, - {State, Effects} = activate_next_consumer(State2, Effects1), + {State, Effects} = activate_next_consumer(State2, Effects1, Vsn), checkout(Meta, State0, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; @@ -872,7 +874,7 @@ overview(#?STATE{consumers = Cons, msg_ttl => Cfg#cfg.msg_ttl, delivery_limit => Cfg#cfg.delivery_limit }, - SacOverview = case active_consumer(Cons) of + SacOverview = case active_consumer(Cons, version()) of {SacConsumerKey, SacCon} -> SacConsumerId = consumer_id(SacCon), NumWaiting = length(WaitingConsumers), @@ -1305,7 +1307,7 @@ query_consumers(#?STATE{consumers = Consumers, query_single_active_consumer(#?STATE{cfg = #cfg{consumer_strategy = single_active}, consumers = Consumers}) -> - case active_consumer(Consumers) of + case active_consumer(Consumers, version()) of undefined -> {error, no_value}; {_CKey, ?CONSUMER_TAG_PID(Tag, Pid)} -> @@ -1476,15 +1478,15 @@ cancel_consumer0(Meta, ConsumerKey, {S0, Effects0} end. -activate_next_consumer({State, Effects}) -> - activate_next_consumer(State, Effects). +activate_next_consumer({State, Effects}, MacVsn) -> + activate_next_consumer(State, Effects, MacVsn). activate_next_consumer(#?STATE{cfg = #cfg{consumer_strategy = competing}} = State, - Effects) -> + Effects, _MacVsn) -> {State, Effects}; activate_next_consumer(#?STATE{consumers = Cons0, waiting_consumers = Waiting0} = State0, - Effects0) -> + Effects0, MacVsn) -> %% invariant, the waiting list always need to be sorted by consumers that are %% up - then by priority NextConsumer = @@ -1495,7 +1497,7 @@ activate_next_consumer(#?STATE{consumers = Cons0, undefined end, - case {active_consumer(Cons0), NextConsumer} of + case {active_consumer(Cons0, MacVsn), NextConsumer} of {undefined, {NextCKey, #consumer{cfg = NextCCfg} = NextC}} -> Remaining = tl(Waiting0), %% TODO: can this happen? @@ -1564,17 +1566,19 @@ active_consumer({CKey, #consumer{status = Status} = Consumer, _I}) active_consumer({_CKey, #consumer{status = _}, I}) -> active_consumer(maps:next(I)); active_consumer(none) -> - undefined; -active_consumer(M) when is_map(M) -> - I = maps:iterator(M), + undefined. + +active_consumer(M, MacVsn) when is_map(M) -> + I = rabbit_fifo_maps:iterator(M, MacVsn), active_consumer(maps:next(I)). -is_active(_ConsumerKey, #?STATE{cfg = #cfg{consumer_strategy = competing}}) -> +is_active(_ConsumerKey, + #?STATE{cfg = #cfg{consumer_strategy = competing}}, _MacVsn) -> %% all competing consumers are potentially active true; is_active(ConsumerKey, #?STATE{cfg = #cfg{consumer_strategy = single_active}, - consumers = Consumers}) -> - ConsumerKey == active_consumer(Consumers). + consumers = Consumers}, MacVsn) -> + ConsumerKey == active_consumer(Consumers, MacVsn). maybe_return_all(#{system_time := Ts} = Meta, ConsumerKey, #consumer{cfg = CCfg} = Consumer, S0, @@ -1835,26 +1839,26 @@ increase_credit(#consumer{cfg = #consumer_cfg{credit_mode = increase_credit(#consumer{credit = Current}, Credit) -> Current + Credit. -complete_and_checkout(#{} = Meta, MsgIds, ConsumerKey, +complete_and_checkout(#{machine_version := Vsn} = Meta, MsgIds, ConsumerKey, #consumer{} = Con0, Effects0, State0) -> State1 = complete(Meta, ConsumerKey, MsgIds, Con0, State0), %% a completion could have removed the active/quiescing consumer - Effects1 = add_active_effect(Con0, State1, Effects0), - {State2, Effects2} = activate_next_consumer(State1, Effects1), + Effects1 = add_active_effect(Con0, State1, Effects0, Vsn), + {State2, Effects2} = activate_next_consumer(State1, Effects1, Vsn), checkout(Meta, State0, State2, Effects2). add_active_effect(#consumer{status = quiescing} = Consumer, #?STATE{cfg = #cfg{consumer_strategy = single_active}, consumers = Consumers} = State, - Effects) -> - case active_consumer(Consumers) of + Effects, MacVsn) -> + case active_consumer(Consumers, MacVsn) of undefined -> consumer_update_active_effects(State, Consumer, false, waiting, Effects); _ -> Effects end; -add_active_effect(_, _, Effects) -> +add_active_effect(_, _, Effects, _) -> Effects. cancel_consumer_effects(ConsumerId, @@ -2340,8 +2344,8 @@ update_consumer(Meta, ConsumerKey, {Tag, Pid}, ConsumerMeta, delivery_count = DeliveryCount} end, {Consumer, update_or_remove_con(Meta, ConsumerKey, Consumer, State0)}; -update_consumer(Meta, ConsumerKey, {Tag, Pid}, ConsumerMeta, - {Life, Mode} = Spec, Priority, +update_consumer(#{machine_version := Vsn} = Meta, ConsumerKey, {Tag, Pid}, + ConsumerMeta, {Life, Mode} = Spec, Priority, #?STATE{cfg = #cfg{consumer_strategy = single_active}, consumers = Cons0, waiting_consumers = Waiting0, @@ -2349,7 +2353,7 @@ update_consumer(Meta, ConsumerKey, {Tag, Pid}, ConsumerMeta, %% if it is the current active consumer, just update %% if it is a cancelled active consumer, add to waiting unless it is the only %% one, then merge - case active_consumer(Cons0) of + case active_consumer(Cons0, Vsn) of {ConsumerKey, #consumer{status = up} = Consumer0} -> Consumer = merge_consumer(Meta, Consumer0, ConsumerMeta, Spec, Priority), diff --git a/deps/rabbit/src/rabbit_fifo_maps.erl b/deps/rabbit/src/rabbit_fifo_maps.erl index ccaac64c71c2..35ccf19c86c7 100644 --- a/deps/rabbit/src/rabbit_fifo_maps.erl +++ b/deps/rabbit/src/rabbit_fifo_maps.erl @@ -8,7 +8,8 @@ -module(rabbit_fifo_maps). -export([keys/2, - fold/4]). + fold/4, + iterator/2]). -spec keys(Map, ra_machine:version()) -> Keys when Map :: #{Key => _}, @@ -29,13 +30,18 @@ keys(Map, Vsn) -> AccIn :: Init | AccOut, Map :: #{Key => Value}. fold(Fun, Init, Map, Vsn) -> - Iterable = case is_deterministic(Vsn) of - true -> - maps:iterator(Map, ordered); - false -> - Map - end, - maps:fold(Fun, Init, Iterable). + maps:fold(Fun, Init, iterator(Map, Vsn)). + +-spec iterator(Map, ra_machine:version()) -> Iterator when + Map :: #{Key => Value}, + Iterator :: maps:iterator(Key, Value). +iterator(Map, Vsn) -> + case is_deterministic(Vsn) of + true -> + maps:iterator(Map, ordered); + false -> + maps:iterator(Map) + end. is_deterministic(Vsn) when is_integer(Vsn) -> Vsn > 5.