diff --git a/lib/fluent/plugin/filter_concat.rb b/lib/fluent/plugin/filter_concat.rb index 6905599..c12c524 100644 --- a/lib/fluent/plugin/filter_concat.rb +++ b/lib/fluent/plugin/filter_concat.rb @@ -50,11 +50,10 @@ class TimeoutError < StandardError def initialize super - @buffer = Hash.new {|h, k| h[k] = [] } @timeout_map_mutex = Thread::Mutex.new @timeout_map_mutex.synchronize do - @timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now } + @timeout_map = Hash.new {|h, k| h[k] = time_event_now } end end @@ -241,7 +240,7 @@ def process(tag, time, record) end end @timeout_map_mutex.synchronize do - @timeout_map[stream_identity] = Fluent::Engine.now + @timeout_map[stream_identity] = time_event_now end case @mode when :line @@ -389,7 +388,7 @@ def flush_buffer(stream_identity, new_element = nil) end def flush_timeout_buffer - now = Fluent::Engine.now + now = time_event_now timeout_stream_identities = [] @timeout_map_mutex.synchronize do @timeout_map.each do |stream_identity, previous_timestamp| @@ -432,5 +431,11 @@ def handle_timeout_error(tag, time, record, message) router.emit_error_event(tag, time, record, TimeoutError.new(message)) end end + + def time_event_now + now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond) + Fluent::EventTime.new(now / 1_000_000_000, now % 1_000_000_000) + end + end end diff --git a/test/plugin/test_filter_concat.rb b/test/plugin/test_filter_concat.rb index 0c765e6..e860beb 100644 --- a/test/plugin/test_filter_concat.rb +++ b/test/plugin/test_filter_concat.rb @@ -3,7 +3,7 @@ class FilterConcatTest < Test::Unit::TestCase def setup Fluent::Test.setup - @time = Fluent::Engine.now + @time = time_event_now end CONFIG = %[ @@ -34,6 +34,10 @@ def filter_with_time(conf, messages, wait: nil) d.run(default_tag: "test") do sleep 0.1 # run event loop messages.each do |time, message| + now = time_event_now + if now < time + sleep time-now + end d.feed(time, message) end sleep wait if wait @@ -41,6 +45,11 @@ def filter_with_time(conf, messages, wait: nil) d.filtered end + def time_event_now + now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond) + Fluent::EventTime.new(now / 1_000_000_000, now % 1_000_000_000) + end + sub_test_case "config" do test "empty" do assert_raise(Fluent::ConfigError.new("'key' parameter is required")) do @@ -242,6 +251,43 @@ def filter_with_time(conf, messages, wait: nil) assert_equal([], filtered) end + test "timeout with timeout_label for multiline start regex" do + config = <<-CONFIG + key message + multiline_start_regexp /^start/ + flush_interval 1s + timeout_label @TIMEOUT + CONFIG + wait = 3 + delay_message_4_to_5 = 3 + delay_message_5_to_6 = 1 + + messages = [ + [@time, { "container_id" => "1", "message" => "start" }], + [@time, { "container_id" => "1", "message" => " message 1" }], + [@time, { "container_id" => "1", "message" => " message 2" }], + [@time, { "container_id" => "1", "message" => "starting" }], + [@time + delay_message_4_to_5, { "container_id" => "1", "message" => " message 3" }], + [@time + delay_message_4_to_5 + delay_message_5_to_6 , { "container_id" => "1", "message" => " message 4" }], + ] + + filtered = filter_with_time(config, messages, wait: wait) do |d| + errored = { "container_id" => "1", "message" => "starting" } + event_router = mock(Object.new).emit("test", anything, errored) + mock(Fluent::Test::Driver::TestEventRouter).new(anything) { event_router } + #commented out due to timing inconsistency + #stub.proxy(d.instance).flush_timeout_buffer.times(wait + delay_message_4_to_5 + delay_message_5_to_6) + stub.proxy(d.instance).handle_timeout_error.times(1) + + end + expected = [ + [@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }], + [@time + 3, { "container_id" => "1", "message" => " message 3" }], + [@time + 4, { "container_id" => "1", "message" => " message 4" }], + ] + assert_equal(expected, filtered) + end + test "no timeout" do messages = [ { "container_id" => "1", "message" => "message 1" }, @@ -975,11 +1021,13 @@ def filter_with_time(conf, messages, wait: nil) [@time + 2, { "container_id" => "1", "message" => " message 4" }], ] filtered = filter_with_time(config, messages, wait: 3) do |d| - errored = { "container_id" => "1", "message" => "start\n message 3\n message 4" } + errored = { "container_id" => "1", "message" => "start" } mock(d.instance.router).emit_error_event("test", @time, errored, anything) end expected = [ - [@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }] + [@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }], + [@time + 1, { "container_id" => "1", "message" => " message 3" }], + [@time + 2, { "container_id" => "1", "message" => " message 4" }], ] assert_equal(expected, filtered) end