Skip to content

Commit ed793f2

Browse files
committed
Preserve order for missing deliveries (get_checked_out)
1 parent e329881 commit ed793f2

File tree

2 files changed

+46
-2
lines changed

2 files changed

+46
-2
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1091,7 +1091,7 @@ handle_aux(_, _, {get_checked_out, ConsumerKey, MsgIds}, Aux0, RaAux0) ->
10911091
{S, Acc}
10921092
end
10931093
end, {RaAux0, []}, maps:with(MsgIds, Checked)),
1094-
{reply, {ok, IdMsgs}, Aux0, RaState};
1094+
{reply, {ok, lists:reverse(IdMsgs)}, Aux0, RaState};
10951095
_ ->
10961096
{reply, {error, consumer_not_found}, Aux0, RaAux0}
10971097
end;

deps/rabbit/test/rabbit_fifo_int_SUITE.erl

Lines changed: 45 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,9 @@ all_tests() ->
4545
flow,
4646
test_queries,
4747
duplicate_delivery,
48-
usage
48+
usage,
49+
50+
missing_messages_order
4951
].
5052

5153
groups() ->
@@ -296,6 +298,48 @@ usage(Config) ->
296298
?assert(Use > 0.0),
297299
ok.
298300

301+
missing_messages_order(Config) ->
302+
ClusterName = ?config(cluster_name, Config),
303+
ServerId = ?config(node_id, Config),
304+
UId = ?config(uid, Config),
305+
Tag = UId,
306+
ok = start_cluster(ClusterName, [ServerId]),
307+
F1 = rabbit_fifo_client:init([ServerId]),
308+
{empty, F3} = rabbit_fifo_client:dequeue(ClusterName, Tag, settled, F1),
309+
310+
{ok, F4, []} = rabbit_fifo_client:enqueue(ClusterName, msg1, F3),
311+
{_, _, F5} = process_ra_events(receive_ra_events(1, 0), ClusterName, F4),
312+
313+
{ok, F6, []} = rabbit_fifo_client:enqueue(ClusterName, msg2, F5),
314+
{_, _, F7} = process_ra_events(receive_ra_events(1, 0), ClusterName, F6),
315+
316+
{ok, F8, []} = rabbit_fifo_client:enqueue(ClusterName, msg3, F7),
317+
{_, _, F9} = process_ra_events(receive_ra_events(1, 0), ClusterName, F8),
318+
319+
FC = rabbit_fifo_client:init([ServerId]),
320+
{ok, _, FC3} = rabbit_fifo_client:checkout(Tag, {simple_prefetch, 10}, #{}, FC),
321+
322+
{ok, F10, []} = rabbit_fifo_client:enqueue(ClusterName, msg4, F9),
323+
{_, E, F11}= process_ra_events(receive_ra_events(1, 0), ClusterName, F10),
324+
325+
receive
326+
{ra_event, Qname, Evt1} = Msg ->
327+
{ok, FC3b, Actions1} =
328+
rabbit_fifo_client:handle_ra_event(Qname, Qname, Evt1, FC3),
329+
[{deliver, _, true,
330+
[{_,
331+
missing_messages_order,0,false,msg1},
332+
{_,
333+
missing_messages_order,1,false,msg2},
334+
{_,
335+
missing_messages_order,2,false,msg3},
336+
{_,
337+
missing_messages_order,3,false,msg4}]}] = Actions1
338+
after ?TIMEOUT ->
339+
flush(),
340+
exit(await_delivery_timeout)
341+
end.
342+
299343
resends_lost_command(Config) ->
300344
ClusterName = ?config(cluster_name, Config),
301345
ServerId = ?config(node_id, Config),

0 commit comments

Comments
 (0)