Skip to content

Commit 6e9e021

Browse files
committed
Use PackedForward instead of Message
See fluent/fluentd#671
1 parent c94649a commit 6e9e021

File tree

2 files changed

+26
-14
lines changed

2 files changed

+26
-14
lines changed

lib/fluent/logger/fluent_logger.rb

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
require 'monitor'
2121
require 'logger'
2222
require 'json'
23+
require 'base64'
24+
require 'securerandom'
2325

2426
module Fluent
2527
module Logger
@@ -98,14 +100,16 @@ def last_error
98100
def post_with_time(tag, map, time)
99101
@logger.debug { "event: #{tag} #{map.to_json}" rescue nil } if @logger.debug?
100102
tag = "#{@tag_prefix}.#{tag}" if @tag_prefix
101-
write [tag, time.to_i, map]
103+
write(tag, time.to_i, map)
102104
end
103105

104106
def close
105107
@mon.synchronize {
106108
if @pending
107109
begin
108-
send_data(@pending)
110+
@pending.each do |tag, record|
111+
send_data([tag, record].to_msgpack)
112+
end
109113
rescue => e
110114
set_last_error(e)
111115
@logger.error("FluentLogger: Can't send logs to #{@host}:#{@port}: #{$!}")
@@ -140,36 +144,40 @@ def suppress_sec
140144
end
141145
end
142146

143-
def write(msg)
147+
def write(tag, time, map)
144148
begin
145-
data = to_msgpack(msg)
149+
record = to_msgpack([time, map])
146150
rescue => e
147151
set_last_error(e)
152+
msg = [tag, time, map]
148153
@logger.error("FluentLogger: Can't convert to msgpack: #{msg.inspect}: #{$!}")
149154
return false
150155
end
151156

152157
@mon.synchronize {
153158
if @pending
154-
@pending << data
159+
@pending[tag] << record
155160
else
156-
@pending = data
161+
@pending = Hash.new{|h, k| h[k] = "" }
162+
@pending[tag] = record
157163
end
158164

159165
# suppress reconnection burst
160-
if !@connect_error_history.empty? && @pending.bytesize <= @limit
166+
if !@connect_error_history.empty? && @pending.to_s.bytesize <= @limit
161167
if Time.now.to_i - @connect_error_history.last < suppress_sec
162168
return false
163169
end
164170
end
165171

166172
begin
167-
send_data(@pending)
173+
@pending.each do |tag, record|
174+
send_data([tag, record].to_msgpack)
175+
end
168176
@pending = nil
169177
true
170178
rescue => e
171179
set_last_error(e)
172-
if @pending.bytesize > @limit
180+
if @pending.to_s.bytesize > @limit
173181
@logger.error("FluentLogger: Can't send logs to #{@host}:#{@port}: #{$!}")
174182
call_buffer_overflow_handler(@pending)
175183
@pending = nil
@@ -199,6 +207,7 @@ def send_data(data)
199207
# end
200208
# data = data[n..-1]
201209
#end
210+
202211
true
203212
end
204213

spec/fluent_logger_spec.rb

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ def wait_transfer
203203
logger_io.rewind
204204
log = logger_io.read
205205
expect(log).to match /Failed to connect/
206-
expect(log).to match /Can't send logs to/
206+
expect(log).to match /Can\'t send logs to/
207207
}
208208

209209
it ('post limit over') do
@@ -213,11 +213,11 @@ def wait_transfer
213213
expect(queue.last).to be_nil
214214

215215
logger_io.rewind
216-
expect(logger_io.read).not_to match /Can't send logs to/
216+
expect(logger_io.read).not_to match /Can\'t send logs to/
217217

218218
logger.post('tag', {'a' => ('c' * 1000)})
219219
logger_io.rewind
220-
expect(logger_io.read).to match /Can't send logs to/
220+
expect(logger_io.read).to match /Can\'t send logs to/
221221
end
222222

223223
it ('log connect error once') do
@@ -240,8 +240,11 @@ class BufferOverflowHandler
240240

241241
def flush(messages)
242242
@buffer ||= []
243-
MessagePack::Unpacker.new.feed_each(messages) do |msg|
244-
@buffer << msg
243+
messages.each do |tag, message|
244+
unpacker = MessagePack::Unpacker.new(StringIO.new(message))
245+
unpacker.each do |time, record|
246+
@buffer << [tag, time, record]
247+
end
245248
end
246249
end
247250
end

0 commit comments

Comments
 (0)