Skip to content

Commit 5663f46

Browse files
committed
in_tail: refactor code
Signed-off-by: Daijiro Fukuda <fukuda@clear-code.com>
1 parent f454bbd commit 5663f46

File tree

2 files changed

+39
-45
lines changed

2 files changed

+39
-45
lines changed

lib/fluent/plugin/in_tail.rb

Lines changed: 28 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -96,9 +96,9 @@ def initialize
9696
config_param :enable_watch_timer, :bool, default: true
9797
desc 'Enable the stat watcher based on inotify.'
9898
config_param :enable_stat_watcher, :bool, default: true
99-
desc 'The encoding after conversion of the input.'
100-
config_param :encoding, :string, default: nil
10199
desc 'The encoding of the input.'
100+
config_param :encoding, :string, default: nil
101+
desc "The original encoding of the input. If set, in_tail tries to encode string from this to 'encoding'. Must be set with 'encoding'. "
102102
config_param :from_encoding, :string, default: nil
103103
desc 'Add the log path being tailed to records. Specify the field name to be used.'
104104
config_param :path_key, :string, default: nil
@@ -828,17 +828,27 @@ def statistics
828828
private
829829

830830
def io_handler(watcher, path)
831-
TailWatcher::IOHandler.new(
832-
watcher,
831+
opts = {
833832
path: path,
834833
log: log,
835834
read_lines_limit: @read_lines_limit,
836835
read_bytes_limit_per_second: @read_bytes_limit_per_second,
837836
open_on_every_update: @open_on_every_update,
838-
from_encoding: @from_encoding,
839-
encoding: @encoding,
840837
metrics: @metrics,
841838
max_line_size: @max_line_size,
839+
}
840+
unless @encoding.nil?
841+
if @from_encoding.nil?
842+
opts[:encoding] = @encoding
843+
else
844+
opts[:encoding] = @from_encoding
845+
opts[:encoding_to_convert] = @encoding
846+
end
847+
end
848+
849+
TailWatcher::IOHandler.new(
850+
watcher,
851+
**opts,
842852
&method(:receive_lines)
843853
)
844854
end
@@ -1031,47 +1041,30 @@ def swap_state(pe)
10311041
end
10321042

10331043
class FIFO
1034-
def initialize(from_encoding, encoding, log, max_line_size=nil)
1035-
@from_encoding = from_encoding
1036-
@encoding = encoding
1037-
@encoding_for_appending = from_encoding || encoding || Encoding::ASCII_8BIT
1038-
@need_enc = from_encoding && encoding && (from_encoding != encoding)
1039-
@buffer = ''.force_encoding(@encoding_for_appending)
1040-
@eol = "\n".encode(@encoding_for_appending).freeze
1044+
def initialize(encoding, log, max_line_size=nil, encoding_to_convert=nil)
1045+
@buffer = ''.force_encoding(encoding)
1046+
@eol = "\n".encode(encoding).freeze
1047+
@encoding_to_convert = encoding_to_convert
10411048
@max_line_size = max_line_size
10421049
@skip_current_line = false
10431050
@skipping_current_line_bytesize = 0
10441051
@log = log
10451052
end
10461053

1047-
attr_reader :from_encoding, :encoding, :buffer, :max_line_size
1054+
attr_reader :buffer, :max_line_size
10481055

10491056
def <<(chunk)
1050-
# Although "chunk" is most likely transient besides String#force_encoding itself
1051-
# won't affect the actual content of it, it is also probable that "chunk" is
1052-
# a reused buffer and changing its encoding causes some problems on the caller side.
1053-
#
1054-
# Actually, the caller here is specific and "chunk" comes from IO#partial with
1055-
# the second argument, which the function always returns as a return value.
1056-
#
1057-
# Feeding a string that has its encoding attribute set to any double-byte or
1058-
# quad-byte encoding to IO#readpartial as the second arguments results in an
1059-
# assertion failure on Ruby < 2.4.0 for unknown reasons.
1060-
orig_encoding = chunk.encoding
1061-
chunk.force_encoding(@encoding_for_appending)
10621057
@buffer << chunk
1063-
# Thus the encoding needs to be reverted back here
1064-
chunk.force_encoding(orig_encoding)
10651058
end
10661059

10671060
def convert(s)
1068-
if @need_enc
1069-
s.encode!(@encoding, @from_encoding)
1061+
if @encoding_to_convert
1062+
s.encode!(@encoding_to_convert)
10701063
else
10711064
s
10721065
end
10731066
rescue
1074-
s.encode!(@encoding, @from_encoding, :invalid => :replace, :undef => :replace)
1067+
s.encode!(@encoding_to_convert, :invalid => :replace, :undef => :replace)
10751068
end
10761069

10771070
def read_lines(lines)
@@ -1137,14 +1130,15 @@ class IOHandler
11371130

11381131
attr_accessor :shutdown_timeout
11391132

1140-
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines)
1133+
def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, encoding: Encoding::ASCII_8BIT, encoding_to_convert: nil, metrics:, &receive_lines)
11411134
@watcher = watcher
11421135
@path = path
11431136
@read_lines_limit = read_lines_limit
11441137
@read_bytes_limit_per_second = read_bytes_limit_per_second
11451138
@receive_lines = receive_lines
11461139
@open_on_every_update = open_on_every_update
1147-
@fifo = FIFO.new(from_encoding, encoding, log, max_line_size)
1140+
@encoding = encoding
1141+
@fifo = FIFO.new(encoding, log, max_line_size, encoding_to_convert)
11481142
@lines = []
11491143
@io = nil
11501144
@notify_mutex = Mutex.new
@@ -1226,7 +1220,7 @@ def handle_notify
12261220
end
12271221

