Skip to content

Commit dd37c84

Browse files
committed
Use rufus scheduler to support more flexible scheduling options.
It supports four different types of scheduling - cron, every, in and at. "every" option (e.g. every 5 minutes) offers the same functionality as existing Stud/interval scheduling. Therefore, 'interval' config option is deprecated. 1) bumped to 3.1.0 2) Made the following interface changes to the plugin configs: + deprecated "interval" option + added "schedule" option. e.g schedule => { every => "2m"} 3) If both are specified, raise ConfigurationError. 4) Use existing Stud library if interval option is specified. Use rufus-scheduler if schedule option is specified. 5) The first invoke of "every" schedule type is configured to run immediately to match the semantic of interval option. 5) The first invoke of "every" schedule type in rufus-scheduler(v3.0.9) sometimes does not run immediately, even if :first => :now is specified. i.e sometimes the first run is invoked immediately after schedule is started, but sometimes only after the first interval is elasped. The issue is no longer reproducible in current version (v3.2.2). However, I can't upgrade to v3.2.2 immediately since other logstash plugins are using old version. So, I added a workaround that works for both versions.
1 parent 680430c commit dd37c84

File tree

3 files changed

+229
-11
lines changed

3 files changed

+229
-11
lines changed

lib/logstash/inputs/http_poller.rb

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
require "logstash/plugin_mixins/http_client"
55
require "socket" # for Socket.gethostname
66
require "manticore"
7+
require "rufus/scheduler"
78

89
# This Logstash input plugin allows you to call an HTTP API, decode the output of it into event(s), and
910
# send them on their merry way. The idea behind this plugins came from a need to read springboot
@@ -33,7 +34,8 @@
3334
# }
3435
# }
3536
# request_timeout => 60
36-
# interval => 60
37+
# # Supports "cron", "every", "at" and "in" schedules by rufus scheduler
38+
# schedule => { cron => "* * * * * UTC"}
3739
# codec => "json"
3840
# # A hash of request metadata info (timing, response headers, etc.) will be sent here
3941
# metadata_target => "http_poller_metadata"
@@ -85,7 +87,20 @@ class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
8587
config :urls, :validate => :hash, :required => true
8688

8789
# How often (in seconds) the urls will be called
88-
config :interval, :validate => :number, :required => true
90+
# DEPRECATED. Use 'schedule' option instead.
91+
# If both interval and schedule options are specified, interval
92+
# option takes higher precedence
93+
config :interval, :validate => :number, :deprecated => true
94+
95+
# Schedule of when to periodically poll from the urls
96+
# Format: A hash with
97+
# + key: "cron" | "every" | "in" | "at"
98+
# + value: string
99+
# Examples:
100+
# a) { "every" => "1h" }
101+
# b) { "cron" => "* * * * * UTC" }
102+
# See: rufus/scheduler for details about different schedule options and value string format
103+
config :schedule, :validate => :hash
89104

90105
# Define the target field for placing the received data. If this setting is omitted, the data will be stored at the root (top level) of the event.
91106
config :target, :validate => :string
@@ -96,17 +111,19 @@ class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
96111
config :metadata_target, :validate => :string, :default => '@metadata'
97112

98113
public
114+
Schedule_types = %w(cron every at in)
99115
def register
100116
@host = Socket.gethostname.force_encoding(Encoding::UTF_8)
101117

102118
@logger.info("Registering http_poller Input", :type => @type,
103-
:urls => @urls, :interval => @interval, :timeout => @timeout)
119+
:urls => @urls, :interval => @interval, :schedule => @schedule, :timeout => @timeout)
104120

105121
setup_requests!
106122
end
107123

108124
def stop
109125
Stud.stop!(@interval_thread) if @interval_thread
126+
@scheduler.stop if @scheduler
110127
end
111128

112129
private
@@ -159,13 +176,46 @@ def validate_request!(url_or_spec, request)
159176

