Skip to content

Commit f853ce6

Browse files
praseodymrobbavey
authored andcommitted
Re-initialise Netty worker group on plugin restart
This allows the plugin to actually recover from exceptions after a restart. It also has the side effect of providing nicer error messages and clearer stack traces to the end user. Fixes #289
1 parent f7a2978 commit f853ce6

File tree

1 file changed

+18
-6
lines changed

1 file changed

+18
-6
lines changed

src/main/java/org/logstash/beats/Server.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ public class Server {
2121
private final static Logger logger = LogManager.getLogger(Server.class);
2222

2323
private final int port;
24-
private final NioEventLoopGroup workGroup;
2524
private final String host;
2625
private final int beatsHeandlerThreadCount;
26+
private NioEventLoopGroup workGroup;
2727
private IMessageListener messageListener = new MessageListener();
2828
private SslSimpleBuilder sslBuilder;
2929
private BeatsInitializer beatsInitializer;
@@ -35,16 +35,24 @@ public Server(String host, int p, int timeout, int threadCount) {
3535
port = p;
3636
clientInactivityTimeoutSeconds = timeout;
3737
beatsHeandlerThreadCount = threadCount;
38-
workGroup = new NioEventLoopGroup();
3938
}
4039

4140
public void enableSSL(SslSimpleBuilder builder) {
4241
sslBuilder = builder;
4342
}
4443

4544
public Server listen() throws InterruptedException {
45+
if (workGroup != null) {
46+
try {
47+
logger.debug("Shutting down existing worker group before starting");
48+
workGroup.shutdownGracefully().sync();
49+
} catch (Exception e) {
50+
logger.error("Could not shut down worker group before starting", e);
51+
}
52+
}
53+
workGroup = new NioEventLoopGroup();
4654
try {
47-
logger.info("Starting server on port: " + this.port);
55+
logger.info("Starting server on port: {}", this.port);
4856

4957
beatsInitializer = new BeatsInitializer(isSslEnable(), messageListener, clientInactivityTimeoutSeconds, beatsHeandlerThreadCount);
5058

@@ -63,16 +71,20 @@ public Server listen() throws InterruptedException {
6371
return this;
6472
}
6573

66-
public void stop() throws InterruptedException {
74+
public void stop() {
6775
logger.debug("Server shutting down");
6876
shutdown();
6977
logger.debug("Server stopped");
7078
}
7179

7280
private void shutdown(){
7381
try {
74-
workGroup.shutdownGracefully().sync();
75-
beatsInitializer.shutdownEventExecutor();
82+
if (workGroup != null) {
83+
workGroup.shutdownGracefully().sync();
84+
}
85+
if (beatsInitializer != null) {
86+
beatsInitializer.shutdownEventExecutor();
87+
}
7688
} catch (InterruptedException e){
7789
throw new IllegalStateException(e);
7890
}

0 commit comments

Comments
 (0)