From dbdb8b277b667697be101db286eddffb39e81e29 Mon Sep 17 00:00:00 2001 From: Ilya Zhigalko Date: Fri, 7 Apr 2017 11:47:31 +0300 Subject: [PATCH 1/3] Fix lost performance on debug enabled --- lib/logstash/plugin_mixins/jdbc.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/logstash/plugin_mixins/jdbc.rb b/lib/logstash/plugin_mixins/jdbc.rb index 6ef403f..e0fb857 100644 --- a/lib/logstash/plugin_mixins/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc.rb @@ -218,7 +218,7 @@ def execute_statement(statement, parameters) query = @database[statement, parameters] sql_last_value = @use_column_value ? @sql_last_value : Time.now.utc @tracking_column_warning_sent = false - @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters, :count => query.count) + @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters) if @jdbc_paging_enabled query.each_page(@jdbc_page_size) do |paged_dataset| From 1ab5006eee1b0b352cf9066fc15597a5e1ecd02e Mon Sep 17 00:00:00 2001 From: Ilya Zhigalko Date: Fri, 7 Apr 2017 13:36:32 +0300 Subject: [PATCH 2/3] Wrap statement in transaction with rollback Fixes #103 --- lib/logstash/plugin_mixins/jdbc.rb | 29 ++++++++++++++++++----------- spec/inputs/jdbc_spec.rb | 4 ++-- 2 files changed, 20 insertions(+), 13 deletions(-) diff --git a/lib/logstash/plugin_mixins/jdbc.rb b/lib/logstash/plugin_mixins/jdbc.rb index e0fb857..5497684 100644 --- a/lib/logstash/plugin_mixins/jdbc.rb +++ b/lib/logstash/plugin_mixins/jdbc.rb @@ -143,6 +143,7 @@ def open_jdbc_connection require "java" require "sequel" require "sequel/adapters/jdbc" + require "sequel/adapters/jdbc/transactions" load_drivers(@jdbc_driver_library.split(",")) if @jdbc_driver_library begin @@ -157,6 +158,7 @@ def open_jdbc_connection raise LogStash::ConfigurationError, "#{e}. #{message}" end @database = jdbc_connect() + @database.extend(Sequel::JDBC::Transactions) @database.extension(:pagination) if @jdbc_default_timezone @database.extension(:named_timezones) @@ -220,9 +222,21 @@ def execute_statement(statement, parameters) @tracking_column_warning_sent = false @logger.debug? and @logger.debug("Executing JDBC query", :statement => statement, :parameters => parameters) - if @jdbc_paging_enabled - query.each_page(@jdbc_page_size) do |paged_dataset| - paged_dataset.each do |row| + # Execute query in transaction cause PG driver require autocommit off for set fetch count + # See: https://jdbc.postgresql.org/documentation/head/query.html + @database.transaction do + if @jdbc_paging_enabled + query.each_page(@jdbc_page_size) do |paged_dataset| + paged_dataset.each do |row| + sql_last_value = get_column_value(row) if @use_column_value + if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) + sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` + end + yield extract_values_from(row) + end + end + else + query.each do |row| sql_last_value = get_column_value(row) if @use_column_value if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` @@ -230,14 +244,7 @@ def execute_statement(statement, parameters) yield extract_values_from(row) end end - else - query.each do |row| - sql_last_value = get_column_value(row) if @use_column_value - if @tracking_column_type=="timestamp" and @use_column_value and sql_last_value.is_a?(DateTime) - sql_last_value=Time.parse(sql_last_value.to_s) # Coerce the timestamp to a `Time` - end - yield extract_values_from(row) - end + raise Sequel::Rollback end success = true rescue Sequel::DatabaseConnectionError, Sequel::DatabaseError => e diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index c7784ee..508275c 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -833,7 +833,7 @@ { "statement" => "SELECT * FROM test_table", "jdbc_pool_timeout" => 0, - "jdbc_connection_string" => 'mock://localhost:1527/db', + "jdbc_connection_string" => 'jdbc:derby:memory:testdb;create=true', "sequel_opts" => { "max_connections" => 1 } @@ -889,7 +889,7 @@ end it "should report the statements to logging" do - expect(plugin.logger).to receive(:debug).once + expect(plugin.logger).to receive(:debug).thrice plugin.run(queue) end end From 323cdb04f5936d7bbec01f78d35d4e6015f95a78 Mon Sep 17 00:00:00 2001 From: Andrew Cholakian Date: Thu, 18 May 2017 11:50:27 -0500 Subject: [PATCH 3/3] Bump to 4.2.1 --- CHANGELOG.md | 3 +++ logstash-input-jdbc.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1149d0d..8e2b5e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 4.2.1 + - Wrap queries in rolled back transactions to work better with PostgreSQL + ## 4.2.0 - Automatically reconnect on connection issues - Fix test failures diff --git a/logstash-input-jdbc.gemspec b/logstash-input-jdbc.gemspec index 053d0e7..8b9d8ab 100755 --- a/logstash-input-jdbc.gemspec +++ b/logstash-input-jdbc.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-input-jdbc' - s.version = '4.2.0' + s.version = '4.2.1' s.licenses = ['Apache License (2.0)'] s.summary = "This example input streams a string at a definable interval." s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"