Skip to content

Commit 3aa01e5

Browse files
committed
wip
1 parent c78bc8a commit 3aa01e5

File tree

3 files changed

+70
-24
lines changed

3 files changed

+70
-24
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 58 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,10 @@
8989
make_purge/0,
9090
make_purge_nodes/1,
9191
make_update_config/1,
92-
make_garbage_collection/0
92+
make_garbage_collection/0,
93+
94+
exec_read/3
95+
9396
]).
9497

9598
-ifdef(TEST).
@@ -2058,31 +2061,57 @@ delivery_effect(ConsumerKey, [{MsgId, ?MSG(Idx, Header)}],
20582061
{send_msg, CPid, {delivery, CTag, [{MsgId, {Header, RawMsg}}]},
20592062
?DELIVERY_SEND_MSG_OPTS};
20602063
delivery_effect(ConsumerKey, Msgs,
2061-
#?STATE{cfg = #cfg{resource = QR}} = State) ->
2064+
#?STATE{cfg = #cfg{resource = _QR}} = State) ->
20622065
{CTag, CPid} = consumer_id(ConsumerKey, State),
2063-
{RaftIdxs, Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) ->
2066+
{RaftIdxs, _Num} = lists:foldr(fun ({_, ?MSG(I, _)}, {Acc, N}) ->
20642067
{[I | Acc], N+1}
20652068
end, {[], 0}, Msgs),
2066-
{log, RaftIdxs,
2067-
fun (Commands)
2068-
when length(Commands) < Num ->
2069-
%% the mandatory length/1 guard is a bit :(
2070-
rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b "
2071-
"indexes ~w but only ~b were returned. "
2072-
"This is most likely a stale read request "
2073-
"and can be ignored",
2074-
[rabbit_misc:rs(QR), CTag, Num, RaftIdxs,
2075-
length(Commands)]),
2076-
[];
2077-
(Commands) ->
2078-
DelMsgs = lists:zipwith(
2079-
fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
2080-
{MsgId, {Header, get_msg(Cmd)}}
2081-
end, Commands, Msgs),
2082-
[{send_msg, CPid, {delivery, CTag, DelMsgs},
2069+
{log_ext, RaftIdxs,
2070+
fun
2071+
% (Commands)
2072+
% when length(Commands) < Num ->
2073+
% %% the mandatory length/1 guard is a bit :(
2074+
% rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b "
2075+
% "indexes ~w but only ~b were returned. "
2076+
% "This is most likely a stale read request "
2077+
% "and can be ignored",
2078+
% [rabbit_misc:rs(QR), CTag, Num, RaftIdxs,
2079+
% length(Commands)]),
2080+
% [];
2081+
(ReadPlan) ->
2082+
% Fun = fun (Flru0) ->
2083+
% {Entries, Flru} = ra_log:execute_read(ReadPlan, Flru0),
2084+
% %% pretend entries is a map
2085+
% {lists:map(fun ({MsgId, ?MSG(Idx, Header)}) ->
2086+
% {_, _, Cmd} = maps:get(Idx, Entries),
2087+
% %% hacky
2088+
% {MsgId, {Header, get_msg(element(3, Cmd))}}
2089+
% end, Msgs), Flru}
2090+
% end,
2091+
[{send_msg, CPid, {delivery, CTag, ReadPlan, Msgs},
20832092
?DELIVERY_SEND_MSG_OPTS}]
20842093
end,
20852094
{local, node(CPid)}}.
2095+
% {log, RaftIdxs,
2096+
% fun (Commands)
2097+
% when length(Commands) < Num ->
2098+
% %% the mandatory length/1 guard is a bit :(
2099+
% rabbit_log:info("~ts: requested read consumer tag '~ts' of ~b "
2100+
% "indexes ~w but only ~b were returned. "
2101+
% "This is most likely a stale read request "
2102+
% "and can be ignored",
2103+
% [rabbit_misc:rs(QR), CTag, Num, RaftIdxs,
2104+
% length(Commands)]),
2105+
% [];
2106+
% (Commands) ->
2107+
% DelMsgs = lists:zipwith(
2108+
% fun (Cmd, {MsgId, ?MSG(_Idx, Header)}) ->
2109+
% {MsgId, {Header, get_msg(Cmd)}}
2110+
% end, Commands, Msgs),
2111+
% [{send_msg, CPid, {delivery, CTag, DelMsgs},
2112+
% ?DELIVERY_SEND_MSG_OPTS}]
2113+
% end,
2114+
% {local, node(CPid)}}.
20862115

20872116
reply_log_effect(RaftIdx, MsgId, Header, Ready, From) ->
20882117
{log, [RaftIdx],
@@ -2995,3 +3024,12 @@ incr_msg(Msg0, DelFailed, Anns) ->
29953024
false ->
29963025
Msg2
29973026
end.
3027+
3028+
exec_read(Flru0, ReadPlan, Msgs) ->
3029+
{Entries, Flru} = ra_log:execute_read(ReadPlan, Flru0),
3030+
%% pretend entries is a map
3031+
{lists:map(fun ({MsgId, ?MSG(Idx, Header)}) ->
3032+
{_, _, Cmd} = maps:get(Idx, Entries),
3033+
%% hacky
3034+
{MsgId, {Header, get_msg(element(3, Cmd))}}
3035+
end, Msgs), Flru}.

deps/rabbit/src/rabbit_fifo_client.erl

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@
6969
pending = #{} :: #{seq() =>
7070
{term(), rabbit_fifo:command()}},
7171
consumers = #{} :: #{rabbit_types:ctag() => #consumer{}},
72-
timer_state :: term()
72+
timer_state :: term(),
73+
cached_segments :: undefined | ra_flru:state()
7374
}).
7475

7576
-opaque state() :: #state{}.
@@ -633,7 +634,8 @@ handle_ra_event(QName, From, {applied, Seqs},
633634
_ ->
634635
{ok, State1, Actions}
635636
end;
636-
handle_ra_event(QName, From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
637+
handle_ra_event(QName, From, {machine, Del}, State0)
638+
when element(1, Del) == delivery ->
637639
handle_delivery(QName, From, Del, State0);
638640
handle_ra_event(_QName, _From, {machine, Action}, State)
639641
when element(1, Action) =:= credit_reply orelse
@@ -835,7 +837,12 @@ handle_delivery(_QName, _Leader, {delivery, Tag, [_ | _] = IdMsgs},
835837
%% we should return all messages.
836838
MsgIntIds = [Id || {Id, _} <- IdMsgs],
837839
{State1, Deliveries} = return(Tag, MsgIntIds, State0),
838-
{ok, State1, Deliveries}.
840+
{ok, State1, Deliveries};
841+
handle_delivery(QName, Leader, {delivery, Tag, ReadState, Msgs},
842+
#state{cached_segments = Cached0} = State) ->
843+
{MsgIds, Cached} = rabbit_fifo:exec_read(Cached0, ReadState, Msgs),
844+
handle_delivery(QName, Leader, {delivery, Tag, MsgIds},
845+
State#state{cached_segments = Cached}).
839846

840847
transform_msgs(QName, QRef, Msgs) ->
841848
lists:map(

rabbitmq-components.mk

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,8 @@ dep_khepri = hex 0.16.0
5050
dep_khepri_mnesia_migration = hex 0.7.0
5151
dep_osiris = git https://github.com/rabbitmq/osiris v1.8.4
5252
dep_prometheus = hex 4.11.0
53-
dep_ra = hex 2.14.0
53+
dep_ra = git https://github.com/rabbitmq/ra partial-read-api
54+
# dep_ra = hex 2.14.0
5455
dep_ranch = hex 2.1.0
5556
dep_recon = hex 2.5.6
5657
dep_redbug = hex 2.0.7

0 commit comments

Comments
 (0)