Skip to content

Commit cd75a15

Browse files
author
Guy Boertje
authored
Force all usage of sql_last_value to be typed according to the settings (#260)
When :sql_last_value is sent the to database, it is converted to the JVM default timezone. This change honors the jdbc_default_timezone for use in the :sql_last_value timezone conversion. Assuming America/Chicago as the default JVM timezone, but the configuration declares UTC as the database timezone jdbc_default_timezone => "UTC" The last_run is correctly recorded as 2018-02-23 22:30:34.054592000 Z , but when the > :sql_last_value gets sent to the database, it incorrectly gets converted to the JVM default timezone ( > '2018-02-23 16:30:34' ). The same holds true if reversed, Logstash has UTC as the JVM timezone and your database records datetimes in a local timezone. This change fixes the conversion when sending the :sql_last_value to the database to honor the jdbc_default_timezone setting. Specifically, this leverages the fact that the Sequel library will handle the timezone conversions properly if passed a DateTime object, and won't don't any timezone conversions if passed a Time object. This change also refactors parts of the code for better readability and adds more tests. Fixes #140
1 parent 5e6656d commit cd75a15

File tree

7 files changed

+288
-99
lines changed

7 files changed

+288
-99
lines changed

CHANGELOG.md

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
1+
## 4.3.5
2+
- [#140](https://github.com/logstash-plugins/logstash-input-jdbc/issues/140) Fix long standing bug where setting jdbc_default_timezone loses milliseconds. Force all usage of sql_last_value to be typed according to the settings.
3+
14
## 4.3.4
25
- [#261](https://github.com/logstash-plugins/logstash-input-jdbc/issues/261) Fix memory leak.
3-
6+
47
## 4.3.3
58
- [#255](https://github.com/logstash-plugins/logstash-input-jdbc/issues/255) Fix thread and memory leak.
6-
9+
710
## 4.3.2
811
- [#251](https://github.com/logstash-plugins/logstash-input-jdbc/issues/251) Fix connection and memory leak.
9-
12+
1013
## 4.3.1
1114
- Update gemspec summary
1215

13-
## 4.3.0
14-
- [#147](https://github.com/logstash-plugins/logstash-input-jdbc/issues/147) Open and close connection for each query
16+
## 4.3.0
17+
- [#147](https://github.com/logstash-plugins/logstash-input-jdbc/issues/147) Open and close connection for each query
1518

1619
## 4.2.4
1720
- [#220](https://github.com/logstash-plugins/logstash-input-jdbc/issues/220) Log exception when database connection test fails

NOTICE

Whitespace-only changes.

lib/logstash/inputs/jdbc.rb

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
require "logstash/inputs/base"
33
require "logstash/namespace"
44
require "logstash/plugin_mixins/jdbc"
5-
require "yaml" # persistence
5+
66

77
# This plugin was created as a way to ingest data from any database
88
# with a JDBC interface into Logstash. You can periodically schedule ingestion
@@ -206,21 +206,16 @@ def register
206206
require "rufus/scheduler"
207207
prepare_jdbc_connection
208208

209-
# Raise an error if @use_column_value is true, but no @tracking_column is set
210209
if @use_column_value
210+
# Raise an error if @use_column_value is true, but no @tracking_column is set
211211
if @tracking_column.nil?
212212
raise(LogStash::ConfigurationError, "Must set :tracking_column if :use_column_value is true.")
213213
end
214214
end
215215

216-
@enable_encoding = !@charset.nil? || !@columns_charset.empty?
216+
@value_tracker = LogStash::PluginMixins::ValueTracking.build_last_value_tracker(self)
217217

218-
# load sql_last_value from file if exists
219-
if @clean_run && File.exist?(@last_run_metadata_path)
220-
File.delete(@last_run_metadata_path)
221-
elsif File.exist?(@last_run_metadata_path)
222-
@sql_last_value = YAML.load(File.read(@last_run_metadata_path))
223-
end
218+
@enable_encoding = !@charset.nil? || !@columns_charset.empty?
224219

225220
unless @statement.nil? ^ @statement_filepath.nil?
226221
raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
@@ -248,13 +243,11 @@ def run(queue)
248243
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
249244
@scheduler.cron @schedule do
250245
execute_query(queue)
251-
update_state_file
252246
end
253247

254248
@scheduler.join
255249
else
256250
execute_query(queue)
257-
update_state_file
258251
end
259252
end # def run
260253

@@ -267,7 +260,7 @@ def stop
267260

268261
def execute_query(queue)
269262
# update default parameters
270-
@parameters['sql_last_value'] = @sql_last_value
263+
@parameters['sql_last_value'] = @value_tracker.value
271264
execute_statement(@statement, @parameters) do |row|
272265
if enable_encoding?
273266
## do the necessary conversions to string elements
@@ -277,12 +270,7 @@ def execute_query(queue)
277270
decorate(event)
278271
queue << event
279272
end
280-
end
281-
282-
def update_state_file
283-
if @record_last_run
284-
File.write(@last_run_metadata_path, YAML.dump(@sql_last_value))
285-
end
273+
@value_tracker.write
286274
end
287275

288276
private

lib/logstash/plugin_mixins/jdbc.rb

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require "logstash/config/mixin"
44
require "time"
55
require "date"
6+
require "logstash/plugin_mixins/value_tracking"
67

78
java_import java.util.concurrent.locks.ReentrantLock
89

@@ -195,17 +196,7 @@ def open_jdbc_connection
195196
public
196197
def prepare_jdbc_connection
197198
@connection_lock = ReentrantLock.new
198-
if @use_column_value
199-
case @tracking_column_type
200-
when "numeric"
201-
@sql_last_value = 0
202-
when "timestamp"
203-
@sql_last_value = Time.at(0).utc
204-
end
205-
else
206-
@sql_last_value = Time.at(0).utc
207-
end
208-
end # def prepare_jdbc_connection
199+
end
209200

210201
public
211202
def close_jdbc_connection
@@ -229,22 +220,20 @@ def execute_statement(statement, parameters)
229220
begin
230221
parameters = symbolized_params(parameters)
231222
query = @database[statement, parameters]
232-
sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc
223+
224+
sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc
233225
@tracking_column_warning_sent = false
234226
@logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count)
235227

236228
perform_query(query) do |row|
237229
sql_last_value = get_column_value(row) if @use_column_value
238-
if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime)
239-
sql_last_value = sql_last_value.to_time # Coerce the timestamp to a `Time`
240-
end
241230
yield extract_values_from(row)
242231
end
243232
success = true
244233
rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e
245234
@logger.warn("Exception when executing JDBC query", :exception => e)
246235
else
247-
@sql_last_value = sql_last_value
236+
@value_tracker.set_value(sql_last_value)
248237
ensure
249238
close_jdbc_connection
250239
@connection_lock.unlock
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
# encoding: utf-8
2+
require "yaml" # persistence
3+
4+
module LogStash module PluginMixins
5+
class ValueTracking
6+
7+
def self.build_last_value_tracker(plugin)
8+
if plugin.use_column_value && plugin.tracking_column_type == "numeric"
9+
# use this irrespective of the jdbc_default_timezone setting
10+
klass = NumericValueTracker
11+
else
12+
if plugin.jdbc_default_timezone.nil? || plugin.jdbc_default_timezone.empty?
13+
# no TZ stuff for Sequel, use Time
14+
klass = TimeValueTracker
15+
else
16+
# Sequel does timezone handling on DateTime only
17+
klass = DateTimeValueTracker
18+
end
19+
end
20+
21+
handler = NullFileHandler.new(plugin.last_run_metadata_path)
22+
if plugin.record_last_run
23+
handler = FileHandler.new(plugin.last_run_metadata_path)
24+
end
25+
if plugin.clean_run
26+
handler.clean
27+
end
28+
29+
instance = klass.new(handler)
30+
end
31+
32+
attr_reader :value
33+
34+
def initialize(handler)
35+
@file_handler = handler
36+
set_value(get_initial)
37+
end
38+
39+
def get_initial
40+
# override in subclass
41+
end
42+
43+
def set_value(value)
44+
# override in subclass
45+
end
46+
47+
def write
48+
@file_handler.write(@value)
49+
end
50+
end
51+
52+
53+
class NumericValueTracker < ValueTracking
54+
def get_initial
55+
@file_handler.read || 0
56+
end
57+
58+
def set_value(value)
59+
return unless value.is_a?(Numeric)
60+
@value = value
61+
end
62+
end
63+
64+
class DateTimeValueTracker < ValueTracking
65+
def get_initial
66+
@file_handler.read || DateTime.new(1970)
67+
end
68+
69+
def set_value(value)
70+
if value.respond_to?(:to_datetime)
71+
@value = value.to_datetime
72+
end
73+
end
74+
end
75+
76+
class TimeValueTracker < ValueTracking
77+
def get_initial
78+
@file_handler.read || Time.at(0).utc
79+
end
80+
81+
def set_value(value)
82+
if value.respond_to?(:to_time)
83+
@value = value.to_time
84+
end
85+
end
86+
end
87+
88+
class FileHandler
89+
def initialize(path)
90+
@path = path
91+
@exists = ::File.exist?(@path)
92+
end
93+
94+
def clean
95+
return unless @exists
96+
::File.delete(@path)
97+
@exists = false
98+
end
99+
100+
def read
101+
return unless @exists
102+
YAML.load(::File.read(@path))
103+
end
104+
105+
def write(value)
106+
::File.write(@path, YAML.dump(value))
107+
@exists = true
108+
end
109+
end
110+
111+
class NullFileHandler
112+
def initialize(path)
113+
end
114+
115+
def clean
116+
end
117+
118+
def read
119+
end
120+
121+
def write(value)
122+
end
123+
end
124+
end end

logstash-input-jdbc.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-input-jdbc'
3-
s.version = '4.3.4'
3+
s.version = '4.3.5'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Creates events from JDBC data"
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"

0 commit comments

Comments
 (0)