Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/logstash/inputs/beats.rb
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def register
end # def register

def create_server
server = org.logstash.beats.Server.new(@host, @port, @client_inactivity_timeout, @event_loop_threads, @executor_threads)
server = org.logstash.beats.Server.new(@id, @host, @port, @client_inactivity_timeout, @event_loop_threads, @executor_threads)
server.setSslHandlerProvider(new_ssl_handshake_provider(new_ssl_context_builder)) if @ssl_enabled
server
end
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/logstash/beats/Runner.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ static public void main(String[] args) throws Exception {
// Check for leaks.
// ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);

Server server = new Server("0.0.0.0", DEFAULT_PORT, 15, 0, Runtime.getRuntime().availableProcessors());
Server server = new Server("test", "0.0.0.0", DEFAULT_PORT, 15, 0, Runtime.getRuntime().availableProcessors());

if(args.length > 0 && args[0].equals("ssl")) {
logger.debug("Using SSL");
Expand Down
21 changes: 14 additions & 7 deletions src/main/java/org/logstash/beats/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,14 @@
import org.apache.logging.log4j.Logger;
import org.logstash.netty.SslHandlerProvider;

import static org.logstash.beats.util.DaemonThreadFactory.daemonThreadFactory;

public class Server {
private final static Logger logger = LogManager.getLogger(Server.class);

private final int port;

private final String id;
private final String host;
private final int eventLoopThreadCount;
private final int executorThreadCount;
Expand All @@ -33,7 +37,8 @@ public class Server {

private final int clientInactivityTimeoutSeconds;

public Server(String host, int port, int clientInactivityTimeoutSeconds, int eventLoopThreadCount, int executorThreadCount) {
public Server(String id, String host, int port, int clientInactivityTimeoutSeconds, int eventLoopThreadCount, int executorThreadCount) {
this.id = id;
this.host = host;
this.port = port;
this.clientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
Expand All @@ -54,12 +59,12 @@ public Server listen() throws InterruptedException {
logger.error("Could not shut down worker group before starting", e);
}
}
bossGroup = new NioEventLoopGroup(eventLoopThreadCount); // TODO: add a config to make it adjustable, no need many threads
workGroup = new NioEventLoopGroup(eventLoopThreadCount);
bossGroup = new NioEventLoopGroup(eventLoopThreadCount, daemonThreadFactory(id + "-bossGroup")); // TODO: add a config to make it adjustable, no need many threads
workGroup = new NioEventLoopGroup(eventLoopThreadCount, daemonThreadFactory(id + "-workGroup"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, this is helpful for investigation.

try {
logger.info("Starting server on port: {}", this.port);

beatsInitializer = new BeatsInitializer(messageListener, clientInactivityTimeoutSeconds, executorThreadCount);
beatsInitializer = new BeatsInitializer(id, messageListener, clientInactivityTimeoutSeconds, executorThreadCount);

ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workGroup)
Expand Down Expand Up @@ -143,12 +148,14 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
private final IMessageListener localMessageListener;
private final int localClientInactivityTimeoutSeconds;

BeatsInitializer(IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThread) {
BeatsInitializer(String pluginId, IMessageListener messageListener, int clientInactivityTimeoutSeconds, int beatsHandlerThreadCount) {
// Keeps a local copy of Server settings, so they can't be modified once it starts listening
this.localMessageListener = messageListener;
this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD);
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD,
daemonThreadFactory(pluginId + "-idleStateHandler"));
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThreadCount,
daemonThreadFactory(pluginId + "-beatsHandler"));
}

public void initChannel(SocketChannel socket) {
Expand Down