|
89 | 89 | make_purge/0, |
90 | 90 | make_purge_nodes/1, |
91 | 91 | make_update_config/1, |
92 | | - make_garbage_collection/0 |
| 92 | + make_garbage_collection/0, |
| 93 | + |
| 94 | + exec_read/3 |
| 95 | + |
93 | 96 | ]). |
94 | 97 |
|
95 | 98 | -ifdef(TEST). |
@@ -2076,30 +2079,27 @@ delivery_effect(ConsumerKey, [{MsgId, ?MSG(Idx, Header)}], |
2076 | 2079 | {CTag, CPid} = consumer_id(ConsumerKey, State), |
2077 | 2080 | {send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]}, |
2078 | 2081 | ?DELIVERY_SEND_MSG_OPTS}; |
2079 | | -delivery_effect(ConsumerKey, Msgs, |
2080 | | - #?STATE{cfg = #cfg{resource = QR}} = State) -> |
| 2082 | +delivery_effect(ConsumerKey, Msgs, #?STATE{} = State) -> |
2081 | 2083 | {CTag, CPid} = consumer_id(ConsumerKey, State), |
2082 | | - {RaftIdxs, Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) -> |
2083 | | - {[I | Acc], N+1} |
2084 | | - end, {[], 0}, Msgs), |
2085 | | - {log, RaftIdxs, |
2086 | | - fun (Commands) |
2087 | | - when length(Commands) < Num -> |
2088 | | - %% the mandatory length/1 guard is a bit :( |
2089 | | - rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b " |
2090 | | - "indexes ~w but only ~b were returned. " |
2091 | | - "This is most likely a stale read request " |
2092 | | - "and can be ignored", |
2093 | | - [rabbit_misc:rs(QR), CTag, Num, RaftIdxs, |
2094 | | - length(Commands)]), |
2095 | | - []; |
2096 | | - (Commands) -> |
2097 | | - DelMsgs = lists:zipwith( |
2098 | | - fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) -> |
2099 | | - {MsgId, {Header, get_msg(Cmd)}} |
2100 | | - end, Commands, Msgs), |
2101 | | - [{send_msg, CPid, {delivery, CTag, DelMsgs}, |
2102 | | - ?DELIVERY_SEND_MSG_OPTS}] |
| 2084 | + {RaftIdxs, _Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) -> |
| 2085 | + {[I | Acc], N+1} |
| 2086 | + end, {[], 0}, Msgs), |
| 2087 | + {log_ext, RaftIdxs, |
| 2088 | + fun (ReadPlan) -> |
| 2089 | + case node(CPid) == node() of |
| 2090 | + true -> |
| 2091 | + [{send_msg, CPid, {delivery, CTag, ReadPlan, Msgs}, |
| 2092 | + ?DELIVERY_SEND_MSG_OPTS}]; |
| 2093 | + false -> |
| 2094 | + %% if we got there we need to read the data on this node |
| 2095 | + %% and send it to the consumer pid as it isn't availble |
| 2096 | + %% locally |
| 2097 | + {DelMsgs, Flru} = exec_read(undefined, ReadPlan, Msgs), |
| 2098 | + %% we need to evict all cached items here |
| 2099 | + _ = ra_flru:evict_all(Flru), |
| 2100 | + [{send_msg, CPid, {delivery, CTag, DelMsgs}, |
| 2101 | + ?DELIVERY_SEND_MSG_OPTS}] |
| 2102 | + end |
2103 | 2103 | end, |
2104 | 2104 | {local, node(CPid)}}. |
2105 | 2105 |
|
@@ -3014,3 +3014,11 @@ incr_msg(Msg0, DelFailed, Anns) -> |
3014 | 3014 | false -> |
3015 | 3015 | Msg2 |
3016 | 3016 | end. |
| 3017 | + |
| 3018 | +exec_read(Flru0, ReadPlan, Msgs) -> |
| 3019 | + {Entries, Flru} = ra_log_read_plan:execute(ReadPlan, Flru0), |
| 3020 | + %% return a list in original order |
| 3021 | + {lists:map(fun ({MsgId, ?MSG(Idx, Header)}) -> |
| 3022 | + Cmd = maps:get(Idx, Entries), |
| 3023 | + {MsgId, {Header, get_msg(Cmd)}} |
| 3024 | + end, Msgs), Flru}. |
0 commit comments