diff --git a/.ci/run.sh b/.ci/run.sh
index 49b60b8..4890e9b 100755
--- a/.ci/run.sh
+++ b/.ci/run.sh
@@ -5,4 +5,6 @@
 set -ex
 
 export USER='logstash'
-bundle exec rspec spec && bundle exec rspec spec --tag integration
+export LOG_LEVEL='trace'
+
+jruby -rbundler/setup -S rspec -fd && jruby -rbundler/setup -S rspec -fd --tag integration
diff --git a/.ci/setup.sql b/.ci/setup.sql
index 8697f23..9007d9e 100644
--- a/.ci/setup.sql
+++ b/.ci/setup.sql
@@ -52,8 +52,12 @@
 create DATABASE jdbc_input_db;
 CREATE TABLE employee (
   emp_no integer NOT NULL,
   first_name VARCHAR (50) NOT NULL,
-  last_name VARCHAR (50) NOT NULL
+  last_name VARCHAR (50) NOT NULL,
+  created_at DATE NOT NULL DEFAULT CURRENT_DATE,
+  updated_at TIMESTAMP
 );
-INSERT INTO employee VALUES (1, 'David', 'Blenkinsop');
-INSERT INTO employee VALUES (2, 'Mark', 'Guckenheimer');
+INSERT INTO employee VALUES (1, 'David', 'Blenkinsop', '2000-01-02', '2000-02-01 00:30:40');
+INSERT INTO employee VALUES (2, 'Mark', 'Guckenheimer', '2000-01-01', '2020-01-01 20:30:40+00');
+INSERT INTO employee VALUES (3, 'Ján', 'Borůvka', '2000-02-01', '2020-01-31 20:30:40+00');
+INSERT INTO employee VALUES (4, 'Jožko', 'Šuška', '2010-01-01', NULL);
diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb
index 4b138c4..266c245 100755
--- a/lib/logstash/inputs/jdbc.rb
+++ b/lib/logstash/inputs/jdbc.rb
@@ -209,7 +209,7 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base
 
   config :prepared_statement_bind_values, :validate => :array, :default => []
 
-  attr_reader :database # for test mocking/stubbing
+  attr_reader :database, :value_tracker # for test mocking/stubbing
 
   public
 
@@ -260,6 +260,11 @@ def register
         converters[encoding] = converter
       end
     end
+
+    require "sequel"
+    require "sequel/adapters/jdbc"
+
+    Sequel.application_timezone = @plugin_timezone.to_sym
   end # def register
 
   # test injection points
@@ -268,6 +273,7 @@ def set_statement_logger(instance)
   end
 
   def set_value_tracker(instance)
+    @logger.debug "using last run value tracker: #{instance.inspect}"
    @value_tracker = instance
   end
 
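Note: `register` now loads Sequel eagerly and pins `Sequel.application_timezone` once per plugin instance instead of on every connection open (the corresponding removal from `open_jdbc_connection` appears below). A minimal sketch of what that global controls; the in-memory SQLite database and the explicit `database_timezone` are assumptions for illustration only, since the plugin itself goes through Sequel's JDBC adapters:

    require "sequel"

    Sequel.application_timezone = :utc # what register now sets once, from plugin_timezone
    Sequel.database_timezone = :utc    # only for this sketch, so the stored value is unambiguous

    DB = Sequel.sqlite # in-memory database; assumes the sqlite3 gem
    DB.create_table(:events) { Time :ts }
    DB[:events].insert(:ts => Time.utc(2020, 1, 31, 20, 30, 40))

    # timestamps read back are converted into the application timezone
    p DB[:events].first[:ts] # => 2020-01-31 20:30:40 UTC
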
diff --git a/lib/logstash/plugin_mixins/jdbc/common.rb b/lib/logstash/plugin_mixins/jdbc/common.rb
index 9c30e97..701dc92 100644
--- a/lib/logstash/plugin_mixins/jdbc/common.rb
+++ b/lib/logstash/plugin_mixins/jdbc/common.rb
@@ -18,15 +18,11 @@ def complete_sequel_opts(defaults = {})
       def load_driver
         return @driver_impl if @driver_impl ||= nil
 
-        require "java"
-        require "sequel"
-        require "sequel/adapters/jdbc"
-
         load_driver_jars
         begin
           @driver_impl = Sequel::JDBC.load_driver(@jdbc_driver_class)
         rescue Sequel::AdapterNotFound => e # Sequel::AdapterNotFound, "#{@jdbc_driver_class} not loaded"
-          # fix this !!!
+          @logger.debug("driver loading failed", :exception => e.class, :message => e.message)
           message = if jdbc_driver_library_set?
                       "Are you sure you've included the correct jdbc driver in :jdbc_driver_library?"
                     else
diff --git a/lib/logstash/plugin_mixins/jdbc/jdbc.rb b/lib/logstash/plugin_mixins/jdbc/jdbc.rb
index e7517b5..e608d41 100644
--- a/lib/logstash/plugin_mixins/jdbc/jdbc.rb
+++ b/lib/logstash/plugin_mixins/jdbc/jdbc.rb
@@ -134,7 +134,6 @@ def jdbc_connect
 
     def open_jdbc_connection
       # at this point driver is already loaded
-      Sequel.application_timezone = @plugin_timezone.to_sym
 
       @database = jdbc_connect()
       @database.extension(:pagination)
@@ -198,7 +197,8 @@ def execute_statement
       begin
         sql_last_value = @use_column_value ? @value_tracker.value : Time.now.utc
         @tracking_column_warning_sent = false
-        @statement_handler.perform_query(@database, @value_tracker.value, @jdbc_paging_enabled, @jdbc_page_size) do |row|
+        @statement_handler.perform_query(@database, @value_tracker.value) do |row|
+          @logger.trace? && @logger.trace("result row:", row)
           sql_last_value = get_column_value(row) if @use_column_value
           yield extract_values_from(row)
         end
@@ -208,6 +208,7 @@
         details[:backtrace] = e.backtrace if @logger.debug?
         @logger.warn("Exception when executing JDBC query", details)
       else
+        @logger.debug "last run value", :sql_last_value => sql_last_value
         @value_tracker.set_value(sql_last_value)
       ensure
         close_jdbc_connection
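Note: `load_driver` drops its own `require`s (now done in `register`) and replaces the `# fix this !!!` placeholder with a debug log before the user-facing hint is raised. Roughly this flow, sketched standalone; JRuby-only, and the jar path plus the `warn` output are hypothetical stand-ins:

    require "java"
    require "sequel"
    require "sequel/adapters/jdbc"

    $CLASSPATH << "/path/to/postgresql.jar" # stand-in for load_driver_jars

    begin
      Sequel::JDBC.load_driver("org.postgresql.Driver")
    rescue Sequel::AdapterNotFound => e
      # the PR logs exception class and message at debug level here,
      # then surfaces the "Are you sure ..." hint to the user
      warn "driver loading failed: #{e.class} #{e.message}"
    end
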
diff --git a/lib/logstash/plugin_mixins/jdbc/statement_handler.rb b/lib/logstash/plugin_mixins/jdbc/statement_handler.rb
index 3354640..a88c725 100644
--- a/lib/logstash/plugin_mixins/jdbc/statement_handler.rb
+++ b/lib/logstash/plugin_mixins/jdbc/statement_handler.rb
@@ -3,7 +3,15 @@
 module LogStash module PluginMixins module Jdbc
   class StatementHandler
     def self.build_statement_handler(plugin, logger)
-      klass = plugin.use_prepared_statements ? PreparedStatementHandler : NormalStatementHandler
+      if plugin.use_prepared_statements
+        klass = PreparedStatementHandler
+      else
+        if plugin.jdbc_paging_enabled
+          klass = PagingStatementHandler
+        else
+          klass = NormalStatementHandler
+        end
+      end
       klass.new(plugin, logger)
     end
 
@@ -29,18 +37,10 @@ class NormalStatementHandler < StatementHandler
     # @param db [Sequel::Database]
     # @param sql_last_value [Integet|DateTime|Time]
     # @yieldparam row [Hash{Symbol=>Object}]
-    def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size)
+    def perform_query(db, sql_last_value)
       query = build_query(db, sql_last_value)
-      if jdbc_paging_enabled
-        query.each_page(jdbc_page_size) do |paged_dataset|
-          paged_dataset.each do |row|
-            yield row
-          end
-        end
-      else
-        query.each do |row|
-          yield row
-        end
+      query.each do |row|
+        yield row
       end
     end
 
@@ -67,14 +67,32 @@ def post_init(plugin)
     end
   end
 
+  class PagingStatementHandler < NormalStatementHandler
+
+    def initialize(plugin, statement_logger)
+      super(plugin, statement_logger)
+      @page_size = plugin.jdbc_page_size
+    end
+
+    def perform_query(db, sql_last_value)
+      query = build_query(db, sql_last_value)
+      query.each_page(@page_size) do |paged_dataset|
+        paged_dataset.each do |row|
+          yield row
+        end
+      end
+    end
+
+  end
+
   class PreparedStatementHandler < StatementHandler
-    attr_reader :name, :bind_values_array, :statement_prepared, :prepared
+    attr_reader :name
 
     # Performs the query, ignoring our pagination settings, yielding once per row of data
     # @param db [Sequel::Database]
     # @param sql_last_value [Integet|DateTime|Time]
     # @yieldparam row [Hash{Symbol=>Object}]
-    def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size)
+    def perform_query(db, sql_last_value)
       query = build_query(db, sql_last_value)
       query.each do |row|
         yield row
@@ -84,25 +102,24 @@ def perform_query(db, sql_last_value, jdbc_paging_enabled, jdbc_page_size)
     private
 
     def build_query(db, sql_last_value)
-      @parameters = create_bind_values_hash
-      if statement_prepared.false?
-        prepended = parameters.keys.map{|v| v.to_s.prepend("$").to_sym}
+      if @statement_prepared.false?
+        prepended = parameters.keys.map { |v| v.to_s.prepend('$').to_sym }
         @prepared = db[statement, *prepended].prepare(:select, name)
-        statement_prepared.make_true
+        @statement_prepared.make_true
       end
       # under the scheduler the Sequel database instance is recreated each time
       # so the previous prepared statements are lost, add back
       if db.prepared_statement(name).nil?
-        db.set_prepared_statement(name, prepared)
+        db.set_prepared_statement(name, @prepared)
       end
-      bind_value_sql_last_value(sql_last_value)
+      bind_sql_last_value_parameter(sql_last_value)
       statement_logger.log_statement_parameters(statement, parameters, nil)
       begin
         db.call(name, parameters)
       rescue => e
         # clear the statement prepared flag - the statement may be closed by this
         # time.
-        statement_prepared.make_false
+        @statement_prepared.make_false
         raise e
       end
     end
@@ -113,24 +130,17 @@ def post_init(plugin)
       @statement_logger.disable_count
 
       @name = plugin.prepared_statement_name.to_sym
-      @bind_values_array = plugin.prepared_statement_bind_values
-      @parameters = plugin.parameters
-      @statement_prepared = Concurrent::AtomicBoolean.new(false)
-    end
+      @parameters = {} # plugin.parameters are ignored in favor of prepared_statement_bind_values
+      plugin.prepared_statement_bind_values.each_with_index { |v,i| @parameters[:"p#{i}"] = v }
+
+      sql_last_value_pair = @parameters.find { |_, value| value == ":sql_last_value" }
+      @sql_last_value_key = sql_last_value_pair ? sql_last_value_pair.first : nil
 
-    def create_bind_values_hash
-      hash = {}
-      bind_values_array.each_with_index {|v,i| hash[:"p#{i}"] = v}
-      hash
+      @statement_prepared = Concurrent::AtomicBoolean.new(false)
     end
 
-    def bind_value_sql_last_value(sql_last_value)
-      parameters.keys.each do |key|
-        value = parameters[key]
-        if value == ":sql_last_value"
-          parameters[key] = sql_last_value
-        end
-      end
+    def bind_sql_last_value_parameter(sql_last_value)
+      parameters[@sql_last_value_key] = sql_last_value if @sql_last_value_key
     end
   end
 end end end
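Note: the handler ternary becomes a three-way dispatch: prepared statement, paging, or plain iteration. `PagingStatementHandler` inherits `build_query` from `NormalStatementHandler` and overrides only `perform_query`, so paging no longer has to be threaded through every `perform_query` signature. The paging itself is Sequel's pagination extension, the same one `open_jdbc_connection` loads; a self-contained sketch with a throwaway table, assuming the sqlite3 gem:

    require "sequel"

    DB = Sequel.sqlite
    DB.extension(:pagination) # same extension open_jdbc_connection loads
    DB.create_table(:employee) { Integer :emp_no }
    (1..7).each { |i| DB[:employee].insert(:emp_no => i) }

    query = DB[:employee].order(:emp_no)
    query.each_page(3) do |paged_dataset| # what PagingStatementHandler#perform_query does
      paged_dataset.each { |row| p row }  # pages of at most 3 rows
    end
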
- ENV["TZ"] = "Etc/UTC" + before(:all) { ENV['TZ'] = "Etc/UTC" } unless ENV['KEEP_TZ'] == 'true' + after(:all) { ENV['TZ'] = ENV_TZ } unless ENV['KEEP_TZ'] == 'true' + # For Travis and CI based on docker, we source from ENV jdbc_connection_string = ENV.fetch("PG_CONNECTION_STRING", "jdbc:postgresql://postgresql:5432") + "/jdbc_input_db?user=postgres" @@ -16,7 +15,7 @@ let(:settings) do { "jdbc_driver_class" => "org.postgresql.Driver", "jdbc_connection_string" => jdbc_connection_string, - "jdbc_driver_library" => "/usr/share/logstash/postgresql.jar", + "jdbc_driver_library" => ENV['POSTGRES_DRIVER_JAR'] || "/usr/share/logstash/postgresql.jar", "jdbc_user" => "postgres", "jdbc_password" => ENV["POSTGRES_PASSWORD"], "statement" => 'SELECT FIRST_NAME, LAST_NAME FROM "employee" WHERE EMP_NO = 2' @@ -74,5 +73,165 @@ end.to raise_error(::Sequel::DatabaseConnectionError) end end + + context "using sql last value" do + + let(:last_run_metadata_file) do + Tempfile.new('last_run_metadata') + end + + before do + plugin.register + end + + after do + plugin.stop + last_run_metadata_file.close rescue nil + File.unlink(last_run_metadata_file.path) rescue nil + end + + context '(DATE)' do + + let(:settings) do + super.merge('use_column_value' => true, + 'tracking_column' => "created_at", + 'tracking_column_type' => "timestamp", + 'jdbc_default_timezone' => "America/New_York", # ENV["TZ"] is "Etc/UTC" + 'schedule' => "*/2 * * * * *", # every 2 seconds + 'last_run_metadata_path' => last_run_metadata_file.path, + 'statement' => "SELECT * FROM employee WHERE created_at > :sql_last_value ORDER BY created_at") + end + + it "should populate the event with database entries" do + Thread.start { plugin.run(queue) } + + sleep(3.0) + + expect( queue.size ).to be >= 4 + event = queue.pop + expect(event.get('first_name')).to eq("Mark") + event = queue.pop + expect(event.get('first_name')).to eq("David") + event = queue.pop + expect(event.get('first_name')).to eq("Ján") + expect(event.get('created_at').to_s).to eql '2000-02-01T00:00:00.000Z' if env_zone_utc? 
diff --git a/spec/inputs/integration/spec_helper.rb b/spec/inputs/integration/spec_helper.rb
new file mode 100644
index 0000000..20364b1
--- /dev/null
+++ b/spec/inputs/integration/spec_helper.rb
@@ -0,0 +1,26 @@
+require "logstash/devutils/rspec/spec_helper"
+require "logstash/inputs/jdbc"
+
+ENV_TZ = ENV["TZ"]
+
+module LogStash::Inputs::Jdbc::SpecHelpers
+
+  def puts(msg)
+    !$VERBOSE.nil? && Kernel.puts(msg)
+  end
+
+  def read_yaml(path)
+    # "--- !ruby/object:DateTime '2020-11-17 07:56:23.978705000 Z'\n"
+    YAML.load(File.read(path))
+  end
+
+  def env_zone_utc?
+    # we allow (local) testing with skipping the forced ENV['TZ'] = ...
+    ENV['TZ'] == "Etc/UTC"
+  end
+
+end
+
+RSpec.configure do |config|
+  config.include LogStash::Inputs::Jdbc::SpecHelpers
+end
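Note: `read_yaml` relies on the value tracker persisting the last-run value as a YAML-tagged Ruby object, as the sample string in its comment shows. A sketch of that round trip; the path is made up, and under Psych 4+ (where `YAML.load` is safe by default) `permitted_classes: [Date, DateTime, Time]` would be needed:

    require "yaml"
    require "date"

    path = "/tmp/last_run_metadata" # hypothetical path
    # shape matches the sample in read_yaml's comment above
    File.write(path, "--- !ruby/object:DateTime '2020-11-17 07:56:23.978705000 Z'\n")

    last_run_value = YAML.load(File.read(path))
    puts last_run_value.class # => DateTime
    puts last_run_value.zone  # => "+00:00", which the specs assert
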
diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb
index 7147256..99fce77 100755
--- a/spec/inputs/jdbc_spec.rb
+++ b/spec/inputs/jdbc_spec.rb
@@ -1097,7 +1097,7 @@
     it "should raise PoolTimeout error" do
       plugin.register
       plugin.run(queue)
-      db = plugin.instance_variable_get(:@database)
+      db = plugin.database
       expect(db.pool.instance_variable_get(:@timeout)).to eq(0)
       expect(db.pool.instance_variable_get(:@max_size)).to eq(1)
 
@@ -1143,7 +1143,8 @@
   end
 
   it "should report the statements to logging" do
-    expect(plugin.logger).to receive(:debug).once
+    expect(plugin.logger).to receive(:debug).with(/.*? SELECT \* from test_table/).once
+    expect(plugin.logger).to receive(:debug).with(any_args)
     plugin.run(queue)
   end
 end
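Note: pinning `receive(:debug)` to the statement log line means any other `debug` call (the new last-run value and tracker logs above) would otherwise fail the example, hence the trailing catch-all expectation. The pattern in isolation, with a stand-in double rather than the plugin's logger; log messages are illustrative:

    require "rspec/autorun"

    RSpec.describe "statement logging" do
      it "pins one debug call and tolerates the rest" do
        logger = double("logger")

        expect(logger).to receive(:debug).with(/SELECT \* from test_table/).once
        expect(logger).to receive(:debug).with(any_args) # swallows the other debug call

        logger.debug("executing JDBC query SELECT * from test_table")
        logger.debug("last run value", :sql_last_value => Time.now)
      end
    end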