Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/filter-jdbc_streaming.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ This plugin supports the following configuration options plus the <<plugins-{typ
[cols="<,<,<",options="header",]
|=======================================================================
|Setting |Input type|Required
| <<plugins-{type}s-{plugin}-ignore_empty_cache>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-cache_expiration>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-cache_size>> |<<number,number>>|No
| <<plugins-{type}s-{plugin}-default_hash>> |<<hash,hash>>|No
Expand Down Expand Up @@ -120,6 +121,14 @@ filter plugins.

&nbsp;

[id="plugins-{type}s-{plugin}-ignore_empty_cache"]
===== `ignore_empty_cache`

* Value type is <<boolean,boolean>>
* Default value is `true`

When set to `true`, skips caching nil values. Defaults to true.

[id="plugins-{type}s-{plugin}-cache_expiration"]
===== `cache_expiration`

Expand Down
3 changes: 3 additions & 0 deletions lib/logstash/filters/jdbc_streaming.rb
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ module LogStash module Filters class JdbcStreaming < LogStash::Filters::Base
# Enable or disable caching, boolean true or false, defaults to true
config :use_cache, :validate => :boolean, :default => true

# Enable or disable caching nil values, boolean true or false, defaults to true
config :ignore_empty_cache, :validate => :boolean, :default => true

# The minimum number of seconds any entry should remain in the cache, defaults to 5 seconds
# A numeric value, you can use decimals for example `{ "cache_expiration" => 0.25 }`
# If there are transient jdbc errors the cache will store empty results for a given
Expand Down
6 changes: 3 additions & 3 deletions lib/logstash/plugin_mixins/jdbc_streaming.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
# for potential reuse in other plugins (input/output)
module LogStash module PluginMixins module JdbcStreaming
class RowCache
def initialize(size, ttl)
@cache = ::LruRedux::TTL::ThreadSafeCache.new(size, ttl)
def initialize(size, ttl, ignore_nil)
@cache = ::LruRedux::TTL::ThreadSafeCache.new(size, ttl, ignore_nil)
end

def get(parameters)
Expand All @@ -15,7 +15,7 @@ def get(parameters)
end

class NoCache
def initialize(size, ttl) end
def initialize(size, ttl, ignore_nil) end

def get(statement)
yield
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ def self.build_statement_handler(plugin)
def initialize(plugin)
@statement = plugin.statement
klass = plugin.use_cache ? RowCache : NoCache
@cache = klass.new(plugin.cache_size, plugin.cache_expiration)
@cache = klass.new(plugin.cache_size, plugin.cache_expiration, plugin.ignore_empty_cache)
post_init(plugin)
end

Expand Down
2 changes: 1 addition & 1 deletion logstash-integration-jdbc.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ Gem::Specification.new do |s|
# Pinned sequel version to >= 5.74.0 as it fixes the generic jdbc adapter to properly
# handle disconnection errors, and avoid stale connections in the pool.
s.add_runtime_dependency 'sequel', '>= 5.74.0'
s.add_runtime_dependency 'lru_redux' # lru cache with ttl
s.add_runtime_dependency 'sin_lru_redux' # lru cache with ttl

s.add_runtime_dependency 'tzinfo'
s.add_runtime_dependency 'tzinfo-data'
Expand Down
5 changes: 4 additions & 1 deletion spec/filters/integration/jdbcstreaming_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ class TestJdbcStreaming < JdbcStreaming
::Sequel.connect(mixin_settings['jdbc_connection_string'])
end
let(:event) { ::LogStash::Event.new("message" => "some text", "ip" => ipaddr) }
let(:cache_expiration) { 3.0 }
let(:use_cache) { true }
let(:ignore_empty_cache) { true }
let(:cache_expiration) { 3.0 }
let(:cache_size) { 10 }
let(:statement) { "SELECT name, location FROM reference_table WHERE ip = :ip" }
let(:settings) do
Expand All @@ -39,6 +40,7 @@ class TestJdbcStreaming < JdbcStreaming
"parameters" => {"ip" => "ip"},
"target" => "server",
"use_cache" => use_cache,
"ignore_empty_cache" => ignore_empty_cache,
"cache_expiration" => cache_expiration,
"cache_size" => cache_size,
"tag_on_failure" => ["lookup_failed"],
Expand Down Expand Up @@ -99,6 +101,7 @@ class TestJdbcStreaming < JdbcStreaming
"prepared_statement_bind_values" => ["[ip]"],
"target" => "server",
"use_cache" => use_cache,
"ignore_empty_cache" => ignore_empty_cache,
"cache_expiration" => cache_expiration,
"cache_size" => cache_size,
"tag_on_failure" => ["lookup_failed"],
Expand Down
5 changes: 4 additions & 1 deletion spec/filters/jdbc_streaming_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ class TestJdbcStreaming < JdbcStreaming
::Sequel.connect(mixin_settings['jdbc_connection_string'], :user=> nil, :password=> nil)
end
let(:event) { ::LogStash::Event.new("message" => "some text", "ip" => ipaddr) }
let(:cache_expiration) { 3.0 }
let(:use_cache) { true }
let(:ignore_empty_cache) { true }
let(:cache_expiration) { 3.0 }
let(:cache_size) { 10 }

before :each do
Expand Down Expand Up @@ -57,6 +58,7 @@ class TestJdbcStreaming < JdbcStreaming
"parameters" => {"ip" => "ip"},
"target" => "server",
"use_cache" => use_cache,
"ignore_empty_cache" => ignore_empty_cache,
"cache_expiration" => cache_expiration,
"cache_size" => cache_size,
"tag_on_failure" => ["lookup_failed"],
Expand Down Expand Up @@ -191,6 +193,7 @@ class TestJdbcStreaming < JdbcStreaming
"prepared_statement_bind_values" => ["[ip]", 2],
"target" => "server",
"use_cache" => use_cache,
"ignore_empty_cache" => ignore_empty_cache,
"cache_expiration" => cache_expiration,
"cache_size" => cache_size,
"tag_on_failure" => ["lookup_failed"],
Expand Down