12281222
with_io do |io|
1229-
iobuf = ''.force_encoding('ASCII-8BIT')
1223+
iobuf = ''.force_encoding(@encoding)
12301224
begin
12311225
read_more = false
12321226
has_skipped_line = false

test/plugin/in_tail/test_fifo.rb

Lines changed: 11 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
class IntailFIFO < Test::Unit::TestCase
66
sub_test_case '#read_line' do
77
test 'returns lines splitting per `\n`' do
8-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
8+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
99
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
1010
fifo << text
1111
lines = []
@@ -15,7 +15,7 @@ class IntailFIFO < Test::Unit::TestCase
1515
end
1616

1717
test 'concat line when line is separated' do
18-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
18+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
1919
text = ("test\n" * 3 + 'test').force_encoding(Encoding::ASCII_8BIT)
2020
fifo << text
2121
lines = []
@@ -30,7 +30,7 @@ class IntailFIFO < Test::Unit::TestCase
3030
end
3131

3232
test 'returns lines which convert encoding' do
33-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8, $log)
33+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, nil, Encoding::UTF_8)
3434
text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT)
3535
fifo << text
3636
lines = []
@@ -40,7 +40,7 @@ class IntailFIFO < Test::Unit::TestCase
4040
end
4141

4242
test 'reads lines as from_encoding' do
43-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
43+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, $log, nil, Encoding::ASCII_8BIT)
4444
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)
4545
fifo << text
4646
lines = []
@@ -51,7 +51,7 @@ class IntailFIFO < Test::Unit::TestCase
5151

5252
sub_test_case 'when it includes multi byte chars' do
5353
test 'handles it as ascii_8bit' do
54-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
54+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
5555
text = ("てすと\n" * 3).force_encoding(Encoding::ASCII_8BIT)
5656
fifo << text
5757
lines = []
@@ -61,7 +61,7 @@ class IntailFIFO < Test::Unit::TestCase
6161
end
6262

6363
test 'replaces character with ? when convert error happens' do
64-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log)
64+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, $log, nil, Encoding::ASCII_8BIT)
6565
text = ("てすと\n" * 3).force_encoding(Encoding::UTF_8)
6666
fifo << text
6767
lines = []
@@ -72,7 +72,7 @@ class IntailFIFO < Test::Unit::TestCase
7272
end
7373

7474
test 'returns nothing when buffer is empty' do
75-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
75+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
7676
lines = []
7777
fifo.read_lines(lines)
7878
assert_equal [], lines
@@ -117,7 +117,7 @@ class IntailFIFO < Test::Unit::TestCase
117117
])
118118
test 'return lines only that size is less than or equal to max_line_size' do |(input_texts, expected)|
119119
max_line_size = 5
120-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size)
120+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, max_line_size)
121121
lines = []
122122

123123
input_texts.each do |text|
@@ -133,7 +133,7 @@ class IntailFIFO < Test::Unit::TestCase
133133

134134
sub_test_case '#<<' do
135135
test 'does not make any change about encoding to an argument' do
136-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
136+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
137137
text = ("test\n" * 3).force_encoding(Encoding::UTF_8)
138138

139139
assert_equal Encoding::UTF_8, text.encoding
@@ -144,7 +144,7 @@ class IntailFIFO < Test::Unit::TestCase
144144

145145
sub_test_case '#reading_bytesize' do
146146
test 'returns buffer size' do
147-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log)
147+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log)
148148
text = "test\n" * 3 + 'test'
149149
fifo << text
150150

@@ -163,7 +163,7 @@ class IntailFIFO < Test::Unit::TestCase
163163

164164
test 'returns the entire line size even if the size is over max_line_size' do
165165
max_line_size = 20
166-
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size)
166+
fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, max_line_size)
167167
lines = []
168168

169169
text = "long line still not having EOL"

0 commit comments

Comments
 (0)