@@ -1036,17 +1036,17 @@ def generate_format_proc
10361036 # iteration of event stream, and it should be done just once even if total event stream size
10371037 # is bigger than chunk_limit_size because of performance.
10381038 def handle_stream_with_custom_format ( tag , es , enqueue : false )
1039- meta_and_data = { }
1039+ meta_and_data = Hash . new { | h , k | h [ k ] = [ ] }
10401040 records = 0
10411041 es . each ( unpacker : Fluent ::MessagePackFactory . thread_local_msgpack_unpacker ) do |time , record |
10421042 meta = metadata ( tag , time , record )
1043- meta_and_data [ meta ] ||= [ ]
10441043 res = format ( tag , time , record )
10451044 if res
10461045 meta_and_data [ meta ] << res
10471046 records += 1
10481047 end
10491048 end
1049+ meta_and_data . default_proc = nil
10501050 write_guard do
10511051 @buffer . write ( meta_and_data , enqueue : enqueue )
10521052 end
@@ -1057,14 +1057,14 @@ def handle_stream_with_custom_format(tag, es, enqueue: false)
10571057
10581058 def handle_stream_with_standard_format ( tag , es , enqueue : false )
10591059 format_proc = generate_format_proc
1060- meta_and_data = { }
1060+ meta_and_data = Hash . new { | h , k | h [ k ] = MultiEventStream . new }
10611061 records = 0
10621062 es . each ( unpacker : Fluent ::MessagePackFactory . thread_local_msgpack_unpacker ) do |time , record |
10631063 meta = metadata ( tag , time , record )
1064- meta_and_data [ meta ] ||= MultiEventStream . new
10651064 meta_and_data [ meta ] . add ( time , record )
10661065 records += 1
10671066 end
1067+ meta_and_data . default_proc = nil
10681068 write_guard do
10691069 @buffer . write ( meta_and_data , format : format_proc , enqueue : enqueue )
10701070 end
0 commit comments