Skip to content

Commit e7d08de

Browse files
committed
QQ: allow unlimited returns.
Base delivery-limit on the delivery-count _not_ the acquired_count. This allows messages to be explicitly returned to the queue an unlimited number of times. The delivery-limit will only be reached for returns triggered by the consuming process crashing with any reason other than noconnection.
1 parent 6b35f95 commit e7d08de

File tree

4 files changed

+192
-142
lines changed

4 files changed

+192
-142
lines changed

deps/rabbit/src/rabbit_fifo.erl

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -303,20 +303,20 @@ apply_(Meta, #return{consumer_key = ConsumerKey,
303303
{State, ok}
304304
end;
305305
apply_(Meta, #modify{consumer_key = ConsumerKey,
306-
delivery_failed = DelFailed,
307-
undeliverable_here = Undel,
306+
delivery_failed = DeliveryFailed,
307+
undeliverable_here = UndelHere,
308308
annotations = Anns,
309309
msg_ids = MsgIds},
310310
#?STATE{consumers = Cons} = State) ->
311311
case find_consumer(ConsumerKey, Cons) of
312312
{ActualConsumerKey, #consumer{checked_out = Checked}}
313-
when Undel == false ->
314-
return(Meta, ActualConsumerKey, MsgIds, DelFailed,
313+
when UndelHere == false ->
314+
return(Meta, ActualConsumerKey, MsgIds, DeliveryFailed,
315315
Anns, Checked, [], State);
316316
{ActualConsumerKey, #consumer{} = Con}
317-
when Undel == true ->
317+
when UndelHere == true ->
318318
discard(Meta, MsgIds, ActualConsumerKey,
319-
Con, DelFailed, Anns, State);
319+
Con, DeliveryFailed, Anns, State);
320320
_ ->
321321
{State, ok}
322322
end;
@@ -500,7 +500,10 @@ apply_(#{system_time := Ts} = Meta,
500500
%% and checked out messages should be returned
501501
Effs = consumer_update_active_effects(
502502
S0, C0, false, suspected_down, E0),
503-
{St, Effs1} = return_all(Meta, S0, Effs, CKey, C0, true),
503+
%% TODO: set a timer instead of reaturn all here to allow
504+
%% a disconnected node a configurable bit of time to be
505+
%% reconnected
506+
{St, Effs1} = return_all(Meta, S0, Effs, CKey, C0, false),
504507
%% if the consumer was cancelled there is a chance it got
505508
%% removed when returning hence we need to be defensive here
506509
Waiting = case St#?STATE.consumers of
@@ -550,7 +553,7 @@ apply_(#{system_time := Ts} = Meta,
550553
status = up} = C0,
551554
{St0, Eff}) when node(P) =:= Node ->
552555
C = C0#consumer{status = suspected_down},
553-
{St, Eff0} = return_all(Meta, St0, Eff, CKey, C, true),
556+
{St, Eff0} = return_all(Meta, St0, Eff, CKey, C, false),
554557
Eff1 = consumer_update_active_effects(St, C, false,
555558
suspected_down, Eff0),
556559
{St, Eff1};
@@ -1902,7 +1905,7 @@ annotate_msg(Header, Msg0) ->
19021905
Msg0
19031906
end.
19041907

1905-
return_one(Meta, MsgId, Msg0, DelivFailed, Anns,
1908+
return_one(Meta, MsgId, Msg0, DeliveryFailed, Anns,
19061909
#?STATE{returns = Returns,
19071910
consumers = Consumers,
19081911
dlx = DlxState0,
@@ -1911,12 +1914,14 @@ return_one(Meta, MsgId, Msg0, DelivFailed, Anns,
19111914
dead_letter_handler = DLH}} = State0,
19121915
Effects0, ConsumerKey) ->
19131916
#consumer{checked_out = Checked0} = Con0 = maps:get(ConsumerKey, Consumers),
1914-
Msg = incr_msg(Msg0, DelivFailed, Anns),
1917+
Msg = incr_msg_headers(Msg0, DeliveryFailed, Anns),
19151918
Header = get_msg_header(Msg),
19161919
%% TODO: do not use acquired count here as that includes all deliberate
19171920
%% returns, use delivery_count header instead
1918-
case get_header(acquired_count, Header) of
1919-
AcquiredCount when AcquiredCount > DeliveryLimit ->
1921+
case get_header(delivery_count, Header) of
1922+
DeliveryCount
1923+
when is_integer(DeliveryCount) andalso
1924+
DeliveryCount > DeliveryLimit ->
19201925
{DlxState, RetainedBytes, DlxEffects} =
19211926
discard_or_dead_letter([Msg], delivery_limit, DLH, DlxState0),
19221927
%% subtract retained bytes as complete/6 will add them on irrespective
@@ -1939,10 +1944,10 @@ return_one(Meta, MsgId, Msg0, DelivFailed, Anns,
19391944
end.
19401945

19411946
return_all(Meta, #?STATE{consumers = Cons} = State0, Effects0, ConsumerKey,
1942-
#consumer{checked_out = Checked} = Con, DelivFailed) ->
1947+
#consumer{checked_out = Checked} = Con, DeliveryFailed) ->
19431948
State = State0#?STATE{consumers = Cons#{ConsumerKey => Con}},
19441949
lists:foldl(fun ({MsgId, Msg}, {S, E}) ->
1945-
return_one(Meta, MsgId, Msg, DelivFailed, #{},
1950+
return_one(Meta, MsgId, Msg, DeliveryFailed, #{},
19461951
S, E, ConsumerKey)
19471952
end, {State, Effects0}, lists:sort(maps:to_list(Checked))).
19481953

@@ -3024,7 +3029,7 @@ discard(Meta, MsgIds, ConsumerKey,
30243029
undefined ->
30253030
false;
30263031
Msg0 ->
3027-
{true, incr_msg(Msg0, DelFailed, Anns)}
3032+
{true, incr_msg_headers(Msg0, DelFailed, Anns)}
30283033
end
30293034
end, MsgIds),
30303035
{DlxState, RetainedBytes, Effects} =
@@ -3033,7 +3038,7 @@ discard(Meta, MsgIds, ConsumerKey,
30333038
discarded_bytes = DiscardedBytes0 - RetainedBytes},
30343039
complete_and_checkout(Meta, MsgIds, ConsumerKey, Con, Effects, State).
30353040

3036-
incr_msg(Msg0, DelFailed, Anns) ->
3041+
incr_msg_headers(Msg0, DeliveryFailed, Anns) ->
30373042
Msg1 = update_msg_header(acquired_count, fun incr/1, 1, Msg0),
30383043
Msg2 = case map_size(Anns) > 0 of
30393044
true ->
@@ -3044,7 +3049,7 @@ incr_msg(Msg0, DelFailed, Anns) ->
30443049
false ->
30453050
Msg1
30463051
end,
3047-
case DelFailed of
3052+
case DeliveryFailed of
30483053
true ->
30493054
update_msg_header(delivery_count, fun incr/1, 1, Msg2);
30503055
false ->

0 commit comments

Comments
 (0)