Skip to content

Commit 590a460

Browse files
authored
jdbc_static filter: configure connection pool (#187)
* jdbc_static filter: configure connection pool By increasing the size of the connection pool from the Sequel-default 4 to 16, we increase the parallelism of pipelines that are using this filter by four- fold, reducing the situations where a pipeline's total throughput is constrained by this plugin's connection pool to the internal Derby DB. We also increase the pool timeout from the Sequel-default 5 seconds to 30 seconds, so that failures to immediately acquire a connection don't result in failed lookups. * Update CHANGELOG.md with link to PR
1 parent 446e218 commit 590a460

File tree

4 files changed

+27
-5
lines changed

4 files changed

+27
-5
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 5.6.1
2+
- Fixes an issue where the `jdbc_static` filter's throughput was artificially limited to 4 concurrent queries, causing the plugin to become a bottleneck in pipelines with more than 4 workers. Each instance of the plugin is now limited to 16 concurrent queries, with increased timeouts to eliminate enrichment failures [#187](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/187)
3+
14
## 5.6.0
25
- Support other rufus scheduling options in JDBC Input [#183](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/183)
36

lib/logstash/filters/jdbc/read_write_database.rb

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ def post_initialize()
103103
super
104104
# get a fair reentrant read write lock
105105
@rwlock = java.util.concurrent.locks.ReentrantReadWriteLock.new(true)
106+
107+
# configure the connection pool to reduce the chances of
108+
# a worker thread being unable to acquire a connection.
109+
@options_hash[:max_connections] = 16 # sequel default: 4
110+
@options_hash[:pool_timeout] = 30 # sequel default: 5
106111
end
107112
end
108113
end end end

logstash-integration-jdbc.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-integration-jdbc'
3-
s.version = '5.6.0'
3+
s.version = '5.6.1'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Integration with JDBC - input and filter plugins"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/filters/jdbc/read_write_database_spec.rb

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,13 @@ module LogStash module Filters module Jdbc
1919
stub_driver_class = double('org.apache.derby.jdbc.EmbeddedDriver').as_null_object
2020
expect(::Sequel::JDBC).to receive(:load_driver).once.with("org.apache.derby.jdbc.EmbeddedDriver").and_return(stub_driver_class)
2121
# two calls to connect because ReadWriteDatabase does verify_connection and connect
22-
expect(::Sequel).to receive(:connect).once.with(connection_string_regex, {:driver => stub_driver_class, :test => true}).and_return(db)
23-
expect(::Sequel).to receive(:connect).once.with(connection_string_regex, {:driver => stub_driver_class}).and_return(db)
22+
connection_options = {
23+
driver: stub_driver_class,
24+
max_connections: 16,
25+
pool_timeout: 30,
26+
}
27+
expect(::Sequel).to receive(:connect).once.with(connection_string_regex, connection_options.merge(:test => true)).and_return(db)
28+
expect(::Sequel).to receive(:connect).once.with(connection_string_regex, connection_options).and_return(db)
2429
expect(read_write_db.empty_record_set).to eq([])
2530
end
2631

@@ -30,8 +35,17 @@ module LogStash module Filters module Jdbc
3035
password = Util::Password.new("secret")
3136
stub_driver_class = double('com.example.Driver')
3237
expect(::Sequel::JDBC).to receive(:load_driver).once.with("a driver class").and_return(stub_driver_class)
33-
expect(::Sequel).to receive(:connect).once.with(connection_str, {:driver => stub_driver_class, :user => user, :password => password.value, :test => true}).and_return(db)
34-
expect(::Sequel).to receive(:connect).once.with(connection_str, {:driver => stub_driver_class, :user => user, :password => password.value}).and_return(db)
38+
39+
connection_options = {
40+
driver: stub_driver_class,
41+
user: user,
42+
password: password.value,
43+
max_connections: 16,
44+
pool_timeout: 30,
45+
}
46+
47+
expect(::Sequel).to receive(:connect).once.with(connection_str, connection_options.merge(:test => true)).and_return(db)
48+
expect(::Sequel).to receive(:connect).once.with(connection_str, connection_options).and_return(db)
3549
described_class.create(connection_str, "a driver class", nil, user, password)
3650
end
3751
end

0 commit comments

Comments
 (0)