diff --git a/docs/input-jdbc.asciidoc b/docs/input-jdbc.asciidoc index d1bdb33..dee0f84 100644 --- a/docs/input-jdbc.asciidoc +++ b/docs/input-jdbc.asciidoc @@ -41,9 +41,9 @@ options for more info. Input from this plugin can be scheduled to run periodically according to a specific schedule. This scheduling syntax is powered by https://github.com/jmettraux/rufus-scheduler[rufus-scheduler]. -The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support ). +The syntax is either cron-like, with some extensions specific to Rufus (e.g. timezone support), when using the `schedule` option, or periodic when using the `period` or `interval` option. -Examples: +Examples for `schedule`: |========================================================== | `* 5 * 1-3 *` | will execute every minute of 5am every day of January through March. @@ -51,9 +51,21 @@ Examples: | `0 6 * * * America/Chicago` | will execute at 6:00am (UTC/GMT -5) every day. |========================================================== +Examples for `period` or `interval`: + +|========================================================== +| `1m` | will execute every minute +| `3h10m` | will execute every three hours and 10 minutes +|========================================================== Further documentation describing this syntax can be found https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings[here]. +`interval` jobs trigger, execute and then trigger again after the interval has elapsed. + +`period` jobs try to trigger following the frequency they were scheduled with. + +You can only use one of `interval`, `period` or `schedule` at the same time. 
+ ==== State The plugin will persist the `sql_last_value` parameter in the form of a @@ -212,6 +224,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No | <> |<>|No +| <<plugins-{type}s-{plugin}-interval>> |<<string,string>>|No | <> |<>|Yes | <> |<>|No | <> |<>|Yes @@ -229,6 +242,7 @@ This plugin supports the following configuration options plus the <> |<>|No | <> |<>|No | <> |<>|No +| <<plugins-{type}s-{plugin}-period>> |<<string,string>>|No | <> |<>, one of `["local", "utc"]`|No | <> |<>|No | <> |<>|No @@ -298,6 +312,16 @@ Maximum number of times to try connecting to database Number of seconds to sleep between connection attempts +[id="plugins-{type}s-{plugin}-interval"] +===== `interval` + + * Value type is <<string,string>> + * There is no default value for this setting. + +This takes a string in the form of `1h`, `1m`, to denote a time interval. `interval` jobs trigger, execute and trigger again after the provided time interval has elapsed. + +There is no schedule by default. If no scheduling statement is given, then the statement is run exactly once. + [id="plugins-{type}s-{plugin}-jdbc_connection_string"] ===== `jdbc_connection_string` @@ -538,6 +562,16 @@ Whether to force the lowercasing of identifier fields Hash of query parameter, for example `{ "target_id" => "321" }` +[id="plugins-{type}s-{plugin}-period"] +===== `period` + + * Value type is <<string,string>> + * There is no default value for this setting. + +This takes a string in the form of `1h`, `1m`, to denote a time interval. `period` jobs try hard to trigger following the frequency they were scheduled with. + +There is no schedule by default. If no scheduling statement is given, then the statement is run exactly once. + [id="plugins-{type}s-{plugin}-prepared_statement_bind_values"] ===== `prepared_statement_bind_values` @@ -568,11 +602,9 @@ Whether to save state or not in <> * There is no default value for this setting. 
-Schedule of when to periodically run statement, in Cron format -for example: "* * * * *" (execute query every minute, on the minute) +Schedule of when to periodically run statement, in Cron format for example: "* * * * *" (execute query every minute, on the minute) -There is no schedule by default. If no schedule is given, then the statement is run -exactly once. +There is no schedule by default. If no scheduling statement is given, then the statement is run exactly once. [id="plugins-{type}s-{plugin}-sequel_opts"] ===== `sequel_opts` diff --git a/lib/logstash/inputs/jdbc.rb b/lib/logstash/inputs/jdbc.rb index 7de6ace..3cf93dc 100755 --- a/lib/logstash/inputs/jdbc.rb +++ b/lib/logstash/inputs/jdbc.rb @@ -180,6 +180,20 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base # exactly once. config :schedule, :validate => :string + # Interval of how soon to run statement again after completion + # for example: "1m" (execute again 1 minute after completion) + # + # There is no interval by default. If no interval is given, then the statement is run + # exactly once. + config :interval, :validate => :string + + # Start the job periodically after time elapsed + # for example: "1m" (execute query every minute) + # + # There is no period by default. If no period is given, then the statement is run + # exactly once. + config :period, :validate => :string + # Path to file with last run time. # The default will write file to `/plugins/inputs/jdbc/logstash_jdbc_last_run` # NOTE: it must be a file path and not a directory path @@ -241,6 +255,11 @@ module LogStash module Inputs class Jdbc < LogStash::Inputs::Base def register @logger = self.logger + if [@interval, @schedule, @period].compact.size > 1 + raise LogStash::ConfigurationError.new("Use only one of: interval, period, schedule.") + end + + if @record_last_run if @last_run_metadata_path.nil? 
logstash_data_path = LogStash::SETTINGS.get_value("path.data") @@ -322,6 +341,12 @@ def run(queue) # scheduler input thread name example: "[my-oracle]|input|jdbc|scheduler" scheduler.cron(@schedule) { execute_query(queue) } scheduler.join + elsif @interval + scheduler.interval(@interval) { execute_query(queue) } + scheduler.join + elsif @period + scheduler.every(@period) { execute_query(queue) } + scheduler.join else execute_query(queue) end diff --git a/spec/inputs/jdbc_spec.rb b/spec/inputs/jdbc_spec.rb index f4a9880..4d08ccd 100755 --- a/spec/inputs/jdbc_spec.rb +++ b/spec/inputs/jdbc_spec.rb @@ -228,6 +228,33 @@ end end + describe "scheduling options" do + let(:settings) { super().merge("statement" => "SELECT :num_param as num_param FROM SYSIBM.SYSDUMMY1") } + scheduling_options = ["interval", "schedule", "period"] + scheduling_options.combination(2).each do |option1, option2| + context "when using '#{option1}' and '#{option2}' at the same time" do + let(:settings) { super().merge(option1 => 'a', option2 => 'b') } + it "raises a configuration error" do + expect { plugin.register }.to raise_error(LogStash::ConfigurationError, /Use only one/) + end + end + end + context "when using 'schedule', 'period' and 'interval' at the same time" do + let(:settings) { super().merge("interval" => "a", "period" => "b", "schedule" => "c") } + it "raises a configuration error" do + expect { plugin.register }.to raise_error(LogStash::ConfigurationError, /Use only one/) + end + end + scheduling_options.each do |option| + context "when using only '#{option}'" do + let(:settings) { super().merge(option => "a") } + it "does not raise a configuration error" do + expect { plugin.register }.to_not raise_error + end + end + end + end + context "when scheduling" do let(:settings) { {"statement" => "SELECT 1 as num_param FROM SYSIBM.SYSDUMMY1", "schedule" => "* * * * * UTC"} }