diff --git a/CHANGELOG.md b/CHANGELOG.md index 815a95b..6d9bda9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 5.6.1 + - Fixes an issue where the `jdbc_static` filter's throughput was artificially limited to 4 concurrent queries, causing the plugin to become a bottleneck in pipelines with more than 4 workers. Each instance of the plugin is now limited to 16 concurrent queries, with increased timeouts to eliminate enrichment failures [#187](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/187) + ## 5.6.0 - Support other rufus scheduling options in JDBC Input [#183](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/183) diff --git a/lib/logstash/filters/jdbc/read_write_database.rb b/lib/logstash/filters/jdbc/read_write_database.rb index 23a4eee..c7782c7 100644 --- a/lib/logstash/filters/jdbc/read_write_database.rb +++ b/lib/logstash/filters/jdbc/read_write_database.rb @@ -103,6 +103,11 @@ def post_initialize() super # get a fair reentrant read write lock @rwlock = java.util.concurrent.locks.ReentrantReadWriteLock.new(true) + + # configure the connection pool to reduce the chances of + # a worker thread being unable to acquire a connection. + @options_hash[:max_connections] = 16 # sequel default: 4 + @options_hash[:pool_timeout] = 30 # sequel default: 5 end end end end end diff --git a/logstash-integration-jdbc.gemspec b/logstash-integration-jdbc.gemspec index 72ff52d..6182978 100755 --- a/logstash-integration-jdbc.gemspec +++ b/logstash-integration-jdbc.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-integration-jdbc' - s.version = '5.6.0' + s.version = '5.6.1' s.licenses = ['Apache License (2.0)'] s.summary = "Integration with JDBC - input and filter plugins" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/filters/jdbc/read_write_database_spec.rb b/spec/filters/jdbc/read_write_database_spec.rb index eda4568..554dc0d 100644 --- a/spec/filters/jdbc/read_write_database_spec.rb +++ b/spec/filters/jdbc/read_write_database_spec.rb @@ -19,8 +19,13 @@ module LogStash module Filters module Jdbc stub_driver_class = double('org.apache.derby.jdbc.EmbeddedDriver').as_null_object expect(::Sequel::JDBC).to receive(:load_driver).once.with("org.apache.derby.jdbc.EmbeddedDriver").and_return(stub_driver_class) # two calls to connect because ReadWriteDatabase does verify_connection and connect - expect(::Sequel).to receive(:connect).once.with(connection_string_regex, {:driver => stub_driver_class, :test => true}).and_return(db) - expect(::Sequel).to receive(:connect).once.with(connection_string_regex, {:driver => stub_driver_class}).and_return(db) + connection_options = { + driver: stub_driver_class, + max_connections: 16, + pool_timeout: 30, + } + expect(::Sequel).to receive(:connect).once.with(connection_string_regex, connection_options.merge(:test => true)).and_return(db) + expect(::Sequel).to receive(:connect).once.with(connection_string_regex, connection_options).and_return(db) expect(read_write_db.empty_record_set).to eq([]) end @@ -30,8 +35,17 @@ module LogStash module Filters module Jdbc password = Util::Password.new("secret") stub_driver_class = double('com.example.Driver') expect(::Sequel::JDBC).to receive(:load_driver).once.with("a driver class").and_return(stub_driver_class) - expect(::Sequel).to receive(:connect).once.with(connection_str, {:driver => stub_driver_class, :user => user, :password => password.value, :test => true}).and_return(db) - expect(::Sequel).to receive(:connect).once.with(connection_str, {:driver => stub_driver_class, :user => user, :password => password.value}).and_return(db) + + connection_options = { + driver: stub_driver_class, + user: user, + password: password.value, + max_connections: 16, + pool_timeout: 30, + } + + expect(::Sequel).to receive(:connect).once.with(connection_str, connection_options.merge(:test => true)).and_return(db) + expect(::Sequel).to receive(:connect).once.with(connection_str, connection_options).and_return(db) described_class.create(connection_str, "a driver class", nil, user, password) end end