Skip to content

Commit 7dc2429

Browse files
author
Suyog Rao
authored
Merge pull request #58 from hummingV/rufus-scheduler
#34, use rufus-scheduler for more flexible scheduling
2 parents 82b68c6 + dd37c84 commit 7dc2429

File tree

3 files changed

+229
-11
lines changed

3 files changed

+229
-11
lines changed

lib/logstash/inputs/http_poller.rb

Lines changed: 54 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
require "logstash/plugin_mixins/http_client"
55
require "socket" # for Socket.gethostname
66
require "manticore"
7+
require "rufus/scheduler"
78

89
# This Logstash input plugin allows you to call an HTTP API, decode the output of it into event(s), and
910
# send them on their merry way. The idea behind this plugins came from a need to read springboot
@@ -33,7 +34,8 @@
3334
# }
3435
# }
3536
# request_timeout => 60
36-
# interval => 60
37+
# # Supports "cron", "every", "at" and "in" schedules by rufus scheduler
38+
# schedule => { cron => "* * * * * UTC"}
3739
# codec => "json"
3840
# # A hash of request metadata info (timing, response headers, etc.) will be sent here
3941
# metadata_target => "http_poller_metadata"
@@ -86,7 +88,20 @@ class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
8688
config :urls, :validate => :hash, :required => true
8789

8890
# How often (in seconds) the urls will be called
89-
config :interval, :validate => :number, :required => true
91+
# DEPRECATED. Use 'schedule' option instead.
92+
# Specify either interval or schedule, not both: if both options are
93+
# set, the plugin raises a LogStash::ConfigurationError when it runs.
94+
config :interval, :validate => :number, :deprecated => true
95+
96+
# Schedule of when to periodically poll from the urls
97+
# Format: A hash with
98+
# + key: "cron" | "every" | "in" | "at"
99+
# + value: string
100+
# Examples:
101+
# a) { "every" => "1h" }
102+
# b) { "cron" => "* * * * * UTC" }
103+
# See: rufus/scheduler for details about different schedule options and value string format
104+
config :schedule, :validate => :hash
90105

91106
# Define the target field for placing the received data. If this setting is omitted, the data will be stored at the root (top level) of the event.
92107
config :target, :validate => :string
@@ -97,17 +112,19 @@ class LogStash::Inputs::HTTP_Poller < LogStash::Inputs::Base
97112
config :metadata_target, :validate => :string, :default => '@metadata'
98113

99114
public
115+
# Keys accepted in the `schedule` option hash. Frozen so the whitelist
# cannot be mutated at runtime.
Schedule_types = %w(cron every at in).freeze
100116
# Records the local hostname (tagged as UTF-8), logs the configured
# settings, then delegates to setup_requests! (defined elsewhere in the
# plugin).
def register
  @host = Socket.gethostname.force_encoding(Encoding::UTF_8)

  @logger.info(
    "Registering http_poller Input",
    :type => @type,
    :urls => @urls,
    :interval => @interval,
    :schedule => @schedule,
    :timeout => @timeout
  )

  setup_requests!
end
108124

109125
# Shuts down whichever polling mechanism is active: asks Stud to stop the
# thread recorded in @interval_thread, and stops the scheduler stored in
# @scheduler. Either step is skipped when its instance variable is unset.
def stop
  if @interval_thread
    Stud.stop!(@interval_thread)
  end

  if @scheduler
    @scheduler.stop
  end
end
112129

113130
private
@@ -160,13 +177,46 @@ def validate_request!(url_or_spec, request)
160177

161178
public
162179
# Starts polling using exactly one of the two timing options.
#
# queue - passed through unchanged to setup_interval / setup_schedule.
#
# Raises LogStash::ConfigurationError when both `interval` and `schedule`
# are set, or when neither is set.
def run(queue)
  if @interval && @schedule
    raise LogStash::ConfigurationError, "Invalid config. Specify only interval or schedule. Not both."
  end

  if @interval
    setup_interval(queue)
  elsif @schedule
    setup_schedule(queue)
  else
    raise LogStash::ConfigurationError, "Invalid config. Neither interval nor schedule was specified."
  end
end
195+
196+
private
197+
# Interval mode: remembers the calling thread in @interval_thread (the
# handle `stop` passes to Stud.stop!) and has Stud.interval invoke
# run_once(queue) on the @interval period.
def setup_interval(queue)
  @interval_thread = Thread.current
  Stud.interval(@interval) { run_once(queue) }
end
168203

169-
private
204+
# Schedule mode: validates the `schedule` hash, registers a single job on
# a new Rufus::Scheduler that calls run_once(queue), then blocks on the
# scheduler until it is stopped.
#
# queue - passed through unchanged to run_once on every firing.
#
# Raises LogStash::ConfigurationError unless @schedule holds exactly one
# key and that key is listed in Schedule_types.
def setup_schedule(queue)
  msg_invalid_schedule = "Invalid config. schedule hash must contain " +
    "exactly one of the following keys - cron, at, every or in"

  # The constant is LogStash (capital S); the previous spelling raised a
  # NameError here instead of the intended configuration error.
  raise LogStash::ConfigurationError, msg_invalid_schedule if @schedule.size != 1

  schedule_type = @schedule.keys.first
  schedule_value = @schedule[schedule_type]
  raise LogStash::ConfigurationError, msg_invalid_schedule unless Schedule_types.include?(schedule_type)

  @scheduler = Rufus::Scheduler.new(:max_work_threads => 1)
  # as of v3.0.9, :first_in => :now doesn't work. Use the following workaround instead
  opts = schedule_type == "every" ? { :first_in => 0.01 } : {}
  # schedule_type was checked against Schedule_types above, so this only
  # ever dispatches to the scheduler's cron/every/at/in methods.
  @scheduler.public_send(schedule_type, schedule_value, opts) { run_once(queue) }
  @scheduler.join
end
219+
170220
def run_once(queue)
171221
@requests.each do |name, request|
172222
request_async(queue, name, request)

logstash-input-http_poller.gemspec

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-input-http_poller'
3-
s.version = '3.0.3'
3+
s.version = '3.1.0'
44
s.licenses = ['Apache License (2.0)']
55
s.summary = "Poll HTTP endpoints with Logstash."
66
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
@@ -22,8 +22,10 @@ Gem::Specification.new do |s|
2222
s.add_runtime_dependency 'logstash-codec-plain'
2323
s.add_runtime_dependency 'logstash-mixin-http_client', ">= 2.2.4", "< 5.0.0"
2424
s.add_runtime_dependency 'stud', "~> 0.0.22"
25+
s.add_runtime_dependency 'rufus-scheduler', "~>3.0.9"
2526

2627
s.add_development_dependency 'logstash-codec-json'
2728
s.add_development_dependency 'logstash-devutils'
2829
s.add_development_dependency 'flores'
30+
s.add_development_dependency 'timecop'
2931
end

spec/inputs/http_poller_spec.rb

Lines changed: 172 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
require "logstash/devutils/rspec/spec_helper"
22
require 'logstash/inputs/http_poller'
33
require 'flores/random'
4+
require "timecop"
45

56
describe LogStash::Inputs::HTTP_Poller do
67
let(:metadata_target) { "_http_poller_metadata" }
78
let(:queue) { Queue.new }
8-
let(:default_interval) { 5 }
9+
let(:default_schedule) {
10+
{ "cron" => "* * * * * UTC" }
11+
}
912
let(:default_name) { "url1 " }
1013
let(:default_url) { "http://localhost:1827" }
1114
let(:default_urls) {
@@ -15,7 +18,7 @@
1518
}
1619
let(:default_opts) {
1720
{
18-
"interval" => default_interval,
21+
"schedule" => default_schedule,
1922
"urls" => default_urls,
2023
"codec" => "json",
2124
"metadata_target" => metadata_target
@@ -31,9 +34,13 @@
3134
end
3235

3336
describe "#run" do
  it "should setup a scheduler" do
    queue = double("queue")
    runner = Thread.new { subject.run(queue) }

    # run blocks while the scheduler is alive, so check from this thread.
    # Poll with an upper bound (about 5s) instead of asserting after run
    # returns, which never happens before the thread is killed.
    50.times do
      break if subject.instance_variable_get("@scheduler")
      sleep 0.1
    end

    expect(subject.instance_variable_get("@scheduler")).to be_a_kind_of(Rufus::Scheduler)

    subject.stop
    runner.kill
    runner.join
  end
end
3946

@@ -151,6 +158,163 @@
151158
end
152159
end
153160

161+
describe "scheduler configuration" do
  # Runs an already-registered plugin instance in a background thread for
  # +seconds+ of wall-clock time, shuts it down, and returns the queue the
  # instance was given.
  def collect_events(instance, seconds)
    queue = Queue.new
    runner = Thread.new { instance.run(queue) }
    sleep seconds
    instance.stop
    runner.kill
    runner.join
    queue
  end

  context "given an interval" do
    let(:opts) {
      {
        "interval" => 2,
        "urls" => default_urls,
        "codec" => "json",
        "metadata_target" => metadata_target
      }
    }
    it "should run once in each interval" do
      instance = klass.new(opts)
      instance.register
      # T       0123456
      # events  x x x x
      # expects 3 events at T=5
      queue = collect_events(instance, 5)
      expect(queue.size).to eq(3)
    end
  end

  context "given both interval and schedule options" do
    let(:opts) {
      {
        "interval" => 1,
        "schedule" => { "every" => "5s" },
        "urls" => default_urls,
        "codec" => "json",
        "metadata_target" => metadata_target
      }
    }
    it "should raise ConfigurationError" do
      instance = klass.new(opts)
      instance.register
      # run rejects this config before it starts polling, so it can be called
      # on the example's own thread. An expectation inside a thread that is
      # killed right away never reports a failure.
      expect { instance.run(Queue.new) }.to raise_error(LogStash::ConfigurationError)
    end
  end

  context "given 'cron' expression" do
    let(:opts) {
      {
        "schedule" => { "cron" => "* * * * * UTC" },
        "urls" => default_urls,
        "codec" => "json",
        "metadata_target" => metadata_target
      }
    }
    # Restore the real clock even when the expectation below fails.
    after { Timecop.return }

    it "should run at the schedule" do
      instance = klass.new(opts)
      instance.register
      Timecop.travel(Time.new(2000,1,1,0,0,0,'+00:00'))
      Timecop.scale(60)
      queue = collect_events(instance, 3)
      expect(queue.size).to eq(2)
    end
  end

  context "given 'at' expression" do
    let(:opts) {
      {
        "schedule" => { "at" => "2000-01-01 00:05:00 +0000"},
        "urls" => default_urls,
        "codec" => "json",
        "metadata_target" => metadata_target
      }
    }
    # Restore the real clock even when the expectation below fails.
    after { Timecop.return }

    it "should run at the schedule" do
      instance = klass.new(opts)
      instance.register
      Timecop.travel(Time.new(2000,1,1,0,0,0,'+00:00'))
      Timecop.scale(60 * 5)
      queue = collect_events(instance, 2)
      expect(queue.size).to eq(1)
    end
  end

  context "given 'every' expression" do
    let(:opts) {
      {
        "schedule" => { "every" => "2s"},
        "urls" => default_urls,
        "codec" => "json",
        "metadata_target" => metadata_target
      }
    }
    it "should run at the schedule" do
      instance = klass.new(opts)
      instance.register
      # T       0123456
      # events  x x x x
      # expects 3 events at T=5
      queue = collect_events(instance, 5)
      expect(queue.size).to eq(3)
    end
  end

  context "given 'in' expression" do
    let(:opts) {
      {
        "schedule" => { "in" => "2s"},
        "urls" => default_urls,
        "codec" => "json",
        "metadata_target" => metadata_target
      }
    }
    it "should run at the schedule" do
      instance = klass.new(opts)
      instance.register
      queue = collect_events(instance, 3)
      expect(queue.size).to eq(1)
    end
  end
end
317+
154318
describe "events" do
155319
shared_examples("matching metadata") {
156320
let(:metadata) { event.get(metadata_target) }
@@ -292,7 +456,9 @@
292456
}
293457
let(:opts) {
294458
{
295-
"interval" => default_interval,
459+
"schedule" => {
460+
"cron" => "* * * * * UTC"
461+
},
296462
"urls" => {
297463
default_name => url
298464
},

0 commit comments

Comments
 (0)