diff --git a/lib/fluent/plugin/filter_concat.rb b/lib/fluent/plugin/filter_concat.rb index f990aa7..6510ce3 100644 --- a/lib/fluent/plugin/filter_concat.rb +++ b/lib/fluent/plugin/filter_concat.rb @@ -35,6 +35,7 @@ def initialize @buffer = Hash.new {|h, k| h[k] = [] } @timeout_map = Hash.new {|h, k| h[k] = Fluent::Engine.now } + @mutex = Thread::Mutex.new end def configure(conf) @@ -122,9 +123,13 @@ def process(tag, time, record) def process_line(stream_identity, tag, time, record) new_es = Fluent::MultiEventStream.new - @buffer[stream_identity] << [tag, time, record] + @mutex.synchronize do + @buffer[stream_identity] << [tag, time, record] + end if @buffer[stream_identity].size >= @n_lines - new_time, new_record = flush_buffer(stream_identity) + new_time, new_record = @mutex.synchronize do + flush_buffer(stream_identity) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) end @@ -136,26 +141,36 @@ def process_regexp(stream_identity, tag, time, record) case when firstline?(record[@key]) if @buffer[stream_identity].empty? - @buffer[stream_identity] << [tag, time, record] + @mutex.synchronize do + @buffer[stream_identity] << [tag, time, record] + end if lastline?(record[@key]) - new_time, new_record = flush_buffer(stream_identity) + new_time, new_record = @mutex.synchronize do + flush_buffer(stream_identity) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) end else - new_time, new_record = flush_buffer(stream_identity, [tag, time, record]) + new_time, new_record = @mutex.synchronize do + flush_buffer(stream_identity, [tag, time, record]) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) if lastline?(record[@key]) - new_time, new_record = flush_buffer(stream_identity) + new_time, new_record = @mutex.synchronize do + flush_buffer(stream_identity) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) end return new_es end when lastline?(record[@key]) - @buffer[stream_identity] << [tag, time, record] - new_time, new_record = flush_buffer(stream_identity) + @mutex.synchronize do + @buffer[stream_identity] << [tag, time, record] + new_time, new_record = flush_buffer(stream_identity) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) return new_es @@ -166,9 +181,13 @@ def process_regexp(stream_identity, tag, time, record) else if continuous_line?(record[@key]) # Continuation of the previous line - @buffer[stream_identity] << [tag, time, record] + @mutex.synchronize do + @buffer[stream_identity] << [tag, time, record] + end else - new_time, new_record = flush_buffer(stream_identity) + new_time, new_record = @mutex.synchronize do + flush_buffer(stream_identity) + end time = new_time if @use_first_timestamp new_es.add(time, new_record) new_es.add(time, record) @@ -211,7 +230,10 @@ def flush_timeout_buffer @timeout_map.each do |stream_identity, previous_timestamp| next if @flush_interval > (now - previous_timestamp) next if @buffer[stream_identity].empty? - time, flushed_record = flush_buffer(stream_identity) + next if @mutex.locked? + time, flushed_record = @mutex.synchronize do + flush_buffer(stream_identity) + end timeout_stream_identities << stream_identity tag = stream_identity.split(":").first message = "Timeout flush: #{stream_identity}"