@@ -29,7 +29,9 @@ groups() ->
2929 autodelete_amqp091_dest_on_confirm ,
3030 autodelete_amqp091_dest_on_publish ,
3131 simple_amqp10_dest ,
32- simple_amqp10_src
32+ simple_amqp10_src ,
33+ message_prop_conversion ,
34+ message_prop_conversion_no_props
3335 ]},
3436 {with_map_config , [], [
3537 simple ,
@@ -171,6 +173,168 @@ simple_amqp10_src(Config) ->
171173 ok
172174 end ).
173175
176+ message_prop_conversion (Config ) ->
177+ MapConfig = ? config (map_config , Config ),
178+ Src = ? config (srcq , Config ),
179+ Dest = ? config (destq , Config ),
180+ with_session (Config ,
181+ fun (Sess ) ->
182+ shovel_test_utils :set_param (
183+ Config ,
184+ <<" test" >>, [{<<" src-protocol" >>, <<" amqp10" >>},
185+ {<<" src-address" >>, Src },
186+ {<<" dest-protocol" >>, <<" amqp091" >>},
187+ {<<" dest-queue" >>, Dest },
188+ {<<" add-forward-headers" >>, true },
189+ {<<" dest-add-timestamp-header" >>, true },
190+ {<<" publish-properties" >>,
191+ case MapConfig of
192+ true -> #{<<" cluster_id" >> => <<" x" >>};
193+ _ -> [{<<" cluster_id" >>, <<" x" >>}]
194+ end }
195+ ]),
196+ LinkName = <<" dynamic-sender-" , Dest /binary >>,
197+ Tag = <<" tag1" >>,
198+ Payload = <<" payload" >>,
199+ {ok , Sender } = amqp10_client :attach_sender_link (Sess , LinkName , Src ,
200+ unsettled , unsettled_state ),
201+ ok = await_amqp10_event (link , Sender , attached ),
202+ Headers = #{durable => true , priority => 3 , ttl => 180000 },
203+ Msg = amqp10_msg :set_headers (Headers ,
204+ amqp10_msg :new (Tag , Payload , false )),
205+ Msg2 = amqp10_msg :set_properties (#{
206+ message_id => <<" message-id" >>,
207+ user_id => <<" guest" >>,
208+ to => <<" to" >>,
209+ subject => <<" subject" >>,
210+ reply_to => <<" reply-to" >>,
211+ correlation_id => <<" correlation-id" >>,
212+ content_type => <<" content-type" >>,
213+ content_encoding => <<" content-encoding" >>,
214+ % absolute_expiry_time => 123456789,
215+ creation_time => 123456789 ,
216+ group_id => <<" group-id" >>,
217+ group_sequence => 123 ,
218+ reply_to_group_id => <<" reply-to-group-id" >>
219+ }, Msg ),
220+ Msg3 = amqp10_msg :set_application_properties (#{
221+ <<" x-binary" >> => <<" binary" >>,
222+ <<" x-int" >> => 33 ,
223+ <<" x-negative-int" >> => - 33 ,
224+ <<" x-float" >> => 1.3 ,
225+ <<" x-true" >> => true ,
226+ <<" x-false" >> => false
227+ }, Msg2 ),
228+ ok = amqp10_client :send_msg (Sender , Msg3 ),
229+ receive
230+ {amqp10_disposition , {accepted , Tag }} -> ok
231+ after 3000 ->
232+ exit (publish_disposition_not_received )
233+ end ,
234+ amqp10_client :detach_link (Sender ),
235+ Channel = rabbit_ct_client_helpers :open_channel (Config ),
236+ {# 'basic.get_ok' {}, # amqp_msg {payload = Payload , props = # 'P_basic' {
237+ content_type = ReceivedContentType ,
238+ content_encoding = ReceivedContentEncoding ,
239+ headers = Headers2 ,
240+ delivery_mode = ReceivedDeliveryMode ,
241+ priority = ReceivedPriority ,
242+ correlation_id = ReceivedCorrelationId ,
243+ reply_to = ReceivedReplyTo ,
244+ expiration = ReceivedExpiration ,
245+ message_id = ReceivedMessageId ,
246+ timestamp = ReceivedTimestamp ,
247+ type = _ReceivedType ,
248+ user_id = ReceivedUserId ,
249+ app_id = _ReceivedAppId ,
250+ cluster_id = _ReceivedClusterId
251+ }}} = amqp_channel :call (Channel , # 'basic.get' {queue = Dest , no_ack = true }),
252+
253+ ? assertEqual (<<" payload" >>, Payload ),
254+ ? assertEqual (2 , ReceivedDeliveryMode ),
255+ ? assertEqual ({longstr , <<" binary" >>}, rabbit_misc :table_lookup (Headers2 , <<" x-binary" >>)),
256+ ? assertEqual ({long , 33 }, rabbit_misc :table_lookup (Headers2 , <<" x-int" >>)),
257+ ? assertEqual ({long , - 33 }, rabbit_misc :table_lookup (Headers2 , <<" x-negative-int" >>)),
258+ ? assertEqual ({double , 1.3 }, rabbit_misc :table_lookup (Headers2 , <<" x-float" >>)),
259+ ? assertEqual ({bool , true }, rabbit_misc :table_lookup (Headers2 , <<" x-true" >>)),
260+ ? assertEqual ({bool , false }, rabbit_misc :table_lookup (Headers2 , <<" x-false" >>)),
261+
262+ ? assertEqual (<<" content-type" >>, ReceivedContentType ),
263+ ? assertEqual (<<" content-encoding" >>, ReceivedContentEncoding ),
264+
265+ ? assertEqual (3 , ReceivedPriority ),
266+ ? assertEqual (<<" correlation-id" >>, ReceivedCorrelationId ),
267+ ? assertEqual (<<" reply-to" >>, ReceivedReplyTo ),
268+ ? assertEqual (<<" 180000" >>, ReceivedExpiration ),
269+ ? assertEqual (<<" message-id" >>, ReceivedMessageId ),
270+ ? assertEqual (123456 , ReceivedTimestamp ), % timestamp is divided by 1 000
271+ ? assertEqual (<<" guest" >>, ReceivedUserId ),
272+ ok
273+ end ).
274+
275+ message_prop_conversion_no_props (Config ) ->
276+ MapConfig = ? config (map_config , Config ),
277+ Src = ? config (srcq , Config ),
278+ Dest = ? config (destq , Config ),
279+ with_session (Config ,
280+ fun (Sess ) ->
281+ shovel_test_utils :set_param (
282+ Config ,
283+ <<" test" >>, [{<<" src-protocol" >>, <<" amqp10" >>},
284+ {<<" src-address" >>, Src },
285+ {<<" dest-protocol" >>, <<" amqp091" >>},
286+ {<<" dest-queue" >>, Dest },
287+ {<<" add-forward-headers" >>, true },
288+ {<<" dest-add-timestamp-header" >>, true },
289+ {<<" publish-properties" >>,
290+ case MapConfig of
291+ true -> #{<<" cluster_id" >> => <<" x" >>};
292+ _ -> [{<<" cluster_id" >>, <<" x" >>}]
293+ end }
294+ ]),
295+ LinkName = <<" dynamic-sender-" , Dest /binary >>,
296+ Tag = <<" tag1" >>,
297+ Payload = <<" payload" >>,
298+ {ok , Sender } = amqp10_client :attach_sender_link (Sess , LinkName , Src ,
299+ unsettled , unsettled_state ),
300+ ok = await_amqp10_event (link , Sender , attached ),
301+ Msg = amqp10_msg :new (Tag , Payload , false ),
302+ ok = amqp10_client :send_msg (Sender , Msg ),
303+ receive
304+ {amqp10_disposition , {accepted , Tag }} -> ok
305+ after 3000 ->
306+ exit (publish_disposition_not_received )
307+ end ,
308+ amqp10_client :detach_link (Sender ),
309+ Channel = rabbit_ct_client_helpers :open_channel (Config ),
310+ {# 'basic.get_ok' {}, # amqp_msg {payload = ReceivedPayload , props = # 'P_basic' {
311+ content_type = undefined ,
312+ content_encoding = undefined ,
313+ headers = ReceivedHeaders ,
314+ delivery_mode = ReceivedDeliveryMode ,
315+ priority = ReceivedPriority ,
316+ correlation_id = undefined ,
317+ reply_to = undefined ,
318+ expiration = undefined ,
319+ message_id = undefined ,
320+ timestamp = undefined ,
321+ type = undefined ,
322+ user_id = undefined ,
323+ app_id = undefined ,
324+ cluster_id = ReceivedClusterId
325+ }}} = amqp_channel :call (Channel , # 'basic.get' {queue = Dest , no_ack = true }),
326+
327+ ? assertEqual (<<" payload" >>, ReceivedPayload ),
328+ ? assertEqual (1 , ReceivedDeliveryMode ),
329+ ? assertEqual (<<" x" >>, ReceivedClusterId ),
330+ ? assertEqual (4 , ReceivedPriority ),
331+
332+ ? assertNotEqual (undefined , rabbit_misc :table_lookup (ReceivedHeaders , <<" x-shovelled" >>)),
333+
334+ ok
335+ end ).
336+
337+
174338change_definition (Config ) ->
175339 Src = ? config (srcq , Config ),
176340 Dest = ? config (destq , Config ),
0 commit comments