Skip to content

Commit f1c0996

Browse files
committed
Add nanosecond_precision parameter
1 parent b712f76 commit f1c0996

File tree

1 file changed

+39
-2
lines changed

1 file changed

+39
-2
lines changed

lib/fluent/logger/fluent_logger.rb

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,30 @@
2323

2424
module Fluent
2525
module Logger
26+
class EventTime
27+
TYPE = 0
28+
29+
def initialize(raw_time)
30+
@time = raw_time
31+
end
32+
33+
def to_msgpack(io = nil)
34+
@time.sec.to_msgpack(io)
35+
end
36+
37+
def to_msgpack_ext
38+
[@time.sec, @time.nsec].pack('NN')
39+
end
40+
41+
def self.from_msgpack_ext(data)
42+
new(*data.unpack('NN'))
43+
end
44+
45+
def to_json(*args)
46+
@time.sec
47+
end
48+
end
49+
2650
class FluentLogger < LoggerBase
2751
BUFFER_LIMIT = 8*1024*1024
2852
RECONNECT_WAIT = 0.5
@@ -55,6 +79,13 @@ def initialize(tag_prefix = nil, *args)
5579
@host = options[:host]
5680
@port = options[:port]
5781
@socket_path = options[:socket_path]
82+
@nanosecond_precision = options[:nanosecond_precision]
83+
84+
factory = MessagePack::Factory.new
85+
if @nanosecond_precision
86+
factory.register_type(EventTime::TYPE, EventTime)
87+
end
88+
@packer = factory.packer
5889

5990
@mon = Monitor.new
6091
@pending = nil
@@ -98,7 +129,11 @@ def last_error
98129
def post_with_time(tag, map, time)
99130
@logger.debug { "event: #{tag} #{map.to_json}" rescue nil } if @logger.debug?
100131
tag = "#{@tag_prefix}.#{tag}" if @tag_prefix
101-
write [tag, time.to_i, map]
132+
if @nanosecond_precision && time.is_a?(Time)
133+
write [tag, EventTime.new(time), map]
134+
else
135+
write [tag, time.to_i, map]
136+
end
102137
end
103138

104139
def close
@@ -147,7 +182,9 @@ def pending_bytesize
147182

148183
def to_msgpack(msg)
149184
begin
150-
msg.to_msgpack
185+
res = @packer.pack(msg).to_s
186+
@packer.clear
187+
res
151188
rescue NoMethodError
152189
JSON.parse(JSON.generate(msg)).to_msgpack
153190
end

0 commit comments

Comments
 (0)