Skip to content

Commit 3aca2fe

Browse files
authored
Session handling reprise (#130)
* Revert "Switch pattern of usage Sequel to leverage the DB connection pool (#119)" This reverts functionality from commit deeb4b2. * input: avoid caching db connections in statement handler * input: fix issue where retried query could prevent pipeline shutdown The use of the `retry` keyword when a lock is acquired in the `begin...` block and released in the `ensure` block will cause the lock to be acquired more times than released whenever the block is retried. Because only one thread ever executes the queries, this isn't a problem at runtime, but can cause the plugin to stall during shutdown when a separate thread attempts to acquire the lock for cleanup. * input: retain fail-in-register if driver cannot be loaded
1 parent 1804cd7 commit 3aca2fe

File tree

7 files changed

+117
-121
lines changed

7 files changed

+117
-121
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
## 5.5.2
2+
- FIX: the input plugin's prior behaviour of opening a new database connection for each scheduled run (removed in `v5.4.1`) is restored, ensuring that infrequently-run schedules do not hold open connections to their databases indefinitely, _without_ reintroducing the leak [#130](https://github.com/logstash-plugins/logstash-integration-jdbc/pull/130)
3+
14
## 5.5.1
25
- Document `statement_retry_attempts` and `statement_retry_attempts_wait_time` options
36

lib/logstash/inputs/jdbc.rb

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,8 @@ def register
261261
end
262262
end
263263

264+
prepare_jdbc_connection
265+
264266
if @use_column_value
265267
# Raise an error if @use_column_value is true, but no @tracking_column is set
266268
if @tracking_column.nil?
@@ -303,20 +305,7 @@ def register
303305
converters[encoding] = converter
304306
end
305307
end
306-
307308
load_driver
308-
begin
309-
open_jdbc_connection
310-
rescue Sequel::DatabaseConnectionError,
311-
Sequel::DatabaseError,
312-
Sequel::InvalidValue,
313-
Java::JavaSql::SQLException => e
314-
details = { exception: e.class, message: e.message }
315-
details[:cause] = e.cause.inspect if e.cause
316-
details[:backtrace] = e.backtrace if @logger.debug?
317-
@logger.warn("Exception when executing JDBC query", details)
318-
raise(LogStash::ConfigurationError, "Can't create a connection pool to the database")
319-
end
320309
end # def register
321310

322311
# test injection points

lib/logstash/plugin_mixins/jdbc/jdbc.rb

Lines changed: 72 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,17 @@
33
require "logstash/config/mixin"
44
require "time"
55
require "date"
6+
require "thread" # Monitor
67
require_relative "value_tracking"
78
require_relative "timezone_proxy"
89
require_relative "statement_handler"
910
require_relative "value_handler"
1011

11-
java_import java.util.concurrent.locks.ReentrantLock
12-
1312
# Tentative of abstracting JDBC logic to a mixin
1413
# for potential reuse in other plugins (input/output)
14+
#
15+
# CAUTION: implementation of this "potential reuse" module is
16+
# VERY tightly-coupled with the JDBC Input's implementation.
1517
module LogStash module PluginMixins module Jdbc
1618
module Jdbc
1719
include LogStash::PluginMixins::Jdbc::ValueHandler
@@ -159,82 +161,94 @@ def log_java_exception(e)
159161
end
160162

161163
def open_jdbc_connection
162-
# at this point driver is already loaded
163-
Sequel.application_timezone = @plugin_timezone.to_sym
164-
165-
@database = jdbc_connect()
166-
@database.extension(:pagination)
167-
if @jdbc_default_timezone
168-
@database.extension(:named_timezones)
169-
@database.timezone = TimezoneProxy.load(@jdbc_default_timezone)
170-
end
171-
if @jdbc_validate_connection
172-
@database.extension(:connection_validator)
173-
@database.pool.connection_validation_timeout = @jdbc_validation_timeout
174-
end
175-
@database.fetch_size = @jdbc_fetch_size unless @jdbc_fetch_size.nil?
176-
begin
177-
@database.test_connection
178-
rescue Java::JavaSql::SQLException => e
179-
@logger.warn("Failed test_connection with java.sql.SQLException.", :exception => e)
180-
rescue Sequel::DatabaseConnectionError => e
181-
@logger.warn("Failed test_connection.", :exception => e)
182-
#TODO return false and let the plugin raise a LogStash::ConfigurationError
183-
raise e
184-
end
164+
@connection_lock.synchronize do
165+
# at this point driver is already loaded
166+
Sequel.application_timezone = @plugin_timezone.to_sym
167+
168+
@database = jdbc_connect()
169+
@database.extension(:pagination)
170+
if @jdbc_default_timezone
171+
@database.extension(:named_timezones)
172+
@database.timezone = TimezoneProxy.load(@jdbc_default_timezone)
173+
end
174+
if @jdbc_validate_connection
175+
@database.extension(:connection_validator)
176+
@database.pool.connection_validation_timeout = @jdbc_validation_timeout
177+
end
178+
@database.fetch_size = @jdbc_fetch_size unless @jdbc_fetch_size.nil?
179+
begin
180+
@database.test_connection
181+
rescue Java::JavaSql::SQLException => e
182+
@logger.warn("Failed test_connection with java.sql.SQLException.", :exception => e)
183+
rescue Sequel::DatabaseConnectionError => e
184+
@logger.warn("Failed test_connection.", :exception => e)
185+
#TODO return false and let the plugin raise a LogStash::ConfigurationError
186+
raise e
187+
end
185188

186-
@database.sql_log_level = @sql_log_level.to_sym
187-
@database.logger = @logger
189+
@database.sql_log_level = @sql_log_level.to_sym
190+
@database.logger = @logger
188191

189-
@database.extension :identifier_mangling
192+
@database.extension :identifier_mangling
190193

191-
if @lowercase_column_names
192-
@database.identifier_output_method = :downcase
193-
else
194-
@database.identifier_output_method = :to_s
194+
if @lowercase_column_names
195+
@database.identifier_output_method = :downcase
196+
else
197+
@database.identifier_output_method = :to_s
198+
end
195199
end
196200
end
197201

202+
public
203+
def prepare_jdbc_connection
204+
@connection_lock = Monitor.new # aka ReentrantLock
205+
end
206+
198207
public
199208
def close_jdbc_connection
200-
begin
209+
@connection_lock.synchronize do
201210
# pipeline restarts can also close the jdbc connection, block until the current executing statement is finished to avoid leaking connections
202211
# connections in use won't really get closed
203212
@database.disconnect if @database
204-
rescue => e
205-
@logger.warn("Failed to close connection", :exception => e)
206213
end
214+
rescue => e
215+
@logger.warn("Failed to close connection", :exception => e)
207216
end
208217

209218
public
210-
def execute_statement
219+
def execute_statement(&result_handler)
211220
success = false
212221
retry_attempts = @statement_retry_attempts
213222

214-
begin
215-
retry_attempts -= 1
216-
sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc
217-
@tracking_column_warning_sent = false
218-
@statement_handler.perform_query(@database, @value_tracker.value) do |row|
219-
sql_last_value = get_column_value(row) if @use_column_value
220-
yield extract_values_from(row)
221-
end
222-
success = true
223-
rescue Sequel::Error, Java::JavaSql::SQLException => e
224-
details = { exception: e.class, message: e.message }
225-
details[:cause] = e.cause.inspect if e.cause
226-
details[:backtrace] = e.backtrace if @logger.debug?
227-
@logger.warn("Exception when executing JDBC query", details)
228-
229-
if retry_attempts == 0
230-
@logger.error("Unable to execute statement. Tried #{@statement_retry_attempts} times.")
223+
@connection_lock.synchronize do
224+
begin
225+
retry_attempts -= 1
226+
open_jdbc_connection
227+
sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc
228+
@tracking_column_warning_sent = false
229+
@statement_handler.perform_query(@database, @value_tracker.value) do |row|
230+
sql_last_value = get_column_value(row) if @use_column_value
231+
yield extract_values_from(row)
232+
end
233+
success = true
234+
rescue Sequel::Error, Java::JavaSql::SQLException => e
235+
details = { exception: e.class, message: e.message }
236+
details[:cause] = e.cause.inspect if e.cause
237+
details[:backtrace] = e.backtrace if @logger.debug?
238+
@logger.warn("Exception when executing JDBC query", details)
239+
240+
if retry_attempts == 0
241+
@logger.error("Unable to execute statement. Tried #{@statement_retry_attempts} times.")
242+
else
243+
@logger.error("Unable to execute statement. Trying again.")
244+
sleep(@statement_retry_attempts_wait_time)
245+
retry
246+
end
231247
else
232-
@logger.error("Unable to execute statement. Trying again.")
233-
sleep(@statement_retry_attempts_wait_time)
234-
retry
248+
@value_tracker.set_value(sql_last_value)
249+
ensure
250+
close_jdbc_connection
235251
end
236-
else
237-
@value_tracker.set_value(sql_last_value)
238252
end
239253

240254
return success

lib/logstash/plugin_mixins/jdbc/statement_handler.rb

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -124,14 +124,14 @@ def perform_query(db, sql_last_value)
124124
end
125125

126126
class PreparedStatementHandler < StatementHandler
127-
attr_reader :name, :bind_values_array, :statement_prepared, :prepared, :parameters
127+
attr_reader :name
128128

129129
def initialize(plugin)
130130
super(plugin)
131131
@name = plugin.prepared_statement_name.to_sym
132-
@bind_values_array = plugin.prepared_statement_bind_values
133-
@parameters = plugin.parameters
134-
@statement_prepared = Concurrent::AtomicBoolean.new(false)
132+
133+
@positional_bind_mapping = create_positional_bind_mapping(plugin.prepared_statement_bind_values).freeze
134+
@positional_bind_placeholders = @positional_bind_mapping.keys.map { |v| :"$#{v}" }.freeze
135135
end
136136

137137
# Performs the query, ignoring our pagination settings, yielding once per row of data
@@ -148,41 +148,28 @@ def perform_query(db, sql_last_value)
148148
private
149149

150150
def build_query(db, sql_last_value)
151-
@parameters = create_bind_values_hash
152-
if statement_prepared.false?
153-
prepended = parameters.keys.map{|v| v.to_s.prepend("$").to_sym}
154-
@prepared = db[statement, *prepended].prepare(:select, name)
155-
statement_prepared.make_true
156-
end
157151
# under the scheduler the Sequel database instance is recreated each time
158152
# so the previous prepared statements are lost, add back
159-
if db.prepared_statement(name).nil?
160-
db.set_prepared_statement(name, prepared)
161-
end
162-
bind_value_sql_last_value(sql_last_value)
163-
begin
164-
db.call(name, parameters)
165-
rescue => e
166-
# clear the statement prepared flag - the statement may be closed by this
167-
# time.
168-
statement_prepared.make_false
169-
raise e
170-
end
153+
prepared = db.prepared_statement(name)
154+
prepared ||= db[statement, *positional_bind_placeholders].prepare(:select, name)
155+
156+
prepared.call(positional_bind_mapping(sql_last_value))
171157
end
172158

173-
def create_bind_values_hash
159+
def create_positional_bind_mapping(bind_values_array)
174160
hash = {}
175161
bind_values_array.each_with_index {|v,i| hash[:"p#{i}"] = v}
176162
hash
177163
end
178164

179-
def bind_value_sql_last_value(sql_last_value)
180-
parameters.keys.each do |key|
181-
value = parameters[key]
182-
if value == ":sql_last_value"
183-
parameters[key] = sql_last_value
184-
end
165+
def positional_bind_mapping(sql_last_value)
166+
@positional_bind_mapping.transform_values do |value|
167+
value == ":sql_last_value" ? sql_last_value : value
185168
end
186169
end
170+
171+
def positional_bind_placeholders
172+
@positional_bind_mapping.keys.map { |v| :"$#{v}" }.freeze
173+
end
187174
end
188175
end end end

logstash-integration-jdbc.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-integration-jdbc'
3-
s.version = '5.5.1'
3+
s.version = '5.5.2'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Integration with JDBC - input and filter plugins"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

spec/inputs/integration/integ_spec.rb

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,21 +113,25 @@
113113
end
114114

115115
it "log warning msg when plugin run" do
116+
plugin.register
116117
expect( plugin ).to receive(:log_java_exception)
117118
expect(plugin.logger).to receive(:warn).once.with("Exception when executing JDBC query",
118119
hash_including(:message => instance_of(String)))
119-
expect{ plugin.register }.to raise_error(::LogStash::ConfigurationError)
120+
q = Queue.new
121+
expect{ plugin.run(q) }.not_to raise_error
120122
end
121123

122124
it "should log (native) Java driver error" do
125+
plugin.register
123126
expect( org.apache.logging.log4j.LogManager ).to receive(:getLogger).and_wrap_original do |m, *args|
124127
logger = m.call(*args)
125128
expect( logger ).to receive(:error) do |_, e|
126129
expect( e ).to be_a org.postgresql.util.PSQLException
127130
end.and_call_original
128131
logger
129132
end
130-
expect{ plugin.register }.to raise_error(::LogStash::ConfigurationError)
133+
q = Queue.new
134+
expect{ plugin.run(q) }.not_to raise_error
131135
end
132136
end
133137
end

spec/inputs/jdbc_spec.rb

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,6 +1283,7 @@
12831283
plugin.register
12841284
plugin.run(queue)
12851285
db = plugin.instance_variable_get(:@database)
1286+
expect(db.pool).to be_a_kind_of(::Sequel::ThreadedConnectionPool) # pries into internal details
12861287
expect(db.pool.instance_variable_get(:@timeout)).to eq(0)
12871288
expect(db.pool.instance_variable_get(:@max_size)).to eq(1)
12881289

@@ -1296,11 +1297,12 @@
12961297

12971298
it "should log error message" do
12981299
allow(Sequel).to receive(:connect).and_raise(Sequel::PoolTimeout)
1299-
expect(plugin.logger).to receive(:error).with("Failed to connect to database. 0 second timeout exceeded. Tried 1 times.")
1300-
expect do
1301-
plugin.register
1302-
plugin.run(queue)
1303-
end.to raise_error(Sequel::PoolTimeout)
1300+
allow(plugin.logger).to receive(:error)
1301+
1302+
plugin.register
1303+
plugin.run(queue)
1304+
1305+
expect(plugin.logger).to have_received(:error).with("Failed to connect to database. 0 second timeout exceeded. Tried 1 times.")
13041306
end
13051307
end
13061308

@@ -1376,12 +1378,13 @@
13761378
mixin_settings['connection_retry_attempts'] = 2
13771379
mixin_settings['jdbc_pool_timeout'] = 0
13781380
allow(Sequel).to receive(:connect).and_raise(Sequel::PoolTimeout)
1379-
expect(plugin.logger).to receive(:error).with("Failed to connect to database. 0 second timeout exceeded. Trying again.")
1380-
expect(plugin.logger).to receive(:error).with("Failed to connect to database. 0 second timeout exceeded. Tried 2 times.")
1381-
expect do
1382-
plugin.register
1383-
plugin.run(queue)
1384-
end.to raise_error(Sequel::PoolTimeout)
1381+
allow(plugin.logger).to receive(:error)
1382+
1383+
plugin.register
1384+
plugin.run(queue)
1385+
1386+
expect(plugin.logger).to have_received(:error).with("Failed to connect to database. 0 second timeout exceeded. Trying again.")
1387+
expect(plugin.logger).to have_received(:error).with("Failed to connect to database. 0 second timeout exceeded. Tried 2 times.")
13851388
end
13861389

13871390
it "should not fail when passed a non-positive value" do
@@ -1642,16 +1645,12 @@ def load_derby_version
16421645
{ "statement" => "SELECT * from types_table", "jdbc_driver_library" => invalid_driver_jar_path }
16431646
end
16441647

1645-
after do
1646-
plugin.stop
1647-
end
1648-
1649-
it "raise a loading error" do
1648+
it "raise a loading error during #register" do
16501649
expect(File.exists?(invalid_driver_jar_path)).to be true
16511650
expect(FileTest.readable?(invalid_driver_jar_path)).to be false
16521651

1653-
expect { plugin.register }.
1654-
to raise_error(LogStash::PluginLoadingError, /unable to load .*? from :jdbc_driver_library, file not readable/)
1652+
expect { plugin.register }
1653+
.to raise_error(LogStash::PluginLoadingError, /unable to load .*? from :jdbc_driver_library, file not readable/)
16551654
end
16561655
end
16571656

0 commit comments

Comments
 (0)