2121import com .rabbitmq .client .ReturnListener ;
2222
2323import java .io .IOException ;
24+ import java .time .OffsetDateTime ;
25+ import java .util .Arrays ;
26+ import java .util .Collection ;
2427import java .util .Collections ;
28+ import java .util .Date ;
29+ import java .util .HashMap ;
2530import java .util .List ;
31+ import java .util .Map ;
2632import java .util .SortedSet ;
2733import java .util .TreeSet ;
2834import java .util .UUID ;
2935import java .util .concurrent .Semaphore ;
3036import java .util .concurrent .TimeUnit ;
3137import java .util .concurrent .atomic .AtomicBoolean ;
32- import java .util .function .UnaryOperator ;
38+ import java .util .function .Function ;
39+ import static java .util .stream .Collectors .toMap ;
3340
3441public class Producer extends ProducerConsumerBase implements Runnable , ReturnListener ,
3542 ConfirmListener
@@ -49,7 +56,7 @@ public class Producer extends ProducerConsumerBase implements Runnable, ReturnLi
4956
5057 private final MessageBodySource messageBodySource ;
5158
52- private final UnaryOperator < AMQP .BasicProperties .Builder > propertiesBuilderProcessor ;
59+ private final Function < AMQP . BasicProperties . Builder , AMQP .BasicProperties .Builder > propertiesBuilderProcessor ;
5360 private Semaphore confirmPool ;
5461 private int confirmTimeout ;
5562 private final SortedSet <Long > unconfirmedSet =
@@ -63,31 +70,135 @@ public Producer(Channel channel, String exchangeName, String id, boolean randomR
6370 float rateLimit , int msgLimit ,
6471 long confirm , int confirmTimeout ,
6572 MessageBodySource messageBodySource ,
66- TimestampProvider tsp , Stats stats , MulticastSet .CompletionHandler completionHandler ) {
73+ TimestampProvider tsp , Stats stats , Map <String , Object > messageProperties ,
74+ MulticastSet .CompletionHandler completionHandler ) {
6775 this .channel = channel ;
6876 this .exchangeName = exchangeName ;
6977 this .id = id ;
7078 this .randomRoutingKey = randomRoutingKey ;
7179 this .mandatory = flags .contains ("mandatory" );
7280 this .persistent = flags .contains ("persistent" );
81+
82+ Function <AMQP .BasicProperties .Builder , AMQP .BasicProperties .Builder > builderProcessor = Function .identity ();
7383 this .txSize = txSize ;
7484 this .rateLimit = rateLimit ;
7585 this .msgLimit = msgLimit ;
7686 this .messageBodySource = messageBodySource ;
7787 if (tsp .isTimestampInHeader ()) {
78- this .propertiesBuilderProcessor = builder -> {
79- builder .headers (Collections .<String , Object >singletonMap (TIMESTAMP_HEADER , tsp .getCurrentTime ()));
80- return builder ;
81- };
82- } else {
83- this .propertiesBuilderProcessor = UnaryOperator .identity ();
88+ builderProcessor = builderProcessor .andThen (builder -> builder .headers (Collections .singletonMap (TIMESTAMP_HEADER , tsp .getCurrentTime ())));
89+ }
90+ if (messageProperties != null && !messageProperties .isEmpty ()) {
91+ builderProcessor = builderProcessorWithMessageProperties (messageProperties , builderProcessor );
8492 }
8593 if (confirm > 0 ) {
8694 this .confirmPool = new Semaphore ((int )confirm );
8795 this .confirmTimeout = confirmTimeout ;
8896 }
8997 this .stats = stats ;
9098 this .completionHandler = completionHandler ;
99+ this .propertiesBuilderProcessor = builderProcessor ;
100+ }
101+
102+ private Function <AMQP .BasicProperties .Builder , AMQP .BasicProperties .Builder > builderProcessorWithMessageProperties (
103+ Map <String , Object > messageProperties ,
104+ Function <AMQP .BasicProperties .Builder , AMQP .BasicProperties .Builder > builderProcessor ) {
105+ if (messageProperties .containsKey ("contentType" )) {
106+ String value = messageProperties .get ("contentType" ).toString ();
107+ builderProcessor = builderProcessor .andThen (builder -> builder .contentType (value ));
108+ }
109+ if (messageProperties .containsKey ("contentEncoding" )) {
110+ String value = messageProperties .get ("contentEncoding" ).toString ();
111+ builderProcessor = builderProcessor .andThen (builder -> builder .contentEncoding (value ));
112+ }
113+ if (messageProperties .containsKey ("deliveryMode" )) {
114+ Integer value = ((Number ) messageProperties .get ("deliveryMode" )).intValue ();
115+ builderProcessor = builderProcessor .andThen (builder -> builder .deliveryMode (value ));
116+ }
117+ if (messageProperties .containsKey ("priority" )) {
118+ Integer value = ((Number ) messageProperties .get ("priority" )).intValue ();
119+ builderProcessor = builderProcessor .andThen (builder -> builder .priority (value ));
120+ }
121+ if (messageProperties .containsKey ("correlationId" )) {
122+ String value = messageProperties .get ("correlationId" ).toString ();
123+ builderProcessor = builderProcessor .andThen (builder -> builder .correlationId (value ));
124+ }
125+ if (messageProperties .containsKey ("replyTo" )) {
126+ String value = messageProperties .get ("replyTo" ).toString ();
127+ builderProcessor = builderProcessor .andThen (builder -> builder .replyTo (value ));
128+ }
129+ if (messageProperties .containsKey ("expiration" )) {
130+ String value = messageProperties .get ("expiration" ).toString ();
131+ builderProcessor = builderProcessor .andThen (builder -> builder .expiration (value ));
132+ }
133+ if (messageProperties .containsKey ("messageId" )) {
134+ String value = messageProperties .get ("messageId" ).toString ();
135+ builderProcessor = builderProcessor .andThen (builder -> builder .messageId (value ));
136+ }
137+ if (messageProperties .containsKey ("timestamp" )) {
138+ String value = messageProperties .get ("timestamp" ).toString ();
139+ Date timestamp = Date .from (OffsetDateTime .parse (value ).toInstant ());
140+ builderProcessor = builderProcessor .andThen (builder -> builder .timestamp (timestamp ));
141+ }
142+ if (messageProperties .containsKey ("type" )) {
143+ String value = messageProperties .get ("type" ).toString ();
144+ builderProcessor = builderProcessor .andThen (builder -> builder .type (value ));
145+ }
146+ if (messageProperties .containsKey ("userId" )) {
147+ String value = messageProperties .get ("userId" ).toString ();
148+ builderProcessor = builderProcessor .andThen (builder -> builder .userId (value ));
149+ }
150+ if (messageProperties .containsKey ("appId" )) {
151+ String value = messageProperties .get ("appId" ).toString ();
152+ builderProcessor = builderProcessor .andThen (builder -> builder .appId (value ));
153+ }
154+ if (messageProperties .containsKey ("clusterId" )) {
155+ String value = messageProperties .get ("clusterId" ).toString ();
156+ builderProcessor = builderProcessor .andThen (builder -> builder .clusterId (value ));
157+ }
158+
159+ final Map <String , Object > headers = messageProperties .entrySet ().stream ()
160+ .filter (entry -> !isPropertyKey (entry .getKey ()))
161+ .collect (toMap (e -> e .getKey (), e -> e .getValue ()));
162+
163+ if (!headers .isEmpty ()) {
164+ builderProcessor = builderProcessor .andThen (builder -> {
165+ // we merge if there are already some headers
166+ AMQP .BasicProperties properties = builder .build ();
167+ Map <String , Object > existingHeaders = properties .getHeaders ();
168+ if (existingHeaders != null && !existingHeaders .isEmpty ()) {
169+ Map <String , Object > newHeaders = new HashMap <>();
170+ newHeaders .putAll (existingHeaders );
171+ newHeaders .putAll (headers );
172+ builder = builder .headers (newHeaders );
173+ } else {
174+ builder = builder .headers (headers );
175+ }
176+ return builder ;
177+ });
178+ }
179+
180+ return builderProcessor ;
181+ }
182+
183+ private static final Collection <String > MESSAGE_PROPERTIES_KEYS = Arrays .asList (
184+ "contentType" ,
185+ "contentEncoding" ,
186+ "headers" ,
187+ "deliveryMode" ,
188+ "priority" ,
189+ "correlationId" ,
190+ "replyTo" ,
191+ "expiration" ,
192+ "messageId" ,
193+ "timestamp" ,
194+ "type" ,
195+ "userId" ,
196+ "appId" ,
197+ "clusterId"
198+ );
199+
200+ private boolean isPropertyKey (String key ) {
201+ return MESSAGE_PROPERTIES_KEYS .contains (key );
91202 }
92203
93204 public void handleReturn (int replyCode ,
0 commit comments