Skip to content

Commit 01a5509

Browse files
committed
Add FluentLogger#pending_bytesize
1 parent 77a423c commit 01a5509

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

lib/fluent/logger/fluent_logger.rb

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ def connection_string
135135
@socket_path ? "#{@socket_path}" : "#{@host}:#{@port}"
136136
end
137137

138+
def pending_bytesize
139+
if @pending
140+
@pending.bytesize
141+
else
142+
0
143+
end
144+
end
145+
138146
private
139147

140148
def to_msgpack(msg)
@@ -170,7 +178,7 @@ def write(msg)
170178
end
171179

172180
# suppress reconnection burst
173-
if !@connect_error_history.empty? && @pending.bytesize <= @limit
181+
if !@connect_error_history.empty? && pending_bytesize <= @limit
174182
if Time.now.to_i - @connect_error_history.last < suppress_sec
175183
return false
176184
end
@@ -182,7 +190,7 @@ def write(msg)
182190
true
183191
rescue => e
184192
set_last_error(e)
185-
if @pending.bytesize > @limit
193+
if pending_bytesize > @limit
186194
@logger.error("FluentLogger: Can't send logs to #{connection_string}: #{$!}")
187195
call_buffer_overflow_handler(@pending)
188196
@pending = nil

spec/fluent_logger_spec.rb

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,9 +58,11 @@
5858

5959
context('post') do
6060
it ('success') {
61+
expect(logger.pending_bytesize).to eq 0
6162
expect(logger.post('tag', {'a' => 'b'})).to be true
6263
fluentd.wait_transfer
6364
expect(fluentd.queue.last).to eq ['logger-test.tag', {'a' => 'b'}]
65+
expect(logger.pending_bytesize).to eq 0
6466
}
6567

6668
it ('close after post') {
@@ -72,6 +74,7 @@
7274
expect(logger).to be_connect
7375
fluentd.wait_transfer
7476
expect(fluentd.queue.last).to eq ['logger-test.tag', {'b' => 'c'}]
77+
expect(logger.pending_bytesize).to eq 0
7578
}
7679

7780
it ('large data') {
@@ -150,9 +153,11 @@
150153
context "not running fluentd" do
151154
context('fluent logger interface') do
152155
it ('post & close') {
156+
expect(logger.pending_bytesize).to eq 0
153157
expect(logger.post('tag', {'a' => 'b'})).to be false
154158
fluentd.wait_transfer # even if wait
155159
expect(fluentd.queue.last).to be_nil
160+
expect(logger.pending_bytesize).to be > 0
156161
logger.close
157162
logger_io.rewind
158163
log = logger_io.read
@@ -204,18 +209,21 @@ def flush(messages)
204209
let(:buffer_overflow_handler) { Proc.new { |messages| handler.flush(messages) } }
205210

206211
it ('post limit over') do
212+
expect(logger.pending_bytesize).to eq 0
207213
logger.limit = 100
208214
event_1 = {'a' => 'b'}
209215
logger.post('tag', event_1)
210216
fluentd.wait_transfer # even if wait
211217
expect(fluentd.queue.last).to be(nil)
218+
expect(logger.pending_bytesize).to be > 0
212219

213220
logger_io.rewind
214221
expect(logger_io.read).not_to match(/Can't send logs to/)
215222

216223
event_2 = {'a' => ('c' * 1000)}
217224
logger.post('tag', event_2)
218225
logger_io.rewind
226+
expect(logger.pending_bytesize).to eq 0
219227
expect(logger_io.read).to match(/Can't send logs to/)
220228

221229
buffer = handler.buffer

0 commit comments

Comments
 (0)