Description
Logstash information:
Docker container: docker.elastic.co/logstash/logstash:8.11.4
Description of the problem including expected versus actual behavior:
Messages are sent to the Logstash tcp input plugin from Python code running on 5,000+ workers.
The connections to the Logstash input go through an Azure load balancer.
Logstash does not seem to recognize when these connections are disconnected: I can easily build up 100,000 connections in the CLOSE_WAIT state on the Logstash server in ~30 minutes. Eventually this essentially stalls Logstash (it stops making output connections to Elasticsearch). Restarting the Docker container instantly cleans up all the connections; after a restart it sometimes takes a few hours to get back into this bad state where most connections are left in CLOSE_WAIT.
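A minimal way to watch the buildup on the Logstash host, assuming Linux and the tcp input port 5040 from the configuration below (run inside the container's network namespace), is to count sockets stuck in CLOSE_WAIT straight from /proc/net/tcp:

```python
# Sketch: count CLOSE_WAIT sockets on the tcp input port by reading /proc/net/tcp
# (IPv4 only; assumes Linux and port 5040 as in the configuration below).
CLOSE_WAIT_STATE = "08"   # kernel state code for CLOSE_WAIT in /proc/net/tcp
INPUT_PORT = 5040         # the tcp input port from the Logstash configuration


def count_close_wait(port: int = INPUT_PORT) -> int:
    count = 0
    with open("/proc/net/tcp") as f:
        next(f)  # skip the column-header line
        for line in f:
            fields = line.split()
            local_addr, state = fields[1], fields[3]
            local_port = int(local_addr.split(":")[1], 16)  # port is hex-encoded
            if local_port == port and state == CLOSE_WAIT_STATE:
                count += 1
    return count


if __name__ == "__main__":
    print(f"CLOSE_WAIT connections on port {INPUT_PORT}: {count_close_wait()}")
```

With the behavior described above, this count only grows until the container is restarted.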
Steps to reproduce:
- start the Logstash Docker container with the configuration below
- run the Python script below (every time it runs, another connection is left in the CLOSE_WAIT state on the Logstash side)

This may only happen when many clients are sending messages to the same Logstash TCP input at the same time; a rough multi-client sketch follows the reproduction script below.
```
input {
  tcp {
    type => "pipelinelog"
    port => 5040
    codec => json
    tcp_keep_alive => true
  }
  udp {
    type => "pipelinelog"
    port => 5040
    codec => json
  }
...
output {
  # stdout { codec => rubydebug }
  elasticsearch {
    hosts => "https://<hostname>:9243"
    user => "<user>"
    password => "<password>"
    index => "pipelinelog-%{name}-%{+yyyy.MM.dd}"
    action => "create"
    compression_level => 0
    resurrect_delay => 1
    retry_initial_interval => 1
    retry_max_interval => 1
    retry_on_conflict => 0
    timeout => 10
    silence_errors_in_log => ["version_conflict_engine_exception"]
    manage_template => false
    ilm_enabled => false
  }
}
```
```python
import json
import logging
from logging.handlers import SocketHandler


class LogstashFormatter(logging.Formatter):
    def __init__(self):
        logging.Formatter.__init__(self, None, '%a %b %d %H:%M:%S %Z %Y')

    def format(self, record):
        return json.dumps({"msg": record.getMessage()})


class TcpLogstashHandler(SocketHandler):
    def __init__(self, host, port, formatter):
        super().__init__(host, port)
        self.formatter = formatter


class LogstashHandler(TcpLogstashHandler):
    def __init__(self, host, port, name, formatter):
        super().__init__(host, port, formatter)
        self.name = name
        self.setFormatter(formatter)


if __name__ == "__main__":
    log = logging.Logger("test")
    ls_formatter = LogstashFormatter()
    ls_handler = LogstashHandler("<hostname>", 5040, "pipeline", ls_formatter)
    log.addHandler(ls_handler)
    s_handler = logging.StreamHandler()
    log.addHandler(s_handler)
    log.info("testing 1, 2, 3...")
```
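To approximate the many-workers case rather than a single run of the script above, something like the rough sketch below can be used; `<hostname>`, the port, and the client count are placeholders. Each client connects, sends one JSON event, and closes cleanly, which, per the behavior described above, is what leaves the matching server-side sockets in CLOSE_WAIT.

```python
# Rough sketch: many concurrent short-lived clients hitting the same tcp input.
# <hostname>, the port, and N_CLIENTS are placeholders, not values from the report.
import json
import socket
import threading

HOST, PORT = "<hostname>", 5040   # same placeholders as the reproduction script
N_CLIENTS = 200                   # arbitrary number of concurrent client threads


def one_client(i: int) -> None:
    # Connect, send one JSON event, and close the client side of the connection.
    with socket.create_connection((HOST, PORT), timeout=10) as s:
        payload = json.dumps({"msg": f"testing from client {i}"}) + "\n"
        s.sendall(payload.encode("utf-8"))
    # The client socket is fully closed here; on the Logstash host the peer
    # socket should now show up in CLOSE_WAIT if the input never closes its end.


if __name__ == "__main__":
    threads = [threading.Thread(target=one_client, args=(i,)) for i in range(N_CLIENTS)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"{N_CLIENTS} clients connected, sent one event each, and disconnected")
```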