Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 115 additions & 63 deletions lib/logstash/codecs/multiline.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
require "logstash/timestamp"
require "logstash/codecs/auto_flush"

require "grok-pure"
require 'logstash/patterns/core'
require "logstash/util/buftok"

# The multiline codec will collapse multiline messages and merge them into a
# single event.
#
Expand Down Expand Up @@ -111,6 +115,13 @@ module LogStash module Codecs class Multiline < LogStash::Codecs::Base
# This only affects "plain" format logs since JSON is `UTF-8` already.
config :charset, :validate => ::Encoding.name_list, :default => "UTF-8"

# Change the delimiter that separates lines
config :delimiter, :validate => :string, :default => "\n"

# Assume data received from input plugin as line based. For some input plugins
# like stdin or tcp/udp data is not line based and this option should be set to false.
config :line_based_input, :validate => :boolean, :default => true
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Feels like this should be a contract between the input and decoder plugins and not something we need to expose via configuration to users.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but that would need us to properly annotate all input plugins? in which case this option allows us to have something working today, and eventually we can deprecate this once we have the proper info from the input plugins?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a historical perspective, I think multiline codec was not really intended to be used with single-payload inputs such as redis, rabbitmq, etc -- only intended for bytestream inputs like stdin, tcp, file(*), etc.

(*) File input currently, because the way the filewatch library works, only receives lines right now. It could simplify things if we had an api in filewatch that presented a bytestream instead of line-oriented data and make the codec deal with it.


# Tag multiline events with a given tag. This tag will only be added
# to events that actually have multiple lines in them.
config :multiline_tag, :validate => :string, :default => "multiline"
Expand All @@ -132,19 +143,10 @@ module LogStash module Codecs class Multiline < LogStash::Codecs::Base
# auto_flush_interval. No default. If unset, no auto_flush. Units: seconds
config :auto_flush_interval, :validate => :number

public

def register
require "grok-pure" # rubygem 'jls-grok'
require 'logstash/patterns/core'

# Detect if we are running from a jarfile, pick the right path.
patterns_path = []
patterns_path += [LogStash::Patterns::Core.path]

@tokenizer = FileWatch::BufferedTokenizer.new(@delimiter)
@grok = Grok.new

@patterns_dir = patterns_path.to_a + @patterns_dir
@patterns_dir = [LogStash::Patterns::Core.path] + @patterns_dir
@patterns_dir.each do |path|
if ::File.directory?(path)
path = ::File.join(path, "*")
Expand All @@ -165,12 +167,67 @@ def register

@converter = LogStash::Util::Charset.new(@charset)
@converter.logger = @logger

# TODO: (colin) I don't really understand this @last_seen_listener poutine. I needed to create
# this lambda to DRY across the close & auto_flush methods to pass the closure to the new
# @tokenizer which I figured needs to be flushed upon close. there does not seem to be
# explicit tests for this closing logic.
# what is not clear here is the initialization of @last_seen_listener which gets initialized
# in the accept method but the close method systematically calls auto_flush, which assumes the
# existence of @last_seen_listener. this whole logic is confusing and should either be made
# more explicit and self-documenting OR documentation should be provided.
@auto_flush_block = lambda do |event|
@last_seen_listener.process_event(event)
end

if @auto_flush_interval
# will start on first decode
@auto_flush_runner = AutoFlush.new(self, @auto_flush_interval)
end
end # def register
end

def decode(data, &block)
data = data + @delimiter if @line_based_input
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider not mutating the original string. You could create a @appendage ivar once using @line_based_input in register and then do _data = data + @appendage


@tokenizer.extract(data.force_encoding("ASCII-8BIT")).each do |line|
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the forced encoding to ascii-8bit? Just curious.

match_line(@converter.convert(line), &block)
end
end

# Encode is a pass-through for this codec: multiline only merges lines on the
# decode side, so the event is handed to the output callback unchanged.
#
# @param event [LogStash::Event] the event to emit
def encode(event)
  # Nothing to do.
  # NOTE(review): the callback receives the event object for both arguments
  # (event, payload) — other codecs pass a serialized payload here; confirm
  # this is intentional.
  @on_event.call(event, event)
end

# this is the termination or final flush API
#
# @param block [Proc] the closure that will be called for all events that need to be flushed.
def flush(&block)
  # Drain any partial line still held by the tokenizer (data received without
  # a trailing delimiter), run it through the pattern matcher, then emit
  # whatever merged event remains in the multiline buffer.
  remainder = @tokenizer.flush
  match_line(@converter.convert(remainder), &block) unless remainder.empty?
  flush_buffered_events(&block)
end

