diff --git a/lib/fluent/plugin/in_tail.rb b/lib/fluent/plugin/in_tail.rb index be9f60972e..ba5a5439b0 100644 --- a/lib/fluent/plugin/in_tail.rb +++ b/lib/fluent/plugin/in_tail.rb @@ -96,9 +96,9 @@ def initialize config_param :enable_watch_timer, :bool, default: true desc 'Enable the stat watcher based on inotify.' config_param :enable_stat_watcher, :bool, default: true - desc 'The encoding after conversion of the input.' - config_param :encoding, :string, default: nil desc 'The encoding of the input.' + config_param :encoding, :string, default: nil + desc "The original encoding of the input. If set, in_tail tries to encode string from this to 'encoding'. Must be set with 'encoding'. " config_param :from_encoding, :string, default: nil desc 'Add the log path being tailed to records. Specify the field name to be used.' config_param :path_key, :string, default: nil @@ -828,17 +828,27 @@ def statistics private def io_handler(watcher, path) - TailWatcher::IOHandler.new( - watcher, + opts = { path: path, log: log, read_lines_limit: @read_lines_limit, read_bytes_limit_per_second: @read_bytes_limit_per_second, open_on_every_update: @open_on_every_update, - from_encoding: @from_encoding, - encoding: @encoding, metrics: @metrics, max_line_size: @max_line_size, + } + unless @encoding.nil? + if @from_encoding.nil? 
+ opts[:encoding] = @encoding + else + opts[:encoding] = @from_encoding + opts[:encoding_to_convert] = @encoding + end + end + + TailWatcher::IOHandler.new( + watcher, + **opts, &method(:receive_lines) ) end @@ -1031,46 +1041,30 @@ def swap_state(pe) end class FIFO - def initialize(from_encoding, encoding, log, max_line_size=nil) - @from_encoding = from_encoding - @encoding = encoding - @need_enc = from_encoding != encoding - @buffer = ''.force_encoding(from_encoding) - @eol = "\n".encode(from_encoding).freeze + def initialize(encoding, log, max_line_size=nil, encoding_to_convert=nil) + @buffer = ''.force_encoding(encoding) + @eol = "\n".encode(encoding).freeze + @encoding_to_convert = encoding_to_convert @max_line_size = max_line_size @skip_current_line = false @skipping_current_line_bytesize = 0 @log = log end - attr_reader :from_encoding, :encoding, :buffer, :max_line_size + attr_reader :buffer, :max_line_size def <<(chunk) - # Although "chunk" is most likely transient besides String#force_encoding itself - # won't affect the actual content of it, it is also probable that "chunk" is - # a reused buffer and changing its encoding causes some problems on the caller side. - # - # Actually, the caller here is specific and "chunk" comes from IO#partial with - # the second argument, which the function always returns as a return value. - # - # Feeding a string that has its encoding attribute set to any double-byte or - # quad-byte encoding to IO#readpartial as the second arguments results in an - # assertion failure on Ruby < 2.4.0 for unknown reasons. 
- orig_encoding = chunk.encoding - chunk.force_encoding(from_encoding) @buffer << chunk - # Thus the encoding needs to be reverted back here - chunk.force_encoding(orig_encoding) end def convert(s) - if @need_enc - s.encode!(@encoding, @from_encoding) + if @encoding_to_convert + s.encode!(@encoding_to_convert) else s end rescue - s.encode!(@encoding, @from_encoding, :invalid => :replace, :undef => :replace) + s.encode!(@encoding_to_convert, :invalid => :replace, :undef => :replace) end def read_lines(lines) @@ -1136,14 +1130,15 @@ class IOHandler attr_accessor :shutdown_timeout - def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, from_encoding: nil, encoding: nil, metrics:, &receive_lines) + def initialize(watcher, path:, read_lines_limit:, read_bytes_limit_per_second:, max_line_size: nil, log:, open_on_every_update:, encoding: Encoding::ASCII_8BIT, encoding_to_convert: nil, metrics:, &receive_lines) @watcher = watcher @path = path @read_lines_limit = read_lines_limit @read_bytes_limit_per_second = read_bytes_limit_per_second @receive_lines = receive_lines @open_on_every_update = open_on_every_update - @fifo = FIFO.new(from_encoding || Encoding::ASCII_8BIT, encoding || Encoding::ASCII_8BIT, log, max_line_size) + @encoding = encoding + @fifo = FIFO.new(encoding, log, max_line_size, encoding_to_convert) @lines = [] @io = nil @notify_mutex = Mutex.new @@ -1225,7 +1220,7 @@ def handle_notify end with_io do |io| - iobuf = ''.force_encoding('ASCII-8BIT') + iobuf = ''.force_encoding(@encoding) begin read_more = false has_skipped_line = false diff --git a/test/plugin/in_tail/test_fifo.rb b/test/plugin/in_tail/test_fifo.rb index 41bfbb02f5..ca66bb3160 100644 --- a/test/plugin/in_tail/test_fifo.rb +++ b/test/plugin/in_tail/test_fifo.rb @@ -5,7 +5,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case '#read_line' do test 'returns lines splitting per `\n`' do - fifo = 
Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log) text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -15,7 +15,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'concat line when line is separated' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log) text = ("test\n" * 3 + 'test').force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -30,7 +30,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'returns lines which convert encoding' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::UTF_8, $log) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, nil, Encoding::UTF_8) text = ("test\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -40,7 +40,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'reads lines as from_encoding' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, $log, nil, Encoding::ASCII_8BIT) text = ("test\n" * 3).force_encoding(Encoding::UTF_8) fifo << text lines = [] @@ -51,7 +51,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case 'when it includes multi byte chars' do test 'handles it as ascii_8bit' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log) text = ("てすと\n" * 3).force_encoding(Encoding::ASCII_8BIT) fifo << text lines = [] @@ -61,7 +61,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'replaces character with ? 
when convert error happens' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, Encoding::ASCII_8BIT, $log) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::UTF_8, $log, nil, Encoding::ASCII_8BIT) text = ("てすと\n" * 3).force_encoding(Encoding::UTF_8) fifo << text lines = [] @@ -72,7 +72,7 @@ class IntailFIFO < Test::Unit::TestCase end test 'returns nothing when buffer is empty' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log) lines = [] fifo.read_lines(lines) assert_equal [], lines @@ -117,7 +117,7 @@ class IntailFIFO < Test::Unit::TestCase ]) test 'return lines only that size is less than or equal to max_line_size' do |(input_texts, expected)| max_line_size = 5 - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, max_line_size) lines = [] input_texts.each do |text| @@ -133,7 +133,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case '#<<' do test 'does not make any change about encoding to an argument' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log) text = ("test\n" * 3).force_encoding(Encoding::UTF_8) assert_equal Encoding::UTF_8, text.encoding @@ -144,7 +144,7 @@ class IntailFIFO < Test::Unit::TestCase sub_test_case '#reading_bytesize' do test 'returns buffer size' do - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log) text = "test\n" * 3 + 'test' fifo << text @@ -163,7 +163,7 @@ class IntailFIFO < Test::Unit::TestCase test 
'returns the entire line size even if the size is over max_line_size' do max_line_size = 20 - fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, Encoding::ASCII_8BIT, $log, max_line_size) + fifo = Fluent::Plugin::TailInput::TailWatcher::FIFO.new(Encoding::ASCII_8BIT, $log, max_line_size) lines = [] text = "long line still not having EOL" diff --git a/test/plugin/test_in_tail.rb b/test/plugin/test_in_tail.rb index e6993fbf89..231f663206 100644 --- a/test/plugin/test_in_tail.rb +++ b/test/plugin/test_in_tail.rb @@ -1223,6 +1223,22 @@ def test_encoding_with_bad_character assert_equal(Encoding::UTF_8, events[0][2]['message'].encoding) end + def test_encoding_for_regular_expression_parsing + conf = CONFIG_READ_FROM_HEAD + + config_element("", "" , { "encoding" => "utf-8" }, + [config_element("parse", "", { "@type" => "/^あ(?<name>.*)お$/" })]) + + d = create_driver(conf) + d.run(expect_emits: 1, timeout: 5) do + Fluent::FileWrapper.open("#{@tmp_dir}/tail.txt", "wb") { |f| + f.puts "あいうえお" + } + end + events = d.events + assert_equal(true, events.length > 0) + assert_equal({"name" => "いうえ"}, events[0][2]) + end + sub_test_case "multiline" do data(flat: MULTILINE_CONFIG, parse: PARSE_MULTILINE_CONFIG)