4343import org .springframework .integration .context .OrderlyShutdownCapable ;
4444import org .springframework .integration .endpoint .MessageProducerSupport ;
4545import org .springframework .integration .support .ErrorMessageUtils ;
46+ import org .springframework .messaging .MessageChannel ;
4647import org .springframework .retry .RecoveryCallback ;
4748import org .springframework .retry .RetryOperations ;
4849import org .springframework .retry .support .RetrySynchronizationManager ;
@@ -149,8 +150,8 @@ public void setRecoveryCallback(RecoveryCallback<?> recoveryCallback) {
149150
150151 /**
151152 * Set a batching strategy to use when de-batching messages created by a batching
152- * producer (such as the BatchingRabbitTemplate). Default is
153- * {@link SimpleBatchingStrategy}.
153+ * producer (such as the BatchingRabbitTemplate).
154+ * Default is {@link SimpleBatchingStrategy}.
154155 * @param batchingStrategy the strategy.
155156 * @since 5.2
156157 */
@@ -267,8 +268,8 @@ protected class Listener implements ChannelAwareMessageListener {
267268
268269 protected final MessageConverter converter = AmqpInboundChannelAdapter .this .messageConverter ; // NOSONAR
269270
270- protected final boolean manualAcks = AcknowledgeMode . MANUAL ==
271- AmqpInboundChannelAdapter .this .messageListenerContainer .getAcknowledgeMode (); // NNOSONAR
271+ protected final boolean manualAcks = // NNOSONAR
272+ AcknowledgeMode . MANUAL == AmqpInboundChannelAdapter .this .messageListenerContainer .getAcknowledgeMode ();
272273
273274 protected final RetryOperations retryOps = AmqpInboundChannelAdapter .this .retryTemplate ; // NOSONAR
274275
@@ -284,7 +285,8 @@ public void onMessage(final Message message, final Channel channel) {
284285 createAndSend (message , channel );
285286 }
286287 else {
287- final org .springframework .messaging .Message <Object > toSend = createMessage (message , channel );
288+ final org .springframework .messaging .Message <Object > toSend =
289+ createMessageFromAmqp (message , channel );
288290 this .retryOps .execute (
289291 context -> {
290292 StaticMessageHeaderAccessor .getDeliveryAttempt (toSend ).incrementAndGet ();
@@ -295,11 +297,13 @@ public void onMessage(final Message message, final Channel channel) {
295297 }
296298 }
297299 catch (MessageConversionException e ) {
298- if (getErrorChannel () != null ) {
300+ MessageChannel errorChannel = getErrorChannel ();
301+ if (errorChannel != null ) {
299302 setAttributesIfNecessary (message , null );
300303 getMessagingTemplate ()
301- .send (getErrorChannel (), buildErrorMessage (null ,
302- EndpointUtils .errorMessagePayload (message , channel , this .manualAcks , e )));
304+ .send (errorChannel ,
305+ buildErrorMessage (null ,
306+ EndpointUtils .errorMessagePayload (message , channel , this .manualAcks , e )));
303307 }
304308 else {
305309 throw e ;
@@ -313,28 +317,30 @@ public void onMessage(final Message message, final Channel channel) {
313317 }
314318
315319 private void createAndSend (Message message , Channel channel ) {
316- org .springframework .messaging .Message <Object > messagingMessage = createMessage (message , channel );
320+ org .springframework .messaging .Message <Object > messagingMessage = createMessageFromAmqp (message , channel );
317321 setAttributesIfNecessary (message , messagingMessage );
318322 sendMessage (messagingMessage );
319323 }
320324
321- protected org .springframework .messaging .Message <Object > createMessage (Message message , Channel channel ) {
325+ protected org .springframework .messaging .Message <Object > createMessageFromAmqp (Message message ,
326+ Channel channel ) {
327+
322328 Object payload = convertPayload (message );
323- Map <String , Object > headers = AmqpInboundChannelAdapter . this . headerMapper
324- .toHeadersFromRequest (message .getMessageProperties ());
329+ Map <String , Object > headers =
330+ AmqpInboundChannelAdapter . this . headerMapper .toHeadersFromRequest (message .getMessageProperties ());
325331 if (AmqpInboundChannelAdapter .this .bindSourceMessage ) {
326332 headers .put (IntegrationMessageHeaderAccessor .SOURCE_DATA , message );
327333 }
328334 long deliveryTag = message .getMessageProperties ().getDeliveryTag ();
329- return finalize ( channel , payload , headers , deliveryTag );
335+ return createMessageFromPayload ( payload , channel , headers , deliveryTag );
330336 }
331337
332338 protected Object convertPayload (Message message ) {
333339 Object payload ;
334340 if (AmqpInboundChannelAdapter .this .batchingStrategy .canDebatch (message .getMessageProperties ())) {
335341 List <Object > payloads = new ArrayList <>();
336- AmqpInboundChannelAdapter .this .batchingStrategy .deBatch (message , fragment -> payloads
337- .add (this .converter .fromMessage (fragment )));
342+ AmqpInboundChannelAdapter .this .batchingStrategy .deBatch (message ,
343+ fragment -> payloads .add (this .converter .fromMessage (fragment )));
338344 payload = payloads ;
339345 }
340346 else {
@@ -343,8 +349,8 @@ protected Object convertPayload(Message message) {
343349 return payload ;
344350 }
345351
346- protected org .springframework .messaging .Message <Object > finalize ( Channel channel , Object payload ,
347- Map <String , Object > headers , long deliveryTag ) {
352+ protected org .springframework .messaging .Message <Object > createMessageFromPayload ( Object payload ,
353+ Channel channel , Map <String , Object > headers , long deliveryTag ) {
348354
349355 if (this .manualAcks ) {
350356 headers .put (AmqpHeaders .DELIVERY_TAG , deliveryTag );
@@ -375,8 +381,9 @@ public void onMessageBatch(List<Message> messages, Channel channel) {
375381 converted = convertPayloads (messages , channel );
376382 }
377383 if (converted != null ) {
378- org .springframework .messaging .Message <?> message = finalize (channel , converted , new HashMap <>(),
379- messages .get (messages .size () - 1 ).getMessageProperties ().getDeliveryTag ());
384+ org .springframework .messaging .Message <?> message =
385+ createMessageFromPayload (converted , channel , new HashMap <>(),
386+ messages .get (messages .size () - 1 ).getMessageProperties ().getDeliveryTag ());
380387 try {
381388 if (this .retryOps == null ) {
382389 setAttributesIfNecessary (messages , message );
@@ -412,16 +419,15 @@ private List<org.springframework.messaging.Message<?>> convertMessages(List<Mess
412419
413420 List <org .springframework .messaging .Message <?>> converted = new ArrayList <>();
414421 try {
415- messages .forEach (message -> {
416- converted .add (createMessage (message , channel ));
417- });
422+ messages .forEach (message -> converted .add (createMessageFromAmqp (message , channel )));
418423 return converted ;
419424 }
420425 catch (MessageConversionException e ) {
421- if (getErrorChannel () != null ) {
426+ MessageChannel errorChannel = getErrorChannel ();
427+ if (errorChannel != null ) {
422428 setAttributesIfNecessary (messages , null );
423429 getMessagingTemplate ()
424- .send (getErrorChannel () , buildErrorMessage (null ,
430+ .send (errorChannel , buildErrorMessage (null ,
425431 EndpointUtils .errorMessagePayload (messages , channel , this .manualAcks , e )));
426432 }
427433 else {
@@ -434,16 +440,15 @@ private List<org.springframework.messaging.Message<?>> convertMessages(List<Mess
434440 private List <?> convertPayloads (List <Message > messages , Channel channel ) {
435441 List <Object > converted = new ArrayList <>();
436442 try {
437- messages .forEach (message -> {
438- converted .add (this .converter .fromMessage (message ));
439- });
443+ messages .forEach (message -> converted .add (this .converter .fromMessage (message )));
440444 return converted ;
441445 }
442446 catch (MessageConversionException e ) {
443- if (getErrorChannel () != null ) {
447+ MessageChannel errorChannel = getErrorChannel ();
448+ if (errorChannel != null ) {
444449 setAttributesIfNecessary (messages , null );
445450 getMessagingTemplate ()
446- .send (getErrorChannel () , buildErrorMessage (null ,
451+ .send (errorChannel , buildErrorMessage (null ,
447452 EndpointUtils .errorMessagePayload (messages , channel , this .manualAcks , e )));
448453 }
449454 else {
0 commit comments