Skip to content

Commit 6e20c0f

Browse files
committed
Add buffer_size_limit and buffer_overflow_method
See test code for more details. Signed-off-by: Kenji Okimoto <[email protected]>
1 parent d134fdf commit 6e20c0f

File tree

2 files changed

+163
-2
lines changed

2 files changed

+163
-2
lines changed

lib/fluent/plugin/filter_concat.rb

Lines changed: 45 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ class ConcatFilter < Filter
3232
config_param :partial_value, :string, default: nil
3333
desc "If true, keep partial_key in concatenated records"
3434
config_param :keep_partial_key, :bool, default: false
35+
# Per-stream cap on buffered partial records. The size of a record is the sum
# of the byte sizes of its keys and values (see overflow?).
desc "Maximum total byte size (keys + values) of the records buffered per stream before buffer_overflow_method is applied"
config_param :buffer_size_limit, :size, default: 500 * 1024 # 500k
# Policy applied by process_partial when an incoming record does not fit.
desc "Action when a record would exceed buffer_size_limit: ignore (keep buffering), truncate (flush the buffer and discard the record), drop (discard the buffer and the record), new (flush the buffer and start a new one with the record)"
config_param :buffer_overflow_method, :enum, list: [:ignore, :truncate, :drop, :new], default: :ignore
3539

3640
class TimeoutError < StandardError
3741
end
@@ -40,6 +44,7 @@ def initialize
4044
super
4145

4246
@buffer = Hash.new {|h, k| h[k] = [] }
47+
@buffer_size = Hash.new(0)
4348
@timeout_map_mutex = Thread::Mutex.new
4449
@timeout_map_mutex.synchronize do
4550
@timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now }
@@ -167,13 +172,39 @@ def process_line(stream_identity, tag, time, record)
167172

168173
# Buffers one part of a split message for +stream_identity+ and returns the
# events that became complete as a result.
#
# When the record fits within @buffer_size_limit it is appended to the
# stream's buffer. When it does not, @buffer_overflow_method decides:
#   :ignore   - append anyway (the limit is not enforced)
#   :truncate - flush what is buffered; the incoming record is discarded
#   :drop     - discard the buffered records and the incoming record
#   :new      - flush what is buffered, then start a new buffer with the record
#
# A record whose @partial_key value differs from @partial_value is the last
# part of a message and triggers a flush of the buffer.
#
# @param stream_identity [Object] key of the stream in @buffer / @buffer_size
# @param tag [String] tag of the incoming event
# @param time [Object] time of the incoming event
# @param record [Hash] the incoming record
# @return [Fluent::MultiEventStream] zero or more concatenated events
def process_partial(stream_identity, tag, time, record)
  new_es = Fluent::MultiEventStream.new
  force_flush = false
  if overflow?(stream_identity, record)
    case @buffer_overflow_method
    when :ignore
      @buffer[stream_identity] << [tag, time, record]
    when :truncate, :new
      force_flush = true
    when :drop
      @buffer[stream_identity] = []
    end
  else
    @buffer[stream_identity] << [tag, time, record]
  end

  last_part = @partial_value != record[@partial_key]
  emit_partial_buffer(new_es, stream_identity, time) if force_flush || last_part

  if force_flush && @buffer_overflow_method == :new
    # Re-buffer with the record's OWN time: the flushed buffer's first
    # timestamp must not leak into the buffer that starts here.
    @buffer[stream_identity] << [tag, time, record]
    # to_s keeps the size computation from raising on non-String values.
    @buffer_size[stream_identity] = record.sum { |key, value| key.to_s.bytesize + value.to_s.bytesize }
    emit_partial_buffer(new_es, stream_identity, time) if last_part
  end
  new_es
end

# Flushes the buffer of +stream_identity+ into +new_es+ as one event, with
# @partial_key removed. Does nothing when the buffer is empty, because
# flush_buffer cannot build a record without a first element.
def emit_partial_buffer(new_es, stream_identity, time)
  return if @buffer[stream_identity].empty?

  first_time, merged_record = flush_buffer(stream_identity)
  merged_record.delete(@partial_key)
  new_es.add(@use_first_timestamp ? first_time : time, merged_record)
end
179210

@@ -244,6 +275,17 @@ def continuous_line?(text)
244275
end
245276
end
246277

278+
# Accounts for +record+ in the per-stream byte counter and reports whether
# adding it would push the stream past @buffer_size_limit.
#
# The size of a record is the sum of the byte sizes of its keys and values.
# Keys and values are converted with #to_s before measuring so that
# non-String values (Integer, Float, nil, nested Hash/Array) and Symbol keys
# are measured instead of raising NoMethodError; for Strings the result is
# the same as String#bytesize.
#
# Side effects: on overflow the counter for +stream_identity+ is reset to 0
# and the record is not counted; otherwise the record's size is added to it.
#
# @param stream_identity [Object] key of the stream in @buffer_size
# @param record [Hash] the incoming record
# @return [Boolean] true when the record does not fit within the limit
def overflow?(stream_identity, record)
  size = record.sum { |key, value| key.to_s.bytesize + value.to_s.bytesize }
  if @buffer_size[stream_identity] + size > @buffer_size_limit
    @buffer_size[stream_identity] = 0
    true
  else
    @buffer_size[stream_identity] += size
    false
  end
end
288+
247289
def flush_buffer(stream_identity, new_element = nil)
248290
lines = @buffer[stream_identity].map {|_tag, _time, record| record[@key] }
249291
_tag, time, first_record = @buffer[stream_identity].first
@@ -252,6 +294,7 @@ def flush_buffer(stream_identity, new_element = nil)
252294
}
253295
@buffer[stream_identity] = []
254296
@buffer[stream_identity] << new_element if new_element
297+
@buffer_size[stream_identity] = 0
255298
[time, first_record.merge(new_record)]
256299
end
257300

test/plugin/test_filter_concat.rb

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -733,4 +733,122 @@ def filter_with_time(conf, messages, wait: nil)
733733
assert_equal(expected, filtered)
734734
end
735735
end
736+
737+
sub_test_case "overflow" do
  # Filter configuration shared by every overflow test; only the size limit
  # and the overflow method vary between tests.
  def overflow_config(limit, overflow_method)
    <<-CONFIG
key message
partial_key partial_message
partial_value true
keep_partial_key false
buffer_size_limit #{limit}
buffer_overflow_method #{overflow_method}
    CONFIG
  end

  # Two four-part messages from the same container; the last part of each
  # message carries partial_message "false".
  def overflow_messages
    [
      ["start", "true"],
      [" message 1", "true"],
      [" message 2", "true"],
      ["end", "false"],
      ["start", "true"],
      [" message 3", "true"],
      [" message 4", "true"],
      ["end", "false"],
    ].map do |text, partial|
      { "container_id" => "1", "message" => text, "partial_message" => partial }
    end
  end

  # Expected output records, one per given message text.
  def overflow_expected(*texts)
    texts.map { |text| { "container_id" => "1", "message" => text } }
  end

  test "ignore" do
    filtered = filter(overflow_config(10, "ignore"), overflow_messages, wait: 3)
    expected = overflow_expected(
      "start\n message 1\n message 2\nend",
      "start\n message 3\n message 4\nend"
    )
    assert_equal(expected, filtered)
  end

  test "truncate" do
    filtered = filter(overflow_config(60, "truncate"), overflow_messages, wait: 3)
    expected = overflow_expected("start", " message 2", "start", " message 4")
    assert_equal(expected, filtered)
  end

  test "drop" do
    filtered = filter(overflow_config(100, "drop"), overflow_messages, wait: 3)
    expected = overflow_expected("end", "end")
    assert_equal(expected, filtered)
  end

  test "new" do
    filtered = filter(overflow_config(90, "new"), overflow_messages, wait: 3)
    expected = overflow_expected(
      "start", " message 1", " message 2", "end",
      "start", " message 3", " message 4", "end"
    )
    assert_equal(expected, filtered)
  end
end
736854
end

0 commit comments

Comments
 (0)