forked from logstash-plugins/logstash-codec-fluent
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathfluent.rb
More file actions
107 lines (97 loc) · 3.34 KB
/
fluent.rb
File metadata and controls
107 lines (97 loc) · 3.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
# encoding: utf-8
require "logstash/codecs/base"
require "logstash/util/charset"
require "logstash/timestamp"
require "logstash/util"
# This codec handles fluentd's msgpack schema.
#
# For example, you can receive logs from `fluent-logger-ruby` with:
# [source,ruby]
# input {
# tcp {
# codec => fluent
# port => 4000
# }
# }
#
# And from your ruby code in your own application:
# [source,ruby]
# logger = Fluent::Logger::FluentLogger.new(nil, :host => "example.log", :port => 4000)
# logger.post("some_tag", { "your" => "data", "here" => "yay!" })
#
# Notes:
#
# * fluentd uses second-precision time for events, so you will never see
# subsecond precision on events processed by this codec.
#
class LogStash::Codecs::Fluent < LogStash::Codecs::Base
  config_name "fluent"

  def register
    require "msgpack"
    @decoder = MessagePack::Unpacker.new
  end

  # Decode a raw msgpack byte stream and yield one LogStash::Event per
  # fluentd record found in it. Handles the three fluentd forward-protocol
  # event modes: Message, Forward, and (uncompressed) PackedForward.
  def decode(data, &block)
    @decoder.feed_each(data) do |item|
      decode_event(item, &block)
    end
  end # def decode

  # Encode an event as a fluentd Message: [tag, epochtime, record].
  # The tag is taken from the event's "tags" field, defaulting to "log".
  def encode(event)
    tag = event.get("tags") || "log"
    epochtime = event.timestamp.to_i

    # use normalize to make sure returned Hash is pure Ruby for
    # MessagePack#pack which relies on pure Ruby object recognition
    data = LogStash::Util.normalize(event.to_hash)

    # timestamp is serialized as an iso8601 string
    # merge to avoid modifying data which could have side effects if multiple outputs
    @on_event.call(event, MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)]))
  end # def encode

  private

  # Build a LogStash::Event from a fluentd record hash, its epoch-seconds
  # timestamp, and the fluentd tag (stored in the event's "tags" field).
  def to_event(map, epochtime, tag)
    LogStash::Event.new(map.merge(
      LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime),
      "tags" => [ tag ]
    ))
  end # def to_event

  # Dispatch one decoded fluentd protocol item ([tag, entries, option?])
  # to the appropriate handler; yields one event per record.
  # On any parse error, yields a "_fluentparsefailure"-tagged event with
  # the original data preserved in the message field.
  def decode_event(data, &block)
    tag = data[0]
    entries = data[1]

    case entries
    when String
      # PackedForward: entries is a msgpack-packed binary blob of
      # [epochtime, record] pairs.
      option = data[2]
      compressed = (option && option['compressed'] == 'gzip')
      if compressed
        raise(LogStash::Error, "PackedForward with compression is not supported")
      end

      entries_decoder = MessagePack::Unpacker.new
      entries_decoder.feed_each(entries) do |entry|
        yield to_event(entry[1], entry[0], tag)
      end
    when Array
      # Forward: entries is an array of [epochtime, record] pairs.
      entries.each do |entry|
        yield to_event(entry[1], entry[0], tag)
      end
    when Integer
      # Message: data is [tag, epochtime, record].
      # NOTE: must match on Integer, not Fixnum — Fixnum was deprecated in
      # Ruby 2.4 and removed in 3.2, which would silently turn every
      # Message-mode record into a parse failure.
      yield to_event(data[2], entries, tag)
    else
      raise(LogStash::Error, "Unknown event type")
    end
  rescue StandardError => e
    @logger.error("Fluent parse error, original data now in message field", :error => e, :data => data)
    yield LogStash::Event.new("message" => data, "tags" => [ "_fluentparsefailure" ])
  end # def decode_event
end # class LogStash::Codecs::Fluent