Skip to content

Commit 64a3a31

Browse files
Set link capabilities
1 parent 35b3fda commit 64a3a31

File tree

2 files changed

+50
-20
lines changed

2 files changed

+50
-20
lines changed

deps/rabbitmq_shovel/src/rabbit_amqp10_shovel.erl

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -74,32 +74,49 @@ connect_source(State = #{name := Name,
7474
ack_mode := AckMode,
7575
source := #{uris := [Uri | _],
7676
source_address := Addr} = Src}) ->
77-
SndSettleMode = case AckMode of
77+
SettleMode = case AckMode of
7878
no_ack -> settled;
7979
on_publish -> unsettled;
8080
on_confirm -> unsettled
8181
end,
82-
AttachFun = fun amqp10_client:attach_receiver_link/5,
83-
{Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "receiver", Addr, Src,
84-
AttachFun),
82+
AttachArgs = receiver_attach_args(Name, SettleMode, Addr, Src),
83+
{Conn, Sess, LinkRef} = connect(Uri, AttachArgs),
8584
State#{source => Src#{current => #{conn => Conn,
8685
session => Sess,
8786
link => LinkRef,
8887
uri => Uri}}}.
8988

89+
receiver_attach_args(Name, SettleMode, Addr, Src) ->
90+
LinkName = begin
91+
LinkName0 = gen_unique_name(Name, "receiver"),
92+
rabbit_data_coercion:to_binary(LinkName0)
93+
end,
94+
Durability = maps:get(durability, Src, unsettled_state),
95+
Capabilities = maps:get(receiver_capabilities, Src, undefined),
96+
Receiver0 = #{address => Addr, durable => Durability},
97+
Receiver = case Capabilities of
98+
undefined -> Receiver0;
99+
_ -> maps:put(capabilities, Capabilities, Receiver0)
100+
end,
101+
#{name => LinkName,
102+
role => {receiver, Receiver, self()},
103+
snd_settle_mode => SettleMode,
104+
rcv_settle_mode => first,
105+
filter => #{},
106+
properties => #{}}.
107+
90108
-spec connect_dest(state()) -> state().
91109
connect_dest(State = #{name := Name,
92110
ack_mode := AckMode,
93111
dest := #{uris := [Uri | _],
94112
target_address := Addr} = Dst}) ->
95-
SndSettleMode = case AckMode of
113+
SettleMode = case AckMode of
96114
no_ack -> settled;
97115
on_publish -> settled;
98116
on_confirm -> unsettled
99-
end,
100-
AttachFun = fun amqp10_client:attach_sender_link_sync/5,
101-
{Conn, Sess, LinkRef} = connect(Name, SndSettleMode, Uri, "sender", Addr, Dst,
102-
AttachFun),
117+
end,
118+
AttachArgs = sender_attach_args(Name, SettleMode, Addr, Dst),
119+
{Conn, Sess, LinkRef} = connect(Uri, AttachArgs),
103120
%% wait for link credit here as if there are messages waiting we may try
104121
%% to forward before we've received credit
105122
State#{dest => Dst#{current => #{conn => Conn,
@@ -109,7 +126,24 @@ connect_dest(State = #{name := Name,
109126
link => LinkRef,
110127
uri => Uri}}}.
111128

112-
connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
129+
sender_attach_args(Name, SettleMode, Addr, Src) ->
130+
LinkName = begin
131+
LinkName0 = gen_unique_name(Name, "sender"),
132+
rabbit_data_coercion:to_binary(LinkName0)
133+
end,
134+
Durability = maps:get(durability, Src, unsettled_state),
135+
Capabilities = maps:get(sender_capabilities, Src, undefined),
136+
Sender0 = #{address => Addr, durable => Durability},
137+
Sender = case Capabilities of
138+
undefined -> Sender0;
139+
_ -> maps:put(capabilities, Capabilities, Sender0)
140+
end,
141+
#{name => LinkName,
142+
role => {sender, Sender},
143+
snd_settle_mode => SettleMode,
144+
rcv_settle_mode => first}.
145+
146+
connect(Uri, AttachArgs) ->
113147
{ok, Config0} = amqp10_client:parse_uri(Uri),
114148
%% As done for AMQP 0.9.1, exclude AMQP 1.0 shovel connections from maintenance mode
115149
%% to prevent crashes and errors being logged by the shovel plugin when a node gets drained.
@@ -119,16 +153,8 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
119153
{ok, Conn} = amqp10_client:open_connection(Config),
120154
{ok, Sess} = amqp10_client:begin_session(Conn),
121155
link(Conn),
122-
LinkName = begin
123-
LinkName0 = gen_unique_name(Name, Postfix),
124-
rabbit_data_coercion:to_binary(LinkName0)
125-
end,
126-
% needs to be sync, i.e. awaits the 'attach' event as
127-
% else we may try to use the link before it is ready
128-
Durability = maps:get(durability, Map, unsettled_state),
129-
{ok, LinkRef} = AttachFun(Sess, LinkName, Addr,
130-
SndSettleMode,
131-
Durability),
156+
157+
{ok, LinkRef} = amqp10_client:attach_link(Sess, AttachArgs),
132158
{Conn, Sess, LinkRef}.
133159

134160
-spec init_source(state()) -> state().

deps/rabbitmq_shovel/src/rabbit_shovel_parameters.erl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,11 +314,13 @@ parse_amqp10_dest({_VHost, _Name}, _ClusterName, Def, SourceHeaders) ->
314314
MessageAnns =
315315
rabbit_data_coercion:to_proplist(
316316
pget(<<"dest-message-annotations">>, Def, [])),
317+
SenderCapabilities = pget(<<"dest-capabilities">>, Def),
317318
#{module => rabbit_amqp10_shovel,
318319
uris => Uris,
319320
target_address => Address,
320321
message_annotations => maps:from_list(MessageAnns),
321322
application_properties => maps:from_list(AppProperties ++ SourceHeaders),
323+
sender_capabilities => SenderCapabilities,
322324
properties => maps:from_list(
323325
lists:map(fun({K, V}) ->
324326
{rabbit_data_coercion:to_atom(K), V}
@@ -416,12 +418,14 @@ parse_amqp10_source(Def) ->
416418
Address = pget(<<"src-address">>, Def),
417419
DeleteAfter = pget(<<"src-delete-after">>, Def, <<"never">>),
418420
PrefetchCount = pget(<<"src-prefetch-count">>, Def, 1000),
421+
ReceiverCapabilities = pget(<<"src-capabilities">>, Def),
419422
Headers = [],
420423
{#{module => rabbit_amqp10_shovel,
421424
uris => Uris,
422425
source_address => Address,
423426
delete_after => opt_b2a(DeleteAfter),
424427
prefetch_count => PrefetchCount,
428+
receiver_capabilities => ReceiverCapabilities,
425429
consumer_args => []}, Headers}.
426430

427431
parse_amqp091_source(Def) ->

0 commit comments

Comments
 (0)