Skip to content

Commit 0381afc

Browse files
committed
Add timeout parameter and error handling
1 parent 5ed4a58 commit 0381afc

File tree

5 files changed

+100
-18
lines changed

5 files changed

+100
-18
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ Path to `obsolete-plugins.yml`. This parameter is deprecated. Please use `plugin
4343

4444
Default value: `nil`
4545

46+
### timeout (integer) (optional)
47+
48+
Timeout to read data of obsolete plugins.
49+
If it occurs timeout, it just skips to detect obsolete plugins.
50+
51+
Default value: `15`
52+
4653
### raise_error (bool) (optional)
4754

4855
Raise error if obsolete plugins are detected

lib/fluent/plugin/filter_obsolete_plugins.rb

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,37 @@ module Plugin
2121
class ObsoletePluginsFilter < Fluent::Plugin::Filter
2222
Fluent::Plugin.register_filter("obsolete_plugins", self)
2323

24+
helpers :thread
25+
2426
PLUGINS_JSON_URL = "https://raw.githubusercontent.com/fluent/fluentd-website/master/scripts/plugins.json"
2527

2628
desc "Path to obsolete-plugins.yml"
2729
config_param :obsolete_plugins_yml, :string, default: nil, deprecated: "use plugins_json parameter instead"
2830
desc "Path to plugins.json"
2931
config_param :plugins_json, :string, default: PLUGINS_JSON_URL
32+
desc "Timeout value to read data of obsolete plugins"
33+
config_param :timeout, :integer, default: 15
3034
desc "Raise error if obsolete plugins are detected"
3135
config_param :raise_error, :bool, default: false
3236

3337
def configure(conf)
3438
super
3539

36-
obsolete_plugins =
37-
if @obsolete_plugins_yml
38-
ObsoletePluginsUtils.obsolete_plugins_from_yaml(@obsolete_plugins_yml)
39-
else
40-
ObsoletePluginsUtils.obsolete_plugins_from_json(@plugins_json)
41-
end
42-
43-
ObsoletePluginsUtils.notify(log, obsolete_plugins, raise_error: @raise_error)
40+
thread_create(:filter_obsolete_plugins) do
41+
obsolete_plugins =
42+
if @obsolete_plugins_yml
43+
ObsoletePluginsUtils.obsolete_plugins_from_yaml(@obsolete_plugins_yml, timeout: @timeout)
44+
else
45+
ObsoletePluginsUtils.obsolete_plugins_from_json(@plugins_json, timeout: @timeout)
46+
end
47+
48+
ObsoletePluginsUtils.notify(log, obsolete_plugins, raise_error: @raise_error)
49+
rescue Fluent::ConfigError => e
50+
log.error e
51+
raise
52+
rescue
53+
# ignore other exception
54+
end
4455
end
4556

4657
def filter(tag, time, record)

lib/fluent/plugin/obsolete_plugins_utils.rb

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,25 @@
22
require "open-uri"
33
require "yaml"
44
require "json"
5+
require "timeout"
56

67
class Fluent::Plugin::ObsoletePluginsUtils
7-
def self.obsolete_plugins_from_yaml(url)
8-
URI.open(url) do |io|
9-
YAML.safe_load(io.read)
8+
def self.obsolete_plugins_from_yaml(url, timeout: 15)
9+
Timeout.timeout(timeout) do
10+
URI.open(url) do |io|
11+
YAML.safe_load(io.read)
12+
end
1013
end
1114
end
1215

13-
def self.obsolete_plugins_from_json(url)
14-
plugins = URI.open(url) do |io|
15-
# io.read causes Encoding::UndefinedConversionError with UTF-8 data when Ruby is started with "-Eascii-8bit:ascii-8bit".
16-
# It set the proper encoding to avoid the error.
17-
io.set_encoding("UTF-8", "UTF-8")
18-
JSON.parse(io.read)
16+
def self.obsolete_plugins_from_json(url, timeout: 15)
17+
plugins = Timeout.timeout(timeout) do
18+
URI.open(url) do |io|
19+
# io.read causes Encoding::UndefinedConversionError with UTF-8 data when Ruby is started with "-Eascii-8bit:ascii-8bit".
20+
# It set the proper encoding to avoid the error.
21+
io.set_encoding("UTF-8", "UTF-8")
22+
JSON.parse(io.read)
23+
end
1924
end
2025
plugins.select { |plugin| plugin["obsolete"] }.reduce({}) do |result, plugin|
2126
result[plugin["name"]] = plugin["note"]

test/fixtures/invalid.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[
2+
{
3+
"obsolete": null,
4+
"note": null,
5+
"name": "fluent-plugin-s3",
6+
"info": "Amazon S3 output plugin for Fluentd event collector",
7+
"authors": "Sadayuki Furuhashi, Masahiro Nakagawa",
8+
"version": "1.8.3",
9+
"downloads": 134522742,
10+
"homepage_uri": "https://github.com/fluent/fluent-plugin-s3",
11+
"source_code_uri": null

test/plugin/test_filter_obsolete_plugins.rb

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
require "helper"
2-
require "fluent/plugin/filter_obsolete_plugins.rb"
2+
require "fluent/plugin/filter_obsolete_plugins"
33

44
class ObsoletePluginsFilterTest < Test::Unit::TestCase
55

@@ -61,6 +61,7 @@ class ObsoletePluginsFilterTest < Test::Unit::TestCase
6161

6262
ex = assert_raise(Fluent::ConfigError) do
6363
create_driver(CONFIG_YAML + "raise_error yes")
64+
sleep 0.5
6465
end
6566
assert_equal("Detected obsolete plugins", ex.message)
6667
end
@@ -109,14 +110,61 @@ class ObsoletePluginsFilterTest < Test::Unit::TestCase
109110

110111
ex = assert_raise(Fluent::ConfigError) do
111112
create_driver(CONFIG_JSON + "raise_error yes")
113+
sleep 0.5
112114
end
113115
assert_equal("Detected obsolete plugins", ex.message)
114116
end
115117
end
116118

119+
sub_test_case "error handling" do
120+
test "ignore error with invalid json" do
121+
assert_nothing_raised {
122+
create_driver("plugins_json #{fixture_path('invalid.json')}")
123+
sleep 0.5
124+
}
125+
end
126+
127+
test "timeout with slow server and skip detecting obsolete plugins" do
128+
server = create_slow_webserver(port: 12345)
129+
130+
mock(Fluent::Plugin::ObsoletePluginsUtils).notify.never
131+
132+
d = create_driver(%[
133+
plugins_json http://localhost:12345/plugins.json
134+
timeout 1
135+
])
136+
137+
d.run(default_tag: "test") do
138+
d.feed({ message: "This is test message." })
139+
end
140+
assert_equal([{ message: "This is test message." }], d.filtered_records)
141+
142+
sleep 2
143+
144+
assert_equal([], d.logs)
145+
ensure
146+
server.shutdown
147+
end
148+
149+
end
150+
117151
private
118152

119153
def create_driver(conf)
120154
Fluent::Test::Driver::Filter.new(Fluent::Plugin::ObsoletePluginsFilter).configure(conf)
121155
end
156+
157+
def create_slow_webserver(port: 12345)
158+
require "webrick"
159+
160+
server = WEBrick::HTTPServer.new(Port: port)
161+
server.mount_proc '/' do |req, res|
162+
sleep 60
163+
164+
res['Content-Type'] = 'application/json'
165+
res.body = File.read(fixture_path("plugins.json"))
166+
end
167+
168+
server
169+
end
122170
end

0 commit comments

Comments
 (0)