Skip to content

Commit 50b8227

Browse files
committed
feat(api): throw an exception when the api service fails to start.
1 parent 5bbff44 commit 50b8227

File tree

21 files changed

+839
-914
lines changed

21 files changed

+839
-914
lines changed

framework/src/main/java/org/tron/common/application/AbstractService.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package org.tron.common.application;
22

33
import com.google.common.base.Objects;
4+
import java.util.concurrent.CompletableFuture;
45
import lombok.Getter;
56
import lombok.extern.slf4j.Slf4j;
67
import org.tron.core.config.args.Args;
@@ -12,30 +13,36 @@ public abstract class AbstractService implements Service {
1213
protected int port;
1314
@Getter
1415
protected boolean enable;
16+
@Getter
1517
protected final String name = this.getClass().getSimpleName();
1618

1719

1820
@Override
19-
public void start() {
21+
public CompletableFuture<?> start() {
22+
logger.info("{} starting on {}", name, port);
23+
final CompletableFuture<?> resultFuture = new CompletableFuture<>();
2024
try {
2125
innerStart();
26+
resultFuture.complete(null);
2227
logger.info("{} started, listening on {}", name, port);
2328
} catch (Exception e) {
24-
logger.error("{}", name, e);
29+
resultFuture.completeExceptionally(e);
2530
}
31+
return resultFuture;
2632
}
2733

2834
@Override
29-
public void stop() {
35+
public CompletableFuture<?> stop() {
3036
logger.info("{} shutdown...", name);
37+
final CompletableFuture<?> resultFuture = new CompletableFuture<>();
3138
try {
3239
innerStop();
40+
resultFuture.complete(null);
41+
logger.info("{} shutdown complete", name);
3342
} catch (Exception e) {
34-
logger.warn("{}", name, e);
43+
resultFuture.completeExceptionally(e);
3544
}
36-
logger.info("{} shutdown complete", name);
37-
38-
45+
return resultFuture;
3946
}
4047

4148
@Override

framework/src/main/java/org/tron/common/application/HttpService.java

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,25 +15,61 @@
1515

1616
package org.tron.common.application;
1717

18+
import java.util.concurrent.CompletableFuture;
1819
import lombok.extern.slf4j.Slf4j;
20+
import org.eclipse.jetty.server.ConnectionLimit;
1921
import org.eclipse.jetty.server.Server;
22+
import org.eclipse.jetty.servlet.ServletContextHandler;
23+
import org.tron.core.config.args.Args;
2024

2125
@Slf4j(topic = "rpc")
2226
public abstract class HttpService extends AbstractService {
2327

2428
protected Server apiServer;
2529

30+
protected String contextPath;
31+
2632
@Override
2733
public void innerStart() throws Exception {
28-
if (apiServer != null) {
29-
apiServer.start();
34+
if (this.apiServer != null) {
35+
this.apiServer.start();
3036
}
3137
}
3238

3339
@Override
3440
public void innerStop() throws Exception {
35-
if (apiServer != null) {
36-
apiServer.stop();
41+
if (this.apiServer != null) {
42+
this.apiServer.stop();
43+
}
44+
}
45+
46+
@Override
47+
public CompletableFuture<?> start() {
48+
initServer();
49+
ServletContextHandler context = initContextHandler();
50+
addServlet(context);
51+
addFilter(context);
52+
return super.start();
53+
}
54+
55+
protected void initServer() {
56+
this.apiServer = new Server(this.port);
57+
int maxHttpConnectNumber = Args.getInstance().getMaxHttpConnectNumber();
58+
if (maxHttpConnectNumber > 0) {
59+
this.apiServer.addBean(new ConnectionLimit(maxHttpConnectNumber, this.apiServer));
3760
}
3861
}
62+
63+
protected ServletContextHandler initContextHandler() {
64+
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.SESSIONS);
65+
context.setContextPath(this.contextPath);
66+
this.apiServer.setHandler(context);
67+
return context;
68+
}
69+
70+
protected abstract void addServlet(ServletContextHandler context);
71+
72+
protected void addFilter(ServletContextHandler context) {
73+
74+
}
3975
}

framework/src/main/java/org/tron/common/application/RpcService.java

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,26 +16,96 @@
1616
package org.tron.common.application;
1717

1818
import io.grpc.Server;
19+
import io.grpc.netty.NettyServerBuilder;
20+
import io.grpc.protobuf.services.ProtoReflectionService;
21+
import java.util.concurrent.CompletableFuture;
1922
import java.util.concurrent.TimeUnit;
2023
import lombok.extern.slf4j.Slf4j;
24+
import org.springframework.beans.factory.annotation.Autowired;
25+
import org.tron.common.es.ExecutorServiceManager;
26+
import org.tron.common.parameter.CommonParameter;
27+
import org.tron.core.config.args.Args;
28+
import org.tron.core.services.filter.LiteFnQueryGrpcInterceptor;
29+
import org.tron.core.services.ratelimiter.RateLimiterInterceptor;
30+
import org.tron.core.services.ratelimiter.RpcApiAccessInterceptor;
2131

2232
@Slf4j(topic = "rpc")
2333
public abstract class RpcService extends AbstractService {
2434

25-
protected Server apiServer;
35+
private Server apiServer;
36+
protected String executorName;
37+
38+
@Autowired
39+
private RateLimiterInterceptor rateLimiterInterceptor;
40+
41+
@Autowired
42+
private LiteFnQueryGrpcInterceptor liteFnQueryGrpcInterceptor;
43+
44+
@Autowired
45+
private RpcApiAccessInterceptor apiAccessInterceptor;
2646

2747
@Override
2848
public void innerStart() throws Exception {
29-
if (apiServer != null) {
30-
apiServer.start();
49+
if (this.apiServer != null) {
50+
this.apiServer.start();
3151
}
3252
}
3353

3454
@Override
3555
public void innerStop() throws Exception {
36-
if (apiServer != null) {
37-
apiServer.shutdown().awaitTermination(5, TimeUnit.SECONDS);
56+
if (this.apiServer != null) {
57+
this.apiServer.shutdown().awaitTermination(5, TimeUnit.SECONDS);
58+
}
59+
}
60+
61+
@Override
62+
public CompletableFuture<?> start() {
63+
NettyServerBuilder serverBuilder = initServerBuilder();
64+
addService(serverBuilder);
65+
addInterceptor(serverBuilder);
66+
initServer(serverBuilder);
67+
this.rateLimiterInterceptor.init(this.apiServer);
68+
return super.start();
69+
}
70+
71+
protected NettyServerBuilder initServerBuilder() {
72+
NettyServerBuilder serverBuilder = NettyServerBuilder.forPort(this.port);
73+
CommonParameter parameter = Args.getInstance();
74+
if (parameter.getRpcThreadNum() > 0) {
75+
serverBuilder = serverBuilder
76+
.executor(ExecutorServiceManager.newFixedThreadPool(
77+
this.executorName, parameter.getRpcThreadNum()));
3878
}
79+
// Set configs from config.conf or default value
80+
serverBuilder
81+
.maxConcurrentCallsPerConnection(parameter.getMaxConcurrentCallsPerConnection())
82+
.flowControlWindow(parameter.getFlowControlWindow())
83+
.maxConnectionIdle(parameter.getMaxConnectionIdleInMillis(), TimeUnit.MILLISECONDS)
84+
.maxConnectionAge(parameter.getMaxConnectionAgeInMillis(), TimeUnit.MILLISECONDS)
85+
.maxInboundMessageSize(parameter.getMaxMessageSize())
86+
.maxHeaderListSize(parameter.getMaxHeaderListSize());
87+
88+
if (parameter.isRpcReflectionServiceEnable()) {
89+
serverBuilder.addService(ProtoReflectionService.newInstance());
90+
}
91+
return serverBuilder;
92+
}
93+
94+
protected abstract void addService(NettyServerBuilder serverBuilder);
95+
96+
protected void addInterceptor(NettyServerBuilder serverBuilder) {
97+
// add a ratelimiter interceptor
98+
serverBuilder.intercept(this.rateLimiterInterceptor);
99+
100+
// add api access interceptor
101+
serverBuilder.intercept(this.apiAccessInterceptor);
102+
103+
// add lite fullnode query interceptor
104+
serverBuilder.intercept(this.liteFnQueryGrpcInterceptor);
105+
}
106+
107+
protected void initServer(NettyServerBuilder serverBuilder) {
108+
this.apiServer = serverBuilder.build();
39109
}
40110

41111
}

framework/src/main/java/org/tron/common/application/Service.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,25 @@
1515

1616
package org.tron.common.application;
1717

18+
import java.util.concurrent.CompletableFuture;
19+
1820
public interface Service {
1921

20-
void start();
22+
/**
23+
* Starts the service and all needed backend systems.
24+
*
25+
* @return completion state
26+
*/
27+
CompletableFuture<?> start();
2128

22-
void stop();
29+
/**
30+
* Stops the service and performs needed cleanup.
31+
*
32+
* @return completion state
33+
*/
34+
CompletableFuture<?> stop();
2335

2436
boolean isEnable();
37+
38+
String getName();
2539
}

framework/src/main/java/org/tron/common/application/ServiceContainer.java

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,10 @@
1616
package org.tron.common.application;
1717

1818
import java.util.List;
19+
import java.util.concurrent.CompletableFuture;
20+
import java.util.concurrent.ExecutionException;
21+
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.TimeoutException;
1923
import java.util.stream.Collectors;
2024
import javax.annotation.PostConstruct;
2125
import lombok.extern.slf4j.Slf4j;
@@ -43,13 +47,45 @@ private void initEnabledServices() {
4347

4448
void start() {
4549
logger.info("Starting api services.");
46-
this.enabledServices.forEach(Service::start);
50+
this.enabledServices.forEach(this::waitForServiceToStart);
4751
logger.info("All api services started.");
4852
}
4953

5054
void stop() {
5155
logger.info("Stopping api services.");
52-
this.enabledServices.forEach(Service::stop);
56+
this.enabledServices.forEach(this::waitForServiceToStop);
5357
logger.info("All api services stopped.");
5458
}
59+
60+
private void waitForServiceToStart(Service service) {
61+
final String serviceName = service.getName();
62+
final CompletableFuture<?> startFuture = service.start();
63+
do {
64+
try {
65+
startFuture.get(60, TimeUnit.SECONDS);
66+
} catch (final InterruptedException e) {
67+
Thread.currentThread().interrupt();
68+
throw new IllegalStateException("Interrupted while waiting for service to start", e);
69+
} catch (final ExecutionException e) {
70+
throw new IllegalStateException("Service " + serviceName + " failed to start", e);
71+
} catch (final TimeoutException e) {
72+
logger.warn("Service {} is taking an unusually long time to start", serviceName);
73+
}
74+
} while (!startFuture.isDone());
75+
}
76+
77+
private void waitForServiceToStop(Service service) {
78+
final String serviceName = service.getName();
79+
final CompletableFuture<?> stopFuture = service.stop();
80+
try {
81+
stopFuture.get(30, TimeUnit.SECONDS);
82+
} catch (final InterruptedException e) {
83+
logger.debug("Interrupted while waiting for service {} to complete", serviceName, e);
84+
Thread.currentThread().interrupt();
85+
} catch (final ExecutionException e) {
86+
logger.error("Service {} failed to shutdown", serviceName, e);
87+
} catch (final TimeoutException e) {
88+
logger.error("Service {} did not shut down cleanly", serviceName);
89+
}
90+
}
5591
}

0 commit comments

Comments
 (0)