Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import java.util.function.BiFunction;
import java.util.stream.Collectors;

public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean, SmartLifecycle {
public class RocketMQMessageListenerBeanPostProcessor implements ApplicationContextAware, BeanPostProcessor, InitializingBean {

private ApplicationContext applicationContext;

Expand Down Expand Up @@ -61,41 +61,13 @@ public Object postProcessAfterInitialization(Object bean, String beanName) throw
return bean;
}

@Override
public int getPhase() {
return Integer.MAX_VALUE - 2000;
}

@Override
public void start() {
if (!isRunning()) {
this.setRunning(true);
listenerContainerConfiguration.startContainer();
}
}

@Override
public void stop() {

}

public void setRunning(boolean running) {
this.running = running;
}


@Override
public boolean isRunning() {
return running;
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

@Override
public void afterPropertiesSet() throws Exception {
public void afterPropertiesSet() {
buildEnhancer();
this.listenerContainerConfiguration = this.applicationContext.getBean(ListenerContainerConfiguration.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,17 +78,9 @@ public void registerContainer(String beanName, Object bean, RocketMQMessageListe
log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
}

public void startContainer() {
for (DefaultListenerContainer container : containers) {
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
}

public List<DefaultListenerContainer> getContainers() {
return containers;
}

private DefaultListenerContainer createRocketMQListenerContainer(String name, Object bean, RocketMQMessageListener annotation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.AnyNestedCondition;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.autoconfigure.condition.*;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
Expand Down Expand Up @@ -138,6 +136,13 @@ public RocketMQClientTemplate rocketMQClientTemplate(RocketMQMessageConverter ro
return rocketMQClientTemplate;
}


@Bean
@ConditionalOnClass(value = SpringApplication.class)
public RocketMQSpringInitialization rocketMQSpringInitialization() {
return new RocketMQSpringInitialization();
}

/**
*
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package org.apache.rocketmq.client.autoconfigure;

import org.apache.rocketmq.client.support.AssertSkipInitialization;
import org.apache.rocketmq.client.support.DefaultListenerContainer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.Ordered;

import javax.annotation.Resource;
import java.util.List;

public class RocketMQSpringInitialization implements ApplicationRunner, ApplicationContextAware, Ordered {

private final static Logger log = LoggerFactory.getLogger(RocketMQSpringInitialization.class);

private ConfigurableApplicationContext applicationContext;

@Resource
private ListenerContainerConfiguration listenerContainerConfiguration;

@Override
public int getOrder() {
return LOWEST_PRECEDENCE - 20;
}

@Override
public void run(ApplicationArguments args) {
// spring cloud init context skip
if (AssertSkipInitialization.shouldSkipInitialization(applicationContext.getEnvironment().getPropertySources())) {
return;
}

List<DefaultListenerContainer> containers = listenerContainerConfiguration.getContainers();

for (DefaultListenerContainer container : containers) {
if (!container.isRunning()) {
try {
container.start();
} catch (Exception e) {
log.error("Started container failed. {}", container, e);
throw new RuntimeException(e);
}
}
}
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) {
this.applicationContext = (ConfigurableApplicationContext) applicationContext;
}

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.apache.rocketmq.client.support;

import org.springframework.core.env.MutablePropertySources;

import java.util.Objects;

public class AssertSkipInitialization {

private static final String BOOTSTRAP_PROPERTY_SOURCE = "bootstrap";

public static Boolean shouldSkipInitialization(MutablePropertySources mutablePropertySources) {

if (Objects.isNull(mutablePropertySources)) {
return Boolean.FALSE;
}
return mutablePropertySources.contains(BOOTSTRAP_PROPERTY_SOURCE);
}
}
Loading