diff --git a/docs/filter-jdbc_streaming.asciidoc b/docs/filter-jdbc_streaming.asciidoc index 3596374..ddc9392 100644 --- a/docs/filter-jdbc_streaming.asciidoc +++ b/docs/filter-jdbc_streaming.asciidoc @@ -92,6 +92,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -120,6 +121,14 @@ filter plugins.   +[id="plugins-{type}s-{plugin}-ignore_empty_cache"] +===== `ignore_empty_cache` + + * Value type is <> + * Default value is `true` + +When set to `true`, skips caching nil values. Defaults to true. + [id="plugins-{type}s-{plugin}-cache_expiration"] ===== `cache_expiration` diff --git a/lib/logstash/filters/jdbc_streaming.rb b/lib/logstash/filters/jdbc_streaming.rb index 90a4be7..5b03935 100644 --- a/lib/logstash/filters/jdbc_streaming.rb +++ b/lib/logstash/filters/jdbc_streaming.rb @@ -76,6 +76,9 @@ module LogStash module Filters class JdbcStreaming < LogStash::Filters::Base # Enable or disable caching, boolean true or false, defaults to true config :use_cache, :validate => :boolean, :default => true + # Enable or disable caching nil values, boolean true or false, defaults to true + config :ignore_empty_cache, :validate => :boolean, :default => true + # The minimum number of seconds any entry should remain in the cache, defaults to 5 seconds # A numeric value, you can use decimals for example `{ "cache_expiration" => 0.25 }` # If there are transient jdbc errors the cache will store empty results for a given diff --git a/lib/logstash/plugin_mixins/jdbc_streaming.rb b/lib/logstash/plugin_mixins/jdbc_streaming.rb index 8788625..4ca7a84 100644 --- a/lib/logstash/plugin_mixins/jdbc_streaming.rb +++ b/lib/logstash/plugin_mixins/jdbc_streaming.rb @@ -5,8 +5,8 @@ # for potential reuse in other plugins (input/output) module LogStash module PluginMixins module JdbcStreaming class RowCache - def initialize(size, ttl) - @cache = ::LruRedux::TTL::ThreadSafeCache.new(size, ttl) + def initialize(size, ttl, ignore_nil) + @cache = ::LruRedux::TTL::ThreadSafeCache.new(size, ttl, ignore_nil) end def get(parameters) @@ -15,7 +15,7 @@ def get(parameters) end class NoCache - def initialize(size, ttl) end + def initialize(size, ttl, ignore_nil) end def get(statement) yield diff --git a/lib/logstash/plugin_mixins/jdbc_streaming/statement_handler.rb b/lib/logstash/plugin_mixins/jdbc_streaming/statement_handler.rb index 3b47c94..21dfb0c 100644 --- a/lib/logstash/plugin_mixins/jdbc_streaming/statement_handler.rb +++ b/lib/logstash/plugin_mixins/jdbc_streaming/statement_handler.rb @@ -20,7 +20,7 @@ def self.build_statement_handler(plugin) def initialize(plugin) @statement = plugin.statement klass = plugin.use_cache ? RowCache : NoCache - @cache = klass.new(plugin.cache_size, plugin.cache_expiration) + @cache = klass.new(plugin.cache_size, plugin.cache_expiration, plugin.ignore_empty_cache) post_init(plugin) end diff --git a/logstash-integration-jdbc.gemspec b/logstash-integration-jdbc.gemspec index 98c5a34..e65dae1 100755 --- a/logstash-integration-jdbc.gemspec +++ b/logstash-integration-jdbc.gemspec @@ -31,7 +31,7 @@ Gem::Specification.new do |s| # Pinned sequel version to >= 5.74.0 as it fixes the generic jdbc adapter to properly # handle disconnection errors, and avoid stale connections in the pool. s.add_runtime_dependency 'sequel', '>= 5.74.0' - s.add_runtime_dependency 'lru_redux' # lru cache with ttl + s.add_runtime_dependency 'sin_lru_redux' # lru cache with ttl s.add_runtime_dependency 'tzinfo' s.add_runtime_dependency 'tzinfo-data' diff --git a/spec/filters/integration/jdbcstreaming_spec.rb b/spec/filters/integration/jdbcstreaming_spec.rb index edb38ac..bb2f252 100644 --- a/spec/filters/integration/jdbcstreaming_spec.rb +++ b/spec/filters/integration/jdbcstreaming_spec.rb @@ -29,8 +29,9 @@ class TestJdbcStreaming < JdbcStreaming ::Sequel.connect(mixin_settings['jdbc_connection_string']) end let(:event) { ::LogStash::Event.new("message" => "some text", "ip" => ipaddr) } - let(:cache_expiration) { 3.0 } let(:use_cache) { true } + let(:ignore_empty_cache) { true } + let(:cache_expiration) { 3.0 } let(:cache_size) { 10 } let(:statement) { "SELECT name, location FROM reference_table WHERE ip = :ip" } let(:settings) do @@ -39,6 +40,7 @@ class TestJdbcStreaming < JdbcStreaming "parameters" => {"ip" => "ip"}, "target" => "server", "use_cache" => use_cache, + "ignore_empty_cache" => ignore_empty_cache, "cache_expiration" => cache_expiration, "cache_size" => cache_size, "tag_on_failure" => ["lookup_failed"], @@ -99,6 +101,7 @@ class TestJdbcStreaming < JdbcStreaming "prepared_statement_bind_values" => ["[ip]"], "target" => "server", "use_cache" => use_cache, + "ignore_empty_cache" => ignore_empty_cache, "cache_expiration" => cache_expiration, "cache_size" => cache_size, "tag_on_failure" => ["lookup_failed"], diff --git a/spec/filters/jdbc_streaming_spec.rb b/spec/filters/jdbc_streaming_spec.rb index 8298a73..69f7682 100644 --- a/spec/filters/jdbc_streaming_spec.rb +++ b/spec/filters/jdbc_streaming_spec.rb @@ -23,8 +23,9 @@ class TestJdbcStreaming < JdbcStreaming ::Sequel.connect(mixin_settings['jdbc_connection_string'], :user=> nil, :password=> nil) end let(:event) { ::LogStash::Event.new("message" => "some text", "ip" => ipaddr) } - let(:cache_expiration) { 3.0 } let(:use_cache) { true } + let(:ignore_empty_cache) { true } + let(:cache_expiration) { 3.0 } let(:cache_size) { 10 } before :each do @@ -57,6 +58,7 @@ class TestJdbcStreaming < JdbcStreaming "parameters" => {"ip" => "ip"}, "target" => "server", "use_cache" => use_cache, + "ignore_empty_cache" => ignore_empty_cache, "cache_expiration" => cache_expiration, "cache_size" => cache_size, "tag_on_failure" => ["lookup_failed"], @@ -191,6 +193,7 @@ class TestJdbcStreaming < JdbcStreaming "prepared_statement_bind_values" => ["[ip]", 2], "target" => "server", "use_cache" => use_cache, + "ignore_empty_cache" => ignore_empty_cache, "cache_expiration" => cache_expiration, "cache_size" => cache_size, "tag_on_failure" => ["lookup_failed"],