1313import org .lognet .springboot .grpc .autoconfigure .GRpcServerProperties ;
1414import org .lognet .springboot .grpc .context .GRpcServerInitializedEvent ;
1515import org .springframework .beans .factory .BeanCreationException ;
16- import org .springframework .beans .factory .DisposableBean ;
1716import org .springframework .beans .factory .annotation .Autowired ;
1817import org .springframework .beans .factory .config .BeanDefinition ;
1918import org .springframework .beans .factory .support .RootBeanDefinition ;
20- import org .springframework .boot . CommandLineRunner ;
19+ import org .springframework .context . SmartLifecycle ;
2120import org .springframework .context .support .AbstractApplicationContext ;
2221import org .springframework .core .annotation .AnnotationAwareOrderComparator ;
2322import org .springframework .core .type .AnnotatedTypeMetadata ;
3130import java .util .Optional ;
3231import java .util .concurrent .CountDownLatch ;
3332import java .util .concurrent .TimeUnit ;
33+ import java .util .concurrent .atomic .AtomicBoolean ;
3434import java .util .function .Consumer ;
3535import java .util .stream .Collectors ;
3636import java .util .stream .Stream ;
3939 * Hosts embedded gRPC server.
4040 */
4141@ Slf4j
42- public class GRpcServerRunner implements CommandLineRunner , DisposableBean {
42+ public class GRpcServerRunner implements SmartLifecycle {
4343
44+ private AtomicBoolean isRunning = new AtomicBoolean (false );
4445 @ Autowired
4546 private HealthStatusManager healthStatusManager ;
4647
@@ -56,52 +57,59 @@ public class GRpcServerRunner implements CommandLineRunner, DisposableBean {
5657
5758 private final ServerBuilder <?> serverBuilder ;
5859
59- private final CountDownLatch latch ;
60+ private CountDownLatch latch ;
6061
6162 public GRpcServerRunner (Consumer <ServerBuilder <?>> configurator , ServerBuilder <?> serverBuilder ) {
6263 this .configurator = configurator ;
6364 this .serverBuilder = serverBuilder ;
64- this . latch = new CountDownLatch ( 1 );
65+
6566 }
6667
6768 @ Override
68- public void run (String ... args ) throws Exception {
69+ public void start () {
70+ if (isRunning ()){
71+ return ;
72+ }
6973 log .info ("Starting gRPC Server ..." );
74+ latch = new CountDownLatch (1 );
75+ try {
76+ Collection <ServerInterceptor > globalInterceptors = getBeanNamesByTypeWithAnnotation (GRpcGlobalInterceptor .class , ServerInterceptor .class )
77+ .map (name -> applicationContext .getBeanFactory ().getBean (name , ServerInterceptor .class ))
78+ .collect (Collectors .toList ());
79+
80+ // Adding health service
81+ serverBuilder .addService (healthStatusManager .getHealthService ());
82+
83+ // find and register all GRpcService-enabled beans
84+ getBeanNamesByTypeWithAnnotation (GRpcService .class , BindableService .class )
85+ .forEach (name -> {
86+ BindableService srv = applicationContext .getBeanFactory ().getBean (name , BindableService .class );
87+ ServerServiceDefinition serviceDefinition = srv .bindService ();
88+ GRpcService gRpcServiceAnn = applicationContext .findAnnotationOnBean (name , GRpcService .class );
89+ serviceDefinition = bindInterceptors (serviceDefinition , gRpcServiceAnn , globalInterceptors );
90+ serverBuilder .addService (serviceDefinition );
91+ String serviceName = serviceDefinition .getServiceDescriptor ().getName ();
92+ healthStatusManager .setStatus (serviceName , HealthCheckResponse .ServingStatus .SERVING );
93+
94+ log .info ("'{}' service has been registered." , srv .getClass ().getName ());
95+
96+ });
97+
98+ if (gRpcServerProperties .isEnableReflection ()) {
99+ serverBuilder .addService (ProtoReflectionService .newInstance ());
100+ log .info ("'{}' service has been registered." , ProtoReflectionService .class .getName ());
101+ }
70102
71- Collection <ServerInterceptor > globalInterceptors = getBeanNamesByTypeWithAnnotation (GRpcGlobalInterceptor .class , ServerInterceptor .class )
72- .map (name -> applicationContext .getBeanFactory ().getBean (name , ServerInterceptor .class ))
73- .collect (Collectors .toList ());
74-
75- // Adding health service
76- serverBuilder .addService (healthStatusManager .getHealthService ());
77-
78- // find and register all GRpcService-enabled beans
79- getBeanNamesByTypeWithAnnotation (GRpcService .class , BindableService .class )
80- .forEach (name -> {
81- BindableService srv = applicationContext .getBeanFactory ().getBean (name , BindableService .class );
82- ServerServiceDefinition serviceDefinition = srv .bindService ();
83- GRpcService gRpcServiceAnn = applicationContext .findAnnotationOnBean (name , GRpcService .class );
84- serviceDefinition = bindInterceptors (serviceDefinition , gRpcServiceAnn , globalInterceptors );
85- serverBuilder .addService (serviceDefinition );
86- String serviceName = serviceDefinition .getServiceDescriptor ().getName ();
87- healthStatusManager .setStatus (serviceName , HealthCheckResponse .ServingStatus .SERVING );
88-
89- log .info ("'{}' service has been registered." , srv .getClass ().getName ());
90-
91- });
103+ configurator .accept (serverBuilder );
104+ server = serverBuilder .build ().start ();
105+ applicationContext .publishEvent (new GRpcServerInitializedEvent (applicationContext , server ));
92106
93- if (gRpcServerProperties .isEnableReflection ()) {
94- serverBuilder .addService (ProtoReflectionService .newInstance ());
95- log .info ("'{}' service has been registered." , ProtoReflectionService .class .getName ());
107+ log .info ("gRPC Server started, listening on port {}." , server .getPort ());
108+ startDaemonAwaitThread ();
109+ }catch (Exception e ){
110+ throw new RuntimeException ("Failed to start GRPC server" ,e );
96111 }
97112
98- configurator .accept (serverBuilder );
99- server = serverBuilder .build ().start ();
100- applicationContext .publishEvent (new GRpcServerInitializedEvent (applicationContext , server ));
101-
102- log .info ("gRPC Server started, listening on port {}." , server .getPort ());
103- startDaemonAwaitThread ();
104-
105113 }
106114
107115 private ServerServiceDefinition bindInterceptors (ServerServiceDefinition serviceDefinition , GRpcService gRpcService , Collection <ServerInterceptor > globalInterceptors ) {
@@ -149,9 +157,12 @@ private Comparator<Object> serverInterceptorOrderComparator() {
149157 private void startDaemonAwaitThread () {
150158 Thread awaitThread = new Thread (() -> {
151159 try {
160+ isRunning .set (true );
152161 latch .await ();
153162 } catch (InterruptedException e ) {
154163 log .error ("gRPC server awaiter interrupted." , e );
164+ }finally {
165+ isRunning .set (false );
155166 }
156167 });
157168 awaitThread .setName ("grpc-server-awaiter" );
@@ -160,8 +171,7 @@ private void startDaemonAwaitThread() {
160171 }
161172
162173 @ Override
163- public void destroy () throws Exception {
164-
174+ public void stop () {
165175 Optional .ofNullable (server ).ifPresent (s -> {
166176 log .info ("Shutting down gRPC server ..." );
167177 s .getServices ().forEach (def -> healthStatusManager .clearStatus (def .getServiceDescriptor ().getName ()));
@@ -203,4 +213,13 @@ private <T> Stream<String> getBeanNamesByTypeWithAnnotation(Class<? extends Anno
203213 }
204214
205215
216+ @ Override
217+ public int getPhase () {
218+ return gRpcServerProperties .getStartUpPhase ();
219+ }
220+
221+ @ Override
222+ public boolean isRunning () {
223+ return isRunning .get ();
224+ }
206225}
0 commit comments