diff --git a/lib/logstash/codecs/fluent.rb b/lib/logstash/codecs/fluent.rb index 8404973..92175f3 100644 --- a/lib/logstash/codecs/fluent.rb +++ b/lib/logstash/codecs/fluent.rb @@ -28,25 +28,17 @@ class LogStash::Codecs::Fluent < LogStash::Codecs::Base config_name "fluent" - public def register require "msgpack" @decoder = MessagePack::Unpacker.new end - public - def decode(data) - @decoder.feed(data) - @decoder.each do |tag, epochtime, map| - event = LogStash::Event.new(map.merge( - LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), - "tags" => tag - )) - yield event + def decode(data, &block) + @decoder.feed_each(data) do |item| + decode_event(item, &block) end end # def decode - public def encode(event) tag = event.get("tags") || "log" epochtime = event.timestamp.to_i @@ -59,4 +51,57 @@ def encode(event) @on_event.call(event, MessagePack.pack([tag, epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)])) end # def encode + private + + def decode_event(data, &block) + tag = data[0] + entries = data[1] + + case entries + when String + # PackedForward + option = data[2] + compressed = (option && option['compressed'] == 'gzip') + if compressed + raise(LogStash::Error, "PackedForward with compression is not supported") + end + + entries_decoder = MessagePack::Unpacker.new + entries_decoder.feed_each(entries) do |entry| + epochtime = entry[0] + map = entry[1] + event = LogStash::Event.new(map.merge( + LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), + "tags" => [ tag ] + )) + yield event + end + when Array + # Forward + entries.each do |entry| + epochtime = entry[0] + map = entry[1] + event = LogStash::Event.new(map.merge( + LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), + "tags" => [ tag ] + )) + yield event + end + when Fixnum + # Message + epochtime = entries + map = data[2] + event = LogStash::Event.new(map.merge( + LogStash::Event::TIMESTAMP => LogStash::Timestamp.at(epochtime), + "tags" => [ tag ] + )) + yield event + else + raise(LogStash::Error, "Unknown event type") + end + rescue StandardError => e + @logger.error("Fluent parse error, original data now in message field", :error => e, :data => data) + yield LogStash::Event.new("message" => data, "tags" => [ "_fluentparsefailure" ]) + end + end # class LogStash::Codecs::Fluent diff --git a/logstash-codec-fluent.gemspec b/logstash-codec-fluent.gemspec index 324d89c..2d682df 100644 --- a/logstash-codec-fluent.gemspec +++ b/logstash-codec-fluent.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-codec-fluent' - s.version = '3.0.2' + s.version = '3.1.1' s.licenses = ['Apache License (2.0)'] s.summary = "This codec handles fluentd's msgpack schema." s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" @@ -29,6 +29,6 @@ Gem::Specification.new do |s| s.add_runtime_dependency 'msgpack' end - s.add_development_dependency 'logstash-devutils' + s.add_development_dependency 'logstash-devutils', ">= 1.0.0" end diff --git a/spec/codecs/fluent_spec.rb b/spec/codecs/fluent_spec.rb index 8d16b55..c85cd1f 100644 --- a/spec/codecs/fluent_spec.rb +++ b/spec/codecs/fluent_spec.rb @@ -37,7 +37,60 @@ it "should decode without errors" do subject.decode(message) do |event| - expect(event.get("name")).to eq("foo") + expect(event.get("name")).to eq("foo") + end + end + + end + + describe "event decoding (buckets of events)" do + + let(:tag) { "mytag" } + let(:epochtime) { event.timestamp.to_i } + let(:data) { LogStash::Util.normalize(event.to_hash) } + let(:message) do + MessagePack.pack([tag, + [ + [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)], + [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)], + [epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601)] + ] + ]) + end + + it "should decode without errors" do + count = 0 + + subject.decode(message) do |event| + expect(event.get("name")).to eq("foo") + count += 1 + end + + expect(count).to eq(3) + end + + end + + describe "event decoding (broken package)" do + + let(:tag) { "mytag" } + let(:epochtime) { event.timestamp.to_s } + let(:data) { LogStash::Util.normalize(event.to_hash) } + let(:message) do + MessagePack.pack([tag, + epochtime, data.merge(LogStash::Event::TIMESTAMP => event.timestamp.to_iso8601) + ]) + end + + it "should decode with errors" do + subject.decode(message) do |event| + expect(event.get("name")).not_to eq("foo") + end + end + + it "should inject a failure event" do + subject.decode(message) do |event| + expect(event.get("tags")).to include("_fluentparsefailure") end end