# TODO: (colin) I believe there is a problem here in calling auto_flush. auto_flush depends on
# @auto_flush_block which references @last_seen_listener which is only initialized in the context of
# IdentityMapCodec which in turn I believe cannot by assumed. the multiline codec could run without
# IdentityMapCodec.
def close
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This codec did not override the base close method before auto_flush was introduced, meaning that if some lines were buffered then they were lost on close - this was normal behaviour. This was because when the pipeline called close on an input in another thread the decode block binding is not available.

With auto_flush and the last_seen_listener it is now possible to flush on close - because a regular auto_flush may be pending and would not get executed as LS is closing down.

if auto_flush_runner.pending?
#will cancel task if necessary
auto_flush_runner.stop
end

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we can return if @last_seen_listener.nil? because there is no point continuing further.

remainder = @tokenizer.flush
match_line(@converter.convert(remainder), &@auto_flush_block) unless remainder.empty?

auto_flush
end

# TODO: (colin) what is the purpose of this accept method? AFAICT it is only used if this codec is used
# within the IdentityMapCodec. it is not clear when & in which context this method is used.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not true. It can be called directly or indirectly (via the IdentityMapCodec) by any input that has been modified to supply the listener callback object.

# I believe the codec should still be able to live outside the context of an IdentityMapCodec but there
# are usages of ivars like @last_seen_listener that are only initialized in the context of IdentityMapCodec.
def accept(listener)
# memoize references to listener that holds upstream state
@previous_listener = @last_seen_listener || listener
Expand All @@ -180,45 +237,62 @@ def accept(listener)
end
end

def decode(text, &block)
text = @converter.convert(text)
text.split("\n").each do |line|
match = @grok.match(line)
@logger.debug("Multiline", :pattern => @pattern, :text => line,
:match => !match.nil?, :negate => @negate)
# Accumulate one line into the in-progress multiline buffer, growing the
# running byte count that the max_bytes limit check relies on.
#
# @param line [String] the converted line to buffer
def buffer(line)
  @buffer_bytes += line.bytesize
  @buffer << line
end

# Add negate option
match = (match and !@negate) || (!match and @negate)
@handler.call(line, match, &block)
end
end # def decode
# Discard all buffered lines and zero the byte counter, starting a fresh
# multiline accumulation.
def reset_buffer
  @buffer, @buffer_bytes = [], 0
end

# TODO: (colin) this method is not clearly documented as being required by the AutoFlush class & tasks.
# I believe there is a problem here with the usage of @auto_flush_block which assumes to be in the
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This auto_flush method is available to be called by another object (input, pipeline or component) if it wished to.
At the moment the AutoFlush class calls it in a retriggerable timer.

# context of an IdentityMapCodec but the multiline codec could run without IdentityMapCodec
# Flush all buffered events through @auto_flush_block, which forwards each
# merged event to @last_seen_listener. Invoked by the AutoFlush timer task
# (and by close).
# NOTE(review): @last_seen_listener appears to be set only via #accept — see
# TODO above; confirm this cannot fire when the codec runs standalone.
def auto_flush
  flush_buffered_events(&@auto_flush_block)
end

def buffer(text)
@buffer_bytes += text.bytesize
@buffer.push(text)
# TODO: (colin) auto_flush_active? doesn't seem to be used anywhere. any reason to keep this api?
# Whether periodic auto-flushing has been configured for this codec.
#
# @return [Boolean] true when an auto_flush_interval was set
def auto_flush_active?
  @auto_flush_interval.nil? ? false : true
end

def flush(&block)
# private

# merge all currently bufferred events and call the passed block for the resulting merged event
#
# @param block [Proc] the closure that will be called for the resulting merged event
def flush_buffered_events(&block)
if block_given? && @buffer.any?
no_error = true
events = merge_events
event = merge_events
begin
yield events
yield event
rescue ::Exception => e
# need to rescue everything
# likeliest cause: backpressure or timeout by exception
# can't really do anything but leave the data in the buffer for next time if there is one
@logger.error("Multiline: flush downstream error", :exception => e)
@logger.error("Multiline: buffered events flush downstream error", :exception => e)
no_error = false
end
reset_buffer if no_error
end
end

def auto_flush
flush do |event|
@last_seen_listener.process_event(event)
end
# evaluate whether a given line matches the configured pattern and call the appropriate do_next or do_previous
# handler given the match state.
#
# @param line [String] the string to match against the pattern
# @param block [Proc] the closure that will be called for each event that might result after processing this line
# Test one converted line against the configured grok pattern and dispatch it
# to the do_next / do_previous handler together with the (possibly negated)
# match outcome.
#
# @param line [String] the line to match against @pattern
# @param block [Proc] closure invoked for any event the handler produces
def match_line(line, &block)
  grok_match = @grok.match(line)
  if @logger.debug?
    @logger.debug("Multiline", :pattern => @pattern, :line => line, :match => !grok_match.nil?, :negate => @negate)
  end

  # negate flips the sense of the pattern match; the result is always a Boolean
  matched = @negate ? !grok_match : (grok_match ? true : false)
  @handler.call(line, matched, &block)
