Skip to content

Commit 2bacf71

Browse files
Merge pull request #14757 from rabbitmq/shovel-revert-fix
Shovels bugfixes
2 parents e9baade + 9908b25 commit 2bacf71

File tree

3 files changed

+14
-7
lines changed

3 files changed

+14
-7
lines changed

deps/amqp10_client/src/amqp10_client_connection.erl

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -352,6 +352,9 @@ close_sent(_EvtType, heartbeat, _Data) ->
352352
close_sent(_EvtType, {'EXIT', _Pid, shutdown}, _Data) ->
353353
%% monitored processes may exit during closure
354354
keep_state_and_data;
355+
close_sent(_EvtType, {'EXIT', _Pid, {shutdown, _}}, _Data) ->
356+
%% monitored processes may exit during closure
357+
keep_state_and_data;
355358
close_sent(_EvtType, {'DOWN', _Ref, process, ReaderPid, _Reason},
356359
#state{reader = ReaderPid}) ->
357360
%% if the reader exits we probably won't receive a close frame

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -306,13 +306,13 @@ ack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) ->
306306
-spec nack(Tag :: tag(), Multi :: boolean(), state()) -> state().
307307
nack(Tag, false, State = #{source := #{current := #{link := LinkRef}} = Src}) ->
308308
% the tag is the same as the deliveryid
309-
ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, released),
310-
State#{source => Src#{last_nacked_tag => Tag}};
309+
ok = amqp10_client_session:disposition(LinkRef, Tag, Tag, true, rejected),
310+
State#{source => Src#{last_acked_tag => Tag}};
311311
nack(Tag, true, State = #{source := #{current := #{link := LinkRef},
312-
last_nacked_tag := LastTag} = Src}) ->
312+
last_acked_tag := LastTag} = Src}) ->
313313
First = LastTag + 1,
314-
ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, released),
315-
State#{source => Src#{last_nacked_tag => Tag}}.
314+
ok = amqp10_client_session:disposition(LinkRef, First, Tag, true, rejected),
315+
State#{source => Src#{last_acked_tag => Tag}}.
316316

317317
status(#{dest := #{current := #{link_state := attached}}}) ->
318318
flow;

deps/rabbitmq_shovel/test/shovel_dynamic_SUITE.erl

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,12 @@ tests() ->
7171
autodelete_quorum_on_confirm_no_transfer,
7272
autodelete_classic_on_publish_no_transfer,
7373
autodelete_quorum_on_publish_no_transfer,
74-
autodelete_classic_on_confirm_with_rejections,
75-
autodelete_quorum_on_confirm_with_rejections,
74+
%% AMQP091 and local shovels requeue messages on reject
75+
%% AMQP10 discards messages on reject
76+
%% These two tests will remain commented out until the
77+
%% behaviour is unified.
78+
%% autodelete_classic_on_confirm_with_rejections,
79+
%% autodelete_quorum_on_confirm_with_rejections,
7680
autodelete_classic_on_publish_with_rejections,
7781
autodelete_quorum_on_publish_with_rejections
7882
].

0 commit comments

Comments
 (0)