Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
## 5.6.1
- Fixes an issue where the `jdbc_static` filter's throughput was artificially limited to 4 concurrent queries, causing the plugin to become a bottleneck in pipelines with more than 4 workers. Each instance of the plugin is now limited to 16 concurrent queries, with increased timeouts to eliminate enrichment failures [#187](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/187)

## 5.6.0
- Support other rufus scheduling options in JDBC Input [#183](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/183)

Expand Down
5 changes: 5 additions & 0 deletions lib/logstash/filters/jdbc/read_write_database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ def post_initialize()
super
# get a fair reentrant read write lock
@rwlock = java.util.concurrent.locks.ReentrantReadWriteLock.new(true)

# configure the connection pool to reduce the chances of
# a worker thread being unable to acquire a connection.
@options_hash[:max_connections] = 16 # sequel default: 4
@options_hash[:pool_timeout] = 30 # sequel default: 5
end
end
end end end
2 changes: 1 addition & 1 deletion logstash-integration-jdbc.gemspec
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-integration-jdbc'
s.version = '5.6.0'
s.version = '5.6.1'
s.licenses = ['Apache License (2.0)']
s.summary = "Integration with JDBC - input and filter plugins"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
22 changes: 18 additions & 4 deletions spec/filters/jdbc/read_write_database_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,13 @@ module LogStash module Filters module Jdbc
stub_driver_class = double('org.apache.derby.jdbc.EmbeddedDriver').as_null_object
expect(::Sequel::JDBC).to receive(:load_driver).once.with("org.apache.derby.jdbc.EmbeddedDriver").and_return(stub_driver_class)
# two calls to connect because ReadWriteDatabase does verify_connection and connect
expect(::Sequel).to receive(:connect).once.with(connection_string_regex, {:driver => stub_driver_class, :test => true}).and_return(db)
expect(::Sequel).to receive(:connect).once.with(connection_string_regex, {:driver => stub_driver_class}).and_return(db)
connection_options = {
driver: stub_driver_class,
max_connections: 16,
pool_timeout: 30,
}
expect(::Sequel).to receive(:connect).once.with(connection_string_regex, connection_options.merge(:test => true)).and_return(db)
expect(::Sequel).to receive(:connect).once.with(connection_string_regex, connection_options).and_return(db)
expect(read_write_db.empty_record_set).to eq([])
end

Expand All @@ -30,8 +35,17 @@ module LogStash module Filters module Jdbc
password = Util::Password.new("secret")
stub_driver_class = double('com.example.Driver')
expect(::Sequel::JDBC).to receive(:load_driver).once.with("a driver class").and_return(stub_driver_class)
expect(::Sequel).to receive(:connect).once.with(connection_str, {:driver => stub_driver_class, :user => user, :password => password.value, :test => true}).and_return(db)
expect(::Sequel).to receive(:connect).once.with(connection_str, {:driver => stub_driver_class, :user => user, :password => password.value}).and_return(db)

connection_options = {
driver: stub_driver_class,
user: user,
password: password.value,
max_connections: 16,
pool_timeout: 30,
}

expect(::Sequel).to receive(:connect).once.with(connection_str, connection_options.merge(:test => true)).and_return(db)
expect(::Sequel).to receive(:connect).once.with(connection_str, connection_options).and_return(db)
described_class.create(connection_str, "a driver class", nil, user, password)
end
end
Expand Down