Skip to content

Commit 9761c89

Browse files
committed
[MK]fixed timeout eating log messages
1 parent bf2a1f6 commit 9761c89

File tree

2 files changed

+60
-8
lines changed

2 files changed

+60
-8
lines changed

lib/fluent/plugin/filter_concat.rb

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,10 @@ class TimeoutError < StandardError
5050

5151
def initialize
5252
super
53-
5453
@buffer = Hash.new {|h, k| h[k] = [] }
5554
@timeout_map_mutex = Thread::Mutex.new
5655
@timeout_map_mutex.synchronize do
57-
@timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now }
56+
@timeout_map = Hash.new {|h, k| h[k] = time_event_now }
5857
end
5958
end
6059

@@ -241,7 +240,7 @@ def process(tag, time, record)
241240
end
242241
end
243242
@timeout_map_mutex.synchronize do
244-
@timeout_map[stream_identity] = Fluent::Engine.now
243+
@timeout_map[stream_identity] = time_event_now
245244
end
246245
case @mode
247246
when :line
@@ -389,7 +388,7 @@ def flush_buffer(stream_identity, new_element = nil)
389388
end
390389

391390
def flush_timeout_buffer
392-
now = Fluent::Engine.now
391+
now =time_event_now
393392
timeout_stream_identities = []
394393
@timeout_map_mutex.synchronize do
395394
@timeout_map.each do |stream_identity, previous_timestamp|
@@ -432,5 +431,11 @@ def handle_timeout_error(tag, time, record, message)
432431
router.emit_error_event(tag, time, record, TimeoutError.new(message))
433432
end
434433
end
434+
435+
def time_event_now
436+
now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
437+
Fluent::EventTime.new(now / 1_000_000_000, now % 1_000_000_000)
438+
end
439+
435440
end
436441
end

test/plugin/test_filter_concat.rb

Lines changed: 51 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,14 @@
1-
require "helper"
1+
require "bundler/setup"
2+
require "test-unit"
3+
require "test/unit/rr"
4+
require "fluent/test"
5+
require "fluent/plugin/filter_concat"
6+
27
require "fluent/test/driver/filter"
38
class FilterConcatTest < Test::Unit::TestCase
49
def setup
510
Fluent::Test.setup
6-
@time = Fluent::Engine.now
11+
@time = time_event_now
712
end
813

914
CONFIG = %[
@@ -34,13 +39,19 @@ def filter_with_time(conf, messages, wait: nil)
3439
d.run(default_tag: "test") do
3540
sleep 0.1 # run event loop
3641
messages.each do |time, message|
42+
sleep time-time_event_now
3743
d.feed(time, message)
3844
end
3945
sleep wait if wait
4046
end
4147
d.filtered
4248
end
4349

50+
def time_event_now
51+
now = Process.clock_gettime(Process::CLOCK_REALTIME, :nanosecond)
52+
Fluent::EventTime.new(now / 1_000_000_000, now % 1_000_000_000)
53+
end
54+
4455
sub_test_case "config" do
4556
test "empty" do
4657
assert_raise(Fluent::ConfigError.new("'key' parameter is required")) do
@@ -242,6 +253,40 @@ def filter_with_time(conf, messages, wait: nil)
242253
assert_equal([], filtered)
243254
end
244255

256+
test "timeout with timeout_label for multiline start regex" do
257+
config = <<-CONFIG
258+
key message
259+
multiline_start_regexp /^start/
260+
flush_interval 1s
261+
timeout_label @TIMEOUT
262+
CONFIG
263+
wait = 8
264+
delay_message_4_to_5 = 3
265+
delay_message_5_to_6 = 1
266+
267+
messages = [
268+
[@time, { "container_id" => "1", "message" => "start" }],
269+
[@time, { "container_id" => "1", "message" => " message 1" }],
270+
[@time, { "container_id" => "1", "message" => " message 2" }],
271+
[@time, { "container_id" => "1", "message" => "starting" }],
272+
[@time + delay_message_4_to_5, { "container_id" => "1", "message" => " message 3" }],
273+
[@time + delay_message_4_to_5 + delay_message_5_to_6 , { "container_id" => "1", "message" => " message 4" }],
274+
]
275+
276+
filtered = filter_with_time(config, messages, wait: wait) do |d|
277+
errored = { "container_id" => "1", "message" => "starting" }
278+
event_router = mock(Object.new).emit("test", anything, errored)
279+
mock(Fluent::Test::Driver::TestEventRouter).new(anything) { event_router }
280+
stub.proxy(d.instance).flush_timeout_buffer.times(wait + delay_message_4_to_5 + delay_message_5_to_6)
281+
end
282+
expected = [
283+
[@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }],
284+
[@time + 3, { "container_id" => "1", "message" => " message 3" }],
285+
[@time + 4, { "container_id" => "1", "message" => " message 4" }],
286+
]
287+
assert_equal(expected, filtered)
288+
end
289+
245290
test "no timeout" do
246291
messages = [
247292
{ "container_id" => "1", "message" => "message 1" },
@@ -975,11 +1020,13 @@ def filter_with_time(conf, messages, wait: nil)
9751020
[@time + 2, { "container_id" => "1", "message" => " message 4" }],
9761021
]
9771022
filtered = filter_with_time(config, messages, wait: 3) do |d|
978-
errored = { "container_id" => "1", "message" => "start\n message 3\n message 4" }
1023+
errored = { "container_id" => "1", "message" => "start" }
9791024
mock(d.instance.router).emit_error_event("test", @time, errored, anything)
9801025
end
9811026
expected = [
982-
[@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }]
1027+
[@time, { "container_id" => "1", "message" => "start\n message 1\n message 2" }],
1028+
[@time + 1, { "container_id" => "1", "message" => " message 3" }],
1029+
[@time + 2, { "container_id" => "1", "message" => " message 4" }],
9831030
]
9841031
assert_equal(expected, filtered)
9851032
end

0 commit comments

Comments
 (0)