Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import java.util.Objects;
import java.util.stream.Collectors;


@Configuration
public class ExtConsumerResetConfiguration implements ApplicationContextAware, SmartInitializingSingleton {
private static final Logger log = LoggerFactory.getLogger(ExtConsumerResetConfiguration.class);
Expand All @@ -61,7 +60,7 @@ public class ExtConsumerResetConfiguration implements ApplicationContextAware, S
private RocketMQMessageConverter rocketMQMessageConverter;

public ExtConsumerResetConfiguration(RocketMQMessageConverter rocketMQMessageConverter,
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
ConfigurableEnvironment environment, RocketMQProperties rocketMQProperties) {
this.rocketMQMessageConverter = rocketMQMessageConverter;
this.environment = environment;
this.rocketMQProperties = rocketMQProperties;
Expand All @@ -75,9 +74,9 @@ public void setApplicationContext(ApplicationContext applicationContext) throws
@Override
public void afterSingletonsInstantiated() {
Map<String, Object> beans = this.applicationContext
.getBeansWithAnnotation(org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
.getBeansWithAnnotation(org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration.class)
.entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
beans.forEach(this::registerTemplate);
}

Expand All @@ -103,10 +102,12 @@ private void registerTemplate(String beanName, Object bean) {
rocketMQTemplate.setSimpleConsumerBuilder(consumerBuilder);
rocketMQTemplate.setSimpleConsumer(simpleConsumer);
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
log.info("Set real simpleConsumer to :{} {}", beanName, annotation.value());
String topic = environment.resolvePlaceholders(annotation.topic());
log.info("Set real simpleConsumer to {} using {} topic", beanName, topic);
}

private SimpleConsumerBuilder createConsumer(org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration annotation) {
private SimpleConsumerBuilder createConsumer(
org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration annotation) {
RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer();
String consumerGroupName = resolvePlaceholders(annotation.consumerGroup(), simpleConsumer.getConsumerGroup());
String topicName = resolvePlaceholders(annotation.topic(), simpleConsumer.getTopic());
Expand Down Expand Up @@ -142,12 +143,12 @@ private String resolvePlaceholders(String text, String defaultValue) {
}

private void validate(org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration annotation,
GenericApplicationContext genericApplicationContext) {
GenericApplicationContext genericApplicationContext) {
if (genericApplicationContext.isBeanNameInUse(annotation.value())) {
throw new BeanDefinitionValidationException(
String.format("Bean {} has been used in Spring Application Context, " +
"please check the @ExtRocketMQConsumerConfiguration",
annotation.value()));
String.format("Bean {} has been used in Spring Application Context, " +
"please check the @ExtRocketMQConsumerConfiguration",
annotation.value()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,8 @@ private void registerTemplate(String beanName, Object bean) {
RocketMQClientTemplate rocketMQTemplate = (RocketMQClientTemplate) bean;
rocketMQTemplate.setProducerBuilder(producerBuilder);
rocketMQTemplate.setMessageConverter(rocketMQMessageConverter.getMessageConverter());
log.info("Set real producerBuilder to :{} {}", beanName, annotation.value());
String topic = environment.resolvePlaceholders(annotation.topic());
log.info("Set real producer to {} using topic {}", beanName, topic);
}

private ProducerBuilder createProducer(ExtProducerResetConfiguration annotation) {
Expand Down
Loading