end

def merge_events
Expand All @@ -229,11 +303,6 @@ def merge_events
event
end

def reset_buffer
@buffer = []
@buffer_bytes = 0
end

# @return [Boolean] true when unmatched lines are merged into the previous
#   event (config what => "previous") rather than held for the next one
def doing_previous?
  @what.eql?("previous")
end
Expand All @@ -242,16 +311,16 @@ def what_based_listener
doing_previous? ? @previous_listener : @last_seen_listener
end

def do_next(text, matched, &block)
buffer(text)
def do_next(line, matched, &block)
buffer(line)
auto_flush_runner.start
flush(&block) if !matched || buffer_over_limits?
flush_buffered_events(&block) if !matched || buffer_over_limits?
end

def do_previous(text, matched, &block)
flush(&block) if !matched || buffer_over_limits?
def do_previous(line, matched, &block)
flush_buffered_events(&block) if !matched || buffer_over_limits?
auto_flush_runner.start
buffer(text)
buffer(line)
end

def over_maximum_lines?
Expand All @@ -266,24 +335,7 @@ def buffer_over_limits?
over_maximum_lines? || over_maximum_bytes?
end

def encode(event)
# Nothing to do.
@on_event.call(event, event)
end # def encode

def close
if auto_flush_runner.pending?
#will cancel task if necessary
auto_flush_runner.stop
end
auto_flush
end

def auto_flush_active?
!@auto_flush_interval.nil?
end

# Accessor for the auto-flush timer task. When no auto_flush_interval was
# configured, returns a fresh AutoFlushUnset placeholder so callers can invoke
# start/stop/pending? unconditionally without nil checks.
def auto_flush_runner
  @auto_flush_runner || AutoFlushUnset.new(nil, nil)
end
end end end # class LogStash::Codecs::Multiline
end end end
39 changes: 32 additions & 7 deletions spec/codecs/multiline_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,22 @@
expect(events[1]["message"]).to eq "0987654321"
end

it "should handle message continuation across decode calls (i.e. use buftok)" do
config.update(
"pattern" => '\D',
"what" => "previous",
"line_based_input" => false,
)
lineio = StringIO.new("1234567890\nA234567890\nB234567890\n0987654321\n")
until lineio.eof
line = lineio.read(5)
codec.decode(line) {|evt| events.push(evt)}
end
codec.flush { |e| events << e }
expect(events[0]["message"]).to eq "1234567890\nA234567890\nB234567890"
expect(events[1]["message"]).to eq "0987654321"
end

it "should allow grok patterns to be used" do
config.update(
"pattern" => "^%{NUMBER} %{TIME}",
Expand Down Expand Up @@ -220,9 +236,11 @@
let(:codec) { Mlc::MultilineRspec.new(config).tap {|c| c.register} }
let(:events) { [] }
let(:lines) do
{ "en.log" => ["hello world", " second line", " third line"],
{
"en.log" => ["hello world", " second line", " third line"],
"fr.log" => ["Salut le Monde", " deuxième ligne", " troisième ligne"],
"de.log" => ["Hallo Welt"] }
"de.log" => ["Hallo Welt"]
}
end
let(:listener_class) { Mlc::LineListener }
let(:auto_flush_interval) { 0.5 }
Expand Down Expand Up @@ -252,13 +270,17 @@
let(:listener_class) { Mlc::LineErrorListener }

it "does not build any events, logs an error and the buffer data remains" do
config.update("pattern" => "^\\s", "what" => "previous",
"auto_flush_interval" => auto_flush_interval)
config.update(
"pattern" => "^\\s",
"what" => "previous",
"auto_flush_interval" => auto_flush_interval
)

codec.logger = Mlc::MultilineLogTracer.new
line_producer.call("en.log")
sleep(auto_flush_interval + 0.1)
msg, args = codec.logger.trace_for(:error)
expect(msg).to eq("Multiline: flush downstream error")
expect(msg).to eq("Multiline: buffered events flush downstream error")
expect(args[:exception].message).to eq(errmsg)
expect(events.size).to eq(0)
expect(codec.buffer_size).to eq(3)
Expand All @@ -274,8 +296,11 @@ def assert_produced_events(key, sleeping)

context "mode: previous, when there are pauses between multiline file writes" do
it "auto-flushes events from the accumulated lines to the queue" do
config.update("pattern" => "^\\s", "what" => "previous",
"auto_flush_interval" => auto_flush_interval)
config.update(
"pattern" => "^\\s",
"what" => "previous",
"auto_flush_interval" => auto_flush_interval
)

assert_produced_events("en.log", auto_flush_interval + 0.1) do
expect(events[0]).to match_path_and_line("en.log", lines["en.log"])
Expand Down