Skip to content
This repository was archived by the owner on Feb 21, 2024. It is now read-only.

Retry section under RabbitProperties not being configured #73

@mhewedy

Description

@mhewedy

I've noticed that the Retry section in the RabbitProperties are not being configured (In fact, the whole Listener section), as it is configured in AbstractRabbitListenerContainerFactoryConfigurer but is not considered in the multi rabbit when creating SimpleRabbitListenerContainerFactory here:

final SimpleRabbitListenerContainerFactory containerFactory = newContainerFactory(connectionFactory);

So the question is, provided that ... how can I configure the retry for different ContainerFactory's of multi rabbit?

I've fixed it with this full-of-hacks solution ... your advice appreciated

  • I've applied a hack to configure the Listener part in RabbitProperties for each SimpleRabbitListenerContainerFactory
  • Then, I've applied a hack for the MessageRecoverer (hence we need the key to hold the binding before sending the message to the dl exchange)
@Configuration
@ConditionalOnClass(RabbitTemplate.class)
public class RabbitConfig {

    @Bean
    public MessageConverter jsonMessageConverter(ObjectMapper objectMapper) {
        return new Jackson2JsonMessageConverter(objectMapper);
    }

    @Component
    @RequiredArgsConstructor
    private static class MultiRabbitCustomizer implements InitializingBean, ApplicationContextAware {

        private ApplicationContext applicationContext;

        private final RabbitTemplate rabbitTemplate;
        private final MessageConverter messageConverter;
        private final MultiRabbitProperties multiRabbitProperties;
        private final SimpleRoutingConnectionFactory multiRabbitConnectionFactory;

        @Override
        public void afterPropertiesSet() {
            multiRabbitProperties.getConnections().forEach((key, props) -> {

                var bean = applicationContext.getBean(key, SimpleRabbitListenerContainerFactory.class);
                ConnectionFactory targetConnectionFactory = multiRabbitConnectionFactory.getTargetConnectionFactory(key);

                var multiRabbitConfigurer = new MultiRabbitConfigurer(props.getListener().getSimple(), targetConnectionFactory);
                var messageRecoverer = new MultiRabbitRepublishMessageRecoverer(rabbitTemplate, "general_dl_exchange", key);
                multiRabbitConfigurer.setMessageRecoverer(messageRecoverer);
                multiRabbitConfigurer.setMessageConverter(messageConverter);
                multiRabbitConfigurer.configure(bean, multiRabbitConnectionFactory);
            });
        }

        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.applicationContext = applicationContext;
        }
    }

    @RequiredArgsConstructor
    private static class MultiRabbitConfigurer
            extends AbstractRabbitListenerContainerFactoryConfigurer<SimpleRabbitListenerContainerFactory> {

        private final RabbitProperties.SimpleContainer config;
        private final ConnectionFactory connectionFactory;

        @Override
        public void configure(SimpleRabbitListenerContainerFactory factory, ConnectionFactory ignored) {
            PropertyMapper map = PropertyMapper.get();
            configure(factory, connectionFactory, config);
            map.from(config::getConcurrency).whenNonNull().to(factory::setConcurrentConsumers);
            map.from(config::getMaxConcurrency).whenNonNull().to(factory::setMaxConcurrentConsumers);
            map.from(config::getBatchSize).whenNonNull().to(factory::setBatchSize);
            map.from(config::isConsumerBatchEnabled).to(factory::setConsumerBatchEnabled);
        }

        @Override
        public void setMessageConverter(MessageConverter messageConverter) {
            super.setMessageConverter(messageConverter);
        }

        @Override
        public void setMessageRecoverer(MessageRecoverer messageRecoverer) {
            super.setMessageRecoverer(messageRecoverer);
        }
    }

    private static class MultiRabbitRepublishMessageRecoverer extends RepublishMessageRecoverer {
        private final String key;

        public MultiRabbitRepublishMessageRecoverer(RabbitTemplate rabbitTemplate, String errorExchange, String key) {
            super(rabbitTemplate, errorExchange);
            this.key = key;
        }

        @Override
        protected void doSend(@Nullable String exchange, String routingKey, Message message) {
            SimpleResourceHolder.bind(((RabbitTemplate) errorTemplate).getConnectionFactory(), key);
            // routing key is: "error.<original queue name>"
            super.doSend(exchange, routingKey + message.getMessageProperties().getConsumerQueue(), message);
            SimpleResourceHolder.unbind(((RabbitTemplate) errorTemplate).getConnectionFactory());
        }
    }

}

Metadata

Metadata

Assignees

Labels

No labels
No labels

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions