|
12 | 12 | publish/5, publish_delivered/4, |
13 | 13 | discard/3, drain_confirmed/1, |
14 | 14 | dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, |
15 | | - ackfold/4, fold/3, len/1, is_empty/1, depth/1, |
| 15 | + ackfold/4, len/1, is_empty/1, depth/1, |
16 | 16 | update_rates/1, needs_timeout/1, timeout/1, |
17 | 17 | handle_pre_hibernate/1, resume/1, msg_rates/1, |
18 | 18 | info/2, invoke/3, is_duplicate/2, set_queue_mode/2, |
@@ -649,14 +649,6 @@ ackfold(MsgFun, Acc, State, AckTags) -> |
649 | 649 | end, {Acc, State}, AckTags), |
650 | 650 | {AccN, a(StateN)}. |
651 | 651 |
|
652 | | -%% @todo I think this is never used. Was CMQ. |
653 | | -fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> |
654 | | - {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, |
655 | | - [msg_iterator(State), |
656 | | - disk_ack_iterator(State), |
657 | | - ram_ack_iterator(State)]), |
658 | | - ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). |
659 | | - |
660 | 652 | len(#vqstate { len = Len }) -> Len. |
661 | 653 |
|
662 | 654 | is_empty(State) -> 0 == len(State). |
@@ -1985,89 +1977,6 @@ msg_from_pending_ack(SeqId, State) -> |
1985 | 1977 | delta_limit(?BLANK_DELTA_PATTERN(_)) -> undefined; |
1986 | 1978 | delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. |
1987 | 1979 |
|
1988 | | -%%---------------------------------------------------------------------------- |
1989 | | -%% Iterator |
1990 | | -%%---------------------------------------------------------------------------- |
1991 | | - |
1992 | | -ram_ack_iterator(State) -> |
1993 | | - {ack, maps:iterator(State#vqstate.ram_pending_ack)}. |
1994 | | - |
1995 | | -disk_ack_iterator(State) -> |
1996 | | - {ack, maps:iterator(State#vqstate.disk_pending_ack)}. |
1997 | | - |
1998 | | -msg_iterator(State) -> istate(start, State). |
1999 | | - |
2000 | | -istate(start, State) -> {q3, State#vqstate.q3, State}; |
2001 | | -istate(q3, State) -> {delta, State#vqstate.delta, State}; |
2002 | | -istate(delta, _State) -> done. |
2003 | | - |
2004 | | -next({ack, It}, IndexState) -> |
2005 | | - case maps:next(It) of |
2006 | | - none -> {empty, IndexState}; |
2007 | | - {_SeqId, MsgStatus, It1} -> Next = {ack, It1}, |
2008 | | - {value, MsgStatus, true, Next, IndexState} |
2009 | | - end; |
2010 | | -next(done, IndexState) -> {empty, IndexState}; |
2011 | | -next({delta, #delta{start_seq_id = SeqId, |
2012 | | - end_seq_id = SeqId}, State}, IndexState) -> |
2013 | | - next(istate(delta, State), IndexState); |
2014 | | -next({delta, #delta{start_seq_id = SeqId, |
2015 | | - end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> |
2016 | | - SeqIdB = rabbit_classic_queue_index_v2:next_segment_boundary(SeqId), |
2017 | | - %% It may make sense to limit this based on rate. But this |
2018 | | - %% is not called outside of CMQs so I will leave it alone |
2019 | | - %% for the time being. |
2020 | | - SeqId1 = lists:min([SeqIdB, |
2021 | | - %% We must limit the number of messages read at once |
2022 | | - %% otherwise the queue will attempt to read up to segment_entry_count() |
2023 | | - %% messages from the index each time. The value |
2024 | | - %% chosen here is arbitrary. |
2025 | | - SeqId + 2048, |
2026 | | - SeqIdEnd]), |
2027 | | - {List, IndexState1} = rabbit_classic_queue_index_v2:read(SeqId, SeqId1, IndexState), |
2028 | | - next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); |
2029 | | -next({delta, Delta, [], State}, IndexState) -> |
2030 | | - next({delta, Delta, State}, IndexState); |
2031 | | -next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> |
2032 | | - case is_msg_in_pending_acks(SeqId, State) of |
2033 | | - false -> Next = {delta, Delta, Rest, State}, |
2034 | | - {value, beta_msg_status(M), false, Next, IndexState}; |
2035 | | - true -> next({delta, Delta, Rest, State}, IndexState) |
2036 | | - end; |
2037 | | -next({Key, Q, State}, IndexState) -> |
2038 | | - case ?QUEUE:out(Q) of |
2039 | | - {empty, _Q} -> next(istate(Key, State), IndexState); |
2040 | | - {{value, MsgStatus}, QN} -> Next = {Key, QN, State}, |
2041 | | - {value, MsgStatus, false, Next, IndexState} |
2042 | | - end. |
2043 | | - |
2044 | | -inext(It, {Its, IndexState}) -> |
2045 | | - case next(It, IndexState) of |
2046 | | - {empty, IndexState1} -> |
2047 | | - {Its, IndexState1}; |
2048 | | - {value, MsgStatus1, Unacked, It1, IndexState1} -> |
2049 | | - {[{MsgStatus1, Unacked, It1} | Its], IndexState1} |
2050 | | - end. |
2051 | | - |
2052 | | -ifold(_Fun, Acc, [], State0) -> |
2053 | | - {Acc, State0}; |
2054 | | -ifold(Fun, Acc, Its0, State0) -> |
2055 | | - [{MsgStatus, Unacked, It} | Rest] = |
2056 | | - lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _}, |
2057 | | - {#msg_status{seq_id = SeqId2}, _, _}) -> |
2058 | | - SeqId1 =< SeqId2 |
2059 | | - end, Its0), |
2060 | | - {Msg, State1} = read_msg(MsgStatus, State0), |
2061 | | - case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of |
2062 | | - {stop, Acc1} -> |
2063 | | - {Acc1, State1}; |
2064 | | - {cont, Acc1} -> |
2065 | | - IndexState0 = State1#vqstate.index_state, |
2066 | | - {Its1, IndexState1} = inext(It, {Rest, IndexState0}), |
2067 | | - State2 = State1#vqstate{index_state = IndexState1}, |
2068 | | - ifold(Fun, Acc1, Its1, State2) |
2069 | | - end. |
2070 | | - |
2071 | 1980 | %%---------------------------------------------------------------------------- |
2072 | 1981 | %% Phase changes |
2073 | 1982 | %%---------------------------------------------------------------------------- |
|
0 commit comments