160177
public
161178
def run(queue)
179+
#interval or schedule must be provided. Must be exclusively either one. Not neither. Not both.
180+
raise LogStash::ConfigurationError, "Invalid config. Neither interval nor schedule was specified." \
181+
unless @interval || @schedule
182+
raise LogStash::ConfigurationError, "Invalid config. Specify only interval or schedule. Not both." \
183+
if @interval && @schedule
184+
185+
if @interval
186+
setup_interval(queue)
187+
elsif @schedule
188+
setup_schedule(queue)
189+
else
190+
#should not reach here
191+
raise LogStash::ConfigurationError, "Invalid config. Neither interval nor schedule was specified."
192+
end
193+
end
194+
195+
private
196+
def setup_interval(queue)
162197
@interval_thread = Thread.current
163198
Stud.interval(@interval) do
164199
run_once(queue)
165200
end
166201
end
167202

168-
private
203+
def setup_schedule(queue)
204+
#schedule hash must contain exactly one of the allowed keys
205+
msg_invalid_schedule = "Invalid config. schedule hash must contain " +
206+
"exactly one of the following keys - cron, at, every or in"
207+
raise Logstash::ConfigurationError, msg_invalid_schedule if @schedule.keys.length !=1
208+
schedule_type = @schedule.keys.first
209+
schedule_value = @schedule[schedule_type]
210+
raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)
211+
212+
@scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
213+
#as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
214+
opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
215+
@scheduler.send(schedule_type, schedule_value, opts) { run_once(queue) }
216+
@scheduler.join
217+
end
218+
169219
def run_once(queue)
170220
@requests.each do |name, request|
171221
request_async(queue, name, request)

logstash-input-http_poller.gemspec

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-input-http_poller'
3-
s.version = '3.0.3'
3+
s.version = '3.1.0'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Poll HTTP endpoints with Logstash."
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -22,8 +22,10 @@ Gem::Specification.new do |s|
2222
s.add_runtime_dependency 'logstash-codec-plain'
2323
s.add_runtime_dependency 'logstash-mixin-http_client', ">= 2.2.4", "< 5.0.0"
2424
s.add_runtime_dependency 'stud', "~> 0.0.22"
25+
s.add_runtime_dependency 'rufus-scheduler', "~>3.0.9"
2526

2627
s.add_development_dependency 'logstash-codec-json'
2728
s.add_development_dependency 'logstash-devutils'
2829
s.add_development_dependency 'flores'
30+
s.add_development_dependency 'timecop'
2931
end

spec/inputs/http_poller_spec.rb

Lines changed: 172 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
require "logstash/devutils/rspec/spec_helper"
22
require 'logstash/inputs/http_poller'
33
require 'flores/random'
4+
require "timecop"
45

56
describe LogStash::Inputs::HTTP_Poller do
67
let(:metadata_target) { "_http_poller_metadata" }
78
let(:queue) { Queue.new }
8-
let(:default_interval) { 5 }
9+
let(:default_schedule) {
10+
{ "cron" => "* * * * * UTC" }
11+
}
912
let(:default_name) { "url1 " }
1013
let(:default_url) { "http://localhost:1827" }
1114
let(:default_urls) {
@@ -15,7 +18,7 @@
1518
}
1619
let(:default_opts) {
1720
{
18-
"interval" => default_interval,
21+
"schedule" => default_schedule,
1922
"urls" => default_urls,
2023
"codec" => "json",
2124
"metadata_target" => metadata_target
@@ -31,9 +34,13 @@
3134
end
3235

3336
describe "#run" do
34-
it "should run at the specified interval" do
35-
expect(Stud).to receive(:interval).with(default_interval).once
36-
subject.run(double("queue"))
37+
it "should setup a scheduler" do
38+
runner = Thread.new do
39+
subject.run(double("queue"))
40+
expect(subject.instance_variable_get("@scheduler")).to be_a_kind_of(Rufus::Scheduler)
41+
end
42+
runner.kill
43+
runner.join
3744
end
3845
end
3946

@@ -151,6 +158,163 @@
151158
end
152159
end
153160

161+
describe "scheduler configuration" do
162+
context "given an interval" do
163+
let(:opts) {
164+
{
165+
"interval" => 2,
166+
"urls" => default_urls,
167+
"codec" => "json",
168+
"metadata_target" => metadata_target
169+
}
170+
}
171+
it "should run once in each interval" do
172+
instance = klass.new(opts)
173+
instance.register
174+
queue = Queue.new
175+
runner = Thread.new do
176+
instance.run(queue)
177+
end
178+
#T 0123456
179+
#events x x x x
180+
#expects 3 events at T=5
181+
sleep 5
182+
instance.stop
183+
runner.kill
184+
runner.join
185+
expect(queue.size).to eq(3)
186+
end
187+
end
188+
189+
context "given both interval and schedule options" do
190+
let(:opts) {
191+
{
192+
"interval" => 1,
193+
"schedule" => { "every" => "5s" },
194+
"urls" => default_urls,
195+
"codec" => "json",
196+
"metadata_target" => metadata_target
197+
}
198+
}
199+
it "should raise ConfigurationError" do
200+
instance = klass.new(opts)
201+
instance.register
202+
queue = Queue.new
203+
runner = Thread.new do
204+
expect{instance.run(queue)}.to raise_error(LogStash::ConfigurationError)
205+
end
206+
instance.stop
207+
runner.kill
208+
runner.join
209+
end
210+
end
211+
212+
context "given 'cron' expression" do
213+
let(:opts) {
214+
{
215+
"schedule" => { "cron" => "* * * * * UTC" },
216+
"urls" => default_urls,
217+
"codec" => "json",
218+
"metadata_target" => metadata_target
219+
}
220+
}
221+
it "should run at the schedule" do
222+
instance = klass.new(opts)
223+
instance.register
224+
Timecop.travel(Time.new(2000,1,1,0,0,0,'+00:00'))
225+
Timecop.scale(60)
226+
queue = Queue.new
227+
runner = Thread.new do
228+
instance.run(queue)
229+
end
230+
sleep 3
231+
instance.stop
232+
runner.kill
233+
runner.join
234+
expect(queue.size).to eq(2)
235+
Timecop.return
236+
end
237+
end
238+
239+
context "given 'at' expression" do
240+
let(:opts) {
241+
{
242+
"schedule" => { "at" => "2000-01-01 00:05:00 +0000"},
243+
"urls" => default_urls,
244+
"codec" => "json",
245+
"metadata_target" => metadata_target
246+
}
247+
}
248+
it "should run at the schedule" do
249+
instance = klass.new(opts)
250+
instance.register
251+
Timecop.travel(Time.new(2000,1,1,0,0,0,'+00:00'))
252+
Timecop.scale(60 * 5)
253+
queue = Queue.new
254+
runner = Thread.new do
255+
instance.run(queue)
256+
end
257+
sleep 2
258+
instance.stop
259+
runner.kill
260+
runner.join
261+
expect(queue.size).to eq(1)
262+
Timecop.return
263+
end
264+
end
265+
266+
context "given 'every' expression" do
267+
let(:opts) {
268+
{
269+
"schedule" => { "every" => "2s"},
270+
"urls" => default_urls,
271+
"codec" => "json",
272+
"metadata_target" => metadata_target
273+
}
274+
}
275+
it "should run at the schedule" do
276+
instance = klass.new(opts)
277+
instance.register
278+
queue = Queue.new
279+
runner = Thread.new do
280+
instance.run(queue)
281+
end
282+
#T 0123456
283+
#events x x x x
284+
#expects 3 events at T=5
285+
sleep 5
286+
instance.stop
287+
runner.kill
288+
runner.join
289+
expect(queue.size).to eq(3)
290+
end
291+
end
292+
293+
context "given 'in' expression" do
294+
let(:opts) {
295+
{
296+
"schedule" => { "in" => "2s"},
297+
"urls" => default_urls,
298+
"codec" => "json",
299+
"metadata_target" => metadata_target
300+
}
301+
}
302+
it "should run at the schedule" do
303+
instance = klass.new(opts)
304+
instance.register
305+
queue = Queue.new
306+
runner = Thread.new do
307+
instance.run(queue)
308+
end
309+
sleep 3
310+
instance.stop
311+
runner.kill
312+
runner.join
313+
expect(queue.size).to eq(1)
314+
end
315+
end
316+
end
317+
154318
describe "events" do
155319
shared_examples("matching metadata") {
156320
let(:metadata) { event.get(metadata_target) }
@@ -292,7 +456,9 @@
292456
}
293457
let(:opts) {
294458
{
295-
"interval" => default_interval,
459+
"schedule" => {
460+
"cron" => "* * * * * UTC"
461+
},
296462
"urls" => {
297463
default_name => url
298464
},

0 commit comments

Comments
 (0)