Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions lib/fluent/plugin/output.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1036,17 +1036,17 @@ def generate_format_proc
# iteration of event stream, and it should be done just once even if total event stream size
# is bigger than chunk_limit_size because of performance.
def handle_stream_with_custom_format(tag, es, enqueue: false)
meta_and_data = {}
meta_and_data = Hash.new { |h, k| h[k] = [] }
records = 0
es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
meta = metadata(tag, time, record)
meta_and_data[meta] ||= []
res = format(tag, time, record)
if res
meta_and_data[meta] << res
records += 1
end
end
meta_and_data.default_proc = nil
write_guard do
@buffer.write(meta_and_data, enqueue: enqueue)
end
Expand All @@ -1057,14 +1057,14 @@ def handle_stream_with_custom_format(tag, es, enqueue: false)

def handle_stream_with_standard_format(tag, es, enqueue: false)
format_proc = generate_format_proc
meta_and_data = {}
meta_and_data = Hash.new { |h, k| h[k] = MultiEventStream.new }
records = 0
es.each(unpacker: Fluent::MessagePackFactory.thread_local_msgpack_unpacker) do |time, record|
meta = metadata(tag, time, record)
meta_and_data[meta] ||= MultiEventStream.new
meta_and_data[meta].add(time, record)
records += 1
end
meta_and_data.default_proc = nil
write_guard do
@buffer.write(meta_and_data, format: format_proc, enqueue: enqueue)
end
Expand Down
Loading