Skip to content

Commit 608b968

Browse files
authored
Add timeout parameter and error handling (#6)
This PR will fix following problems. ## Problem 1 If the data of obsolete plugins is corrupted, Fluentd will immediately stop by parse error at configuration phase. * Even if an error occurs, Fluentd should not be stopped. ## Problem 2 If data cannot be obtained immediately due to network failures, currently, it blocks the configuration of other plugins. * It should not block the configuration of other plugins with any reason. * After a certain amount of time has elapsed, it should abort to get data.
1 parent 5ed4a58 commit 608b968

File tree

5 files changed

+89
-12
lines changed

5 files changed

+89
-12
lines changed

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,13 @@ Path to `obsolete-plugins.yml`. This parameter is deprecated. Please use `plugin
4343

4444
Default value: `nil`
4545

46+
### timeout (integer) (optional)
47+
48+
Timeout to read data of obsolete plugins.
49+
If it occurs timeout, it just skips to detect obsolete plugins.
50+
51+
Default value: `5`
52+
4653
### raise_error (bool) (optional)
4754

4855
Raise error if obsolete plugins are detected

lib/fluent/plugin/filter_obsolete_plugins.rb

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ class ObsoletePluginsFilter < Fluent::Plugin::Filter
2727
config_param :obsolete_plugins_yml, :string, default: nil, deprecated: "use plugins_json parameter instead"
2828
desc "Path to plugins.json"
2929
config_param :plugins_json, :string, default: PLUGINS_JSON_URL
30+
desc "Timeout value to read data of obsolete plugins"
31+
config_param :timeout, :integer, default: 5
3032
desc "Raise error if obsolete plugins are detected"
3133
config_param :raise_error, :bool, default: false
3234

@@ -35,12 +37,16 @@ def configure(conf)
3537

3638
obsolete_plugins =
3739
if @obsolete_plugins_yml
38-
ObsoletePluginsUtils.obsolete_plugins_from_yaml(@obsolete_plugins_yml)
40+
ObsoletePluginsUtils.obsolete_plugins_from_yaml(@obsolete_plugins_yml, timeout: @timeout)
3941
else
40-
ObsoletePluginsUtils.obsolete_plugins_from_json(@plugins_json)
42+
ObsoletePluginsUtils.obsolete_plugins_from_json(@plugins_json, timeout: @timeout)
4143
end
4244

4345
ObsoletePluginsUtils.notify(log, obsolete_plugins, raise_error: @raise_error)
46+
rescue Fluent::ConfigError
47+
raise
48+
rescue => e
49+
log.info("Failed to notfify obsolete plugins", error: e)
4450
end
4551

4652
def filter(tag, time, record)

lib/fluent/plugin/obsolete_plugins_utils.rb

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,25 @@
22
require "open-uri"
33
require "yaml"
44
require "json"
5+
require "timeout"
56

67
class Fluent::Plugin::ObsoletePluginsUtils
7-
def self.obsolete_plugins_from_yaml(url)
8-
URI.open(url) do |io|
9-
YAML.safe_load(io.read)
8+
def self.obsolete_plugins_from_yaml(url, timeout: 5)
9+
Timeout.timeout(timeout) do
10+
URI.open(url) do |io|
11+
YAML.safe_load(io.read)
12+
end
1013
end
1114
end
1215

13-
def self.obsolete_plugins_from_json(url)
14-
plugins = URI.open(url) do |io|
15-
# io.read causes Encoding::UndefinedConversionError with UTF-8 data when Ruby is started with "-Eascii-8bit:ascii-8bit".
16-
# It set the proper encoding to avoid the error.
17-
io.set_encoding("UTF-8", "UTF-8")
18-
JSON.parse(io.read)
16+
def self.obsolete_plugins_from_json(url, timeout: 5)
17+
plugins = Timeout.timeout(timeout) do
18+
URI.open(url) do |io|
19+
# io.read causes Encoding::UndefinedConversionError with UTF-8 data when Ruby is started with "-Eascii-8bit:ascii-8bit".
20+
# It set the proper encoding to avoid the error.
21+
io.set_encoding("UTF-8", "UTF-8")
22+
JSON.parse(io.read)
23+
end
1924
end
2025
plugins.select { |plugin| plugin["obsolete"] }.reduce({}) do |result, plugin|
2126
result[plugin["name"]] = plugin["note"]

test/fixtures/invalid.json

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[
2+
{
3+
"obsolete": null,
4+
"note": null,
5+
"name": "fluent-plugin-s3",
6+
"info": "Amazon S3 output plugin for Fluentd event collector",
7+
"authors": "Sadayuki Furuhashi, Masahiro Nakagawa",
8+
"version": "1.8.3",
9+
"downloads": 134522742,
10+
"homepage_uri": "https://github.com/fluent/fluent-plugin-s3",
11+
"source_code_uri": null

test/plugin/test_filter_obsolete_plugins.rb

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
require "helper"
2-
require "fluent/plugin/filter_obsolete_plugins.rb"
2+
require "fluent/plugin/filter_obsolete_plugins"
33

44
class ObsoletePluginsFilterTest < Test::Unit::TestCase
55

@@ -114,9 +114,57 @@ class ObsoletePluginsFilterTest < Test::Unit::TestCase
114114
end
115115
end
116116

117+
sub_test_case "error handling" do
118+
test "invalid json" do
119+
d = create_driver("plugins_json #{fixture_path('invalid.json')}")
120+
121+
expected_logs = [
122+
"#{@time} [info]: Failed to notfify obsolete plugins error_class=JSON::ParserError error=\"expected ',' or '}' after object value, got: EOF at line 11 column 1\"\n",
123+
]
124+
125+
assert_equal(expected_logs, d.logs)
126+
end
127+
128+
test "timeout with slow server" do
129+
server = create_slow_webserver(port: 12345)
130+
131+
mock(Fluent::Plugin::ObsoletePluginsUtils).notify.never
132+
133+
d = create_driver(%[
134+
plugins_json http://localhost:12345/plugins.json
135+
timeout 1
136+
])
137+
138+
sleep 2
139+
140+
expected_logs = [
141+
"#{@time} [info]: Failed to notfify obsolete plugins error_class=Timeout::Error error=\"execution expired\"\n",
142+
]
143+
144+
assert_equal(expected_logs, d.logs)
145+
ensure
146+
server.shutdown
147+
end
148+
149+
end
150+
117151
private
118152

119153
def create_driver(conf)
120154
Fluent::Test::Driver::Filter.new(Fluent::Plugin::ObsoletePluginsFilter).configure(conf)
121155
end
156+
157+
def create_slow_webserver(port: 12345)
158+
require "webrick"
159+
160+
server = WEBrick::HTTPServer.new(Port: port)
161+
server.mount_proc '/' do |req, res|
162+
sleep 60
163+
164+
res['Content-Type'] = 'application/json'
165+
res.body = File.read(fixture_path("plugins.json"))
166+
end
167+
168+
server
169+
end
122170
end

0 commit comments

Comments
 (0)