Conversation

@colinsurprenant (Contributor) commented May 6, 2020

  • add new delimiter option with \n as default

  • add new input_type config option for either line-based or stream-based data, with line as the default

    • in input_type => line, each data chunk provided by the input is considered a complete CSV line or multi-line document. In this mode line breaks in columns are supported. This will typically be used for inputs like file or http.
    • in input_type => stream, CSV data can be incomplete in each data chunk and span multiple chunks, where the delimiter option identifies the record boundary. This will typically be used for inputs like stdin or tcp.
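
A hypothetical illustration (not part of the PR itself) of the two modes described above, using the option names from this description:

require "logstash/codecs/csv"

# input_type => line: each decode() call receives a complete CSV line or document
line_codec   = LogStash::Codecs::CSV.new("input_type" => "line",   "delimiter" => "\n")

# input_type => stream: decode() calls may receive partial records spanning chunks
stream_codec = LogStash::Codecs::CSV.new("input_type" => "stream", "delimiter" => "\n")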

TODO

  • Look into @yaauie's proposal with using StringIO and CSV.new to better control parsing exceptions.
  • Update docs for new options
  • Document line break support for line and stream input types
  • Re-evaluate version bump in favor of a minor bump

@elasticsearch-bot elasticsearch-bot self-assigned this May 7, 2020
@yaauie yaauie assigned yaauie and unassigned elasticsearch-bot May 7, 2020
CONVERTERS.freeze

def register
@buffer = FileWatch::BufferedTokenizer.new(@delimiter)
Contributor

Using the buffered tokenizer is tricky here, because CSV data can have literal newlines as long as they are quoted. By pre-tokenizing before handing off to the CSV parser that has the context, we effectively prevent legal literal newlines from being used.

Additionally, input plugins that already use the BufferedTokenizer (e.g. File Input) will strip the newlines from their input before passing off each entry to the codec.

I think another approach would be to use CSV::parse instead of CSV::parse_line, and then iterate over the resulting entries.
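
For illustration (not from this PR), a quick comparison of the two in plain Ruby, showing that CSV.parse keeps quoted literal newlines intact and returns every record, while CSV.parse_line only returns the first:

require 'csv'

data = "a,\"multi\nline\"\nb,c\n"

CSV.parse_line(data)  # => ["a", "multi\nline"]               (only the first record)
CSV.parse(data)       # => [["a", "multi\nline"], ["b", "c"]] (every record, quoted newline intact)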

Contributor Author

TIL line breaks inside CSV values 😮

Contributor

CSV in practice is a giant mess, because it developed organically without a formal specification over the course of decades. When a product encountered an edge case, it came up with a solution to its problem, but often solved it in a way that introduced new and weirder edge cases (e.g., with no agreed-upon escape sequence, some solved the comma- and newline-in-field problem by adding quoting, which made quote characters magical and precludes our ability to pre-tokenize records).

Ruby's CSV implementation is pretty robust to the variety of data under the CSV umbrella, giving us options like row_sep to control the record delimiter, col_sep to control the field delimiter, quote_char to control how it understands quoted sequences, etc.

I included a recommendation in my previous review to use CSV::parse instead of CSV::parse_line, because it is capable of handling multiple entries but otherwise remains the same (the parse_line variant simply ignores any additional records). We can still pipe through the "delimiter" option to CSV's row_sep parameter, and it should handle quoted literal row separators for us.
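
For example (illustrative only), parsing semicolon-separated fields with a pipe as the record delimiter and single-quote quoting:

require 'csv'

CSV.parse("a;b|c;'d;e'|", :col_sep => ";", :row_sep => "|", :quote_char => "'")
# => [["a", "b"], ["c", "d;e"]]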

Contributor Author

Yes, I did test CSV::parse and it correctly handles both line breaks inside columns and line breaks at the end of a line. It does not help for streaming input scenarios, though, where the BufferedTokenizer is useful but breaks the line-breaks-in-columns case. Will recap and continue the discussion in the main thread.

def parse(line, &block)
  begin
-    values = CSV.parse_line(data, :col_sep => @separator, :quote_char => @quote_char)
+    values = CSV.parse_line(line, :col_sep => @separator, :quote_char => @quote_char)
Contributor

What about something like:

CSV.parse(line, :col_sep => @separator, :quote_char => @quote_char).each do |values|
  next if values.nil?

  ## same implementation, using `next` instead of `return` when finding column names
end

@colinsurprenant (Contributor Author)

@yaauie good catch about the line break that can be part of a quoted CSV value. This is a tricky one: it is not standardized, and it seems like many implementations do not support it either. The Ruby CSV library does support it, though.

The problem we face here, as you pointed out (and this is related to the long-standing streaming vs line-oriented data question), is that with the BufferedTokenizer a column containing a line break will be split into 2 lines, so line breaks in columns will not work with the BufferedTokenizer. On the other hand, if we don't use the BufferedTokenizer, this codec will not work with a streaming input like the tcp input. And as you also pointed out, when used with the file input, line breaks will already have been processed (but note that with the file input, line breaks in columns will not work anyway, regardless of the csv codec implementation).

This is in fact very similar to the problem described in logstash-plugins/logstash-codec-multiline#63 where I suggested introducing a streaming_input config option to deal with this.
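
To make the failure mode concrete (illustrative snippet, not plugin code): tokenizing on the delimiter before parsing splits a quoted field in two, while letting the CSV parser do the tokenizing does not:

require 'csv'

record = "id,\"note with\na line break\"\n"

record.split("\n")
# => ["id,\"note with", "a line break\""]    <- what delimiter-based tokenizing produces
CSV.parse(record, :col_sep => ",", :row_sep => "\n", :quote_char => '"')
# => [["id", "note with\na line break"]]     <- the CSV parser keeps the quoted line break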

@yaauie (Contributor) left a comment

We also need to add a test that handles several independent lines without their trailing newline, to ensure that this plugin will continue to work as expected in the real world, since codecs are not guaranteed to be handed byte sequences that end with newlines (e.g., line-oriented inputs like the File Input strip the delimiter before handing off each sequence to the codec).
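
A sketch of the kind of spec being asked for here (hypothetical, not the PR's actual test): hand the codec complete lines with the trailing delimiter already stripped, as a line-oriented input would, and expect one event per line:

require "logstash/devutils/rspec/spec_helper"
require "logstash/codecs/csv"

describe LogStash::Codecs::CSV do
  it "decodes lines handed off without their trailing newline" do
    codec = LogStash::Codecs::CSV.new
    codec.register
    events = []
    ["first,1", "second,2"].each do |line|   # no "\n", like the file input
      codec.decode(line) { |event| events << event }
    end
    expect(events.size).to eq(2)
  end
end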

@colinsurprenant (Contributor Author) commented May 8, 2020

@yaauie let me recap the problems & alternatives we have:

Use cases

  • Streaming input (ex.: tcp or stdin)

    • To support streaming inputs we need to use the BufferedTokenizer to deal with continuous data that can be split across data chunks.
    • The BufferedTokenizer will split data at the defined line break which will also split a possible CSV quoted column containing a line break. This means that the BufferedTokenizer is incompatible with the line-break-in-quoted-column CSV feature.
  • Line/document-oriented input (ex.: file or http)

    • for strict line-oriented input like file there is no need for the BufferedTokenizer
    • OTOH for document-oriented input like http, a single request could contain a complete CSV document with multiple lines separated by line breaks, and could also contain line-breaks-in-quoted-columns. Both of these cases can be correctly handled using the CSV::parse method.

Solutions

  1. We introduce a new streaming_input option (as also suggested in [WIP] use BufferedTokenizer and configurable line delimiter logstash-codec-multiline#63) which controls whether or not the BufferedTokenizer is used, to support the 2 use-cases above.
  2. We split this codec in 2, with csv and csv_lines for example, to support the 2 use-cases (similar to json/json_lines).

Any other solution suggestions?

If we decide to move forward with something like (1), we could also plan the following:

  • deprecate the dual codecs we have (json/json_lines, plain/line) and only have one which supports both use cases.
  • we could introduce an input style hint from the input to the codec which would indicate what style of input it is, line-oriented or streaming, so that codecs could use the correct strategy by default.

WDYT?
Depending on what we decide here, I'll move the discussion to a new logstash issue to follow up.

@yaauie (Contributor) commented May 8, 2020

It is unfortunate that BufferedTokenizer consumes the delimiter, and that our codecs cannot rely on the completeness of the hunk of data arriving. In my mind, BufferedTokenizer should behave more like String#each_line than String.split, allowing the codecs to determine whether they need to chomp a trailing record separator, but that is not where we are.

The streaming_input solution certainly provides a way for users to configure a codec to re-introduce the delimiter to each chunk they receive, but it comes with its own edge-cases (e.g., closing flush on EOF introducing a newline that was never there). In general, I find that boolean flags end up overloading terms, and would recommend something like input_orientation => stream and input_orientation => line.

The other option is to push this new config to the "offending" inputs, providing them a way to declaratively include the delimiters.


Since the current behaviour of this codec is to handle each line as an event, emitting a _csvparsefailure-tagged event with the entire line when we encounter a failure, we will need to define what its behaviour should be when a multi-record hunk is given to it, especially if we have already emitted several events before encountering an error.

I think CSV#readline may be helpful here too, especially combined with StringIO, since we can effectively "checkpoint" at each successful parse, and emit the remainder as the _csvparsefailure-tagged event.

Something like:

diff --git a/lib/logstash/codecs/csv.rb b/lib/logstash/codecs/csv.rb
index 07d6416..186c8d5 100644
--- a/lib/logstash/codecs/csv.rb
+++ b/lib/logstash/codecs/csv.rb
@@ -21,6 +21,8 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
   # Optional.
   config :separator, :validate => :string, :default => ","
 
+  config :delimiter, :validate => :string, :default => "\n"
+
   # Define the character used to quote CSV fields. If this is not specified
   # the default is a double quote `"`.
   # Optional.
@@ -109,14 +111,20 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
   end
 
   def decode(data)
-    data = @converter.convert(data)
-    begin
-      values = CSV.parse_line(data, :col_sep => @separator, :quote_char => @quote_char)
+    data_io = StringIO.new(@converter.convert(data))
+    data_io.close_write
+    ack_position = 0
+    csv = CSV.new(data_io, :col_sep => @separator, :row_sep => @delimiter, :quote_char => @quote_char)
+
+    loop do
+      values = csv.readline
+      ack_position = data_io.pos
+      break if values.nil?
 
       if (@autodetect_column_names && @columns.empty?)
         @columns = values
         @logger.debug? && @logger.debug("Auto detected the following columns", :columns => @columns.inspect)
-        return
+        next
       end
 
       decoded = {}
@@ -130,10 +138,11 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
       end
 
       yield LogStash::Event.new(decoded)
-    rescue CSV::MalformedCSVError => e
-      @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => data)
-      yield LogStash::Event.new("message" => data, "tags" => ["_csvparsefailure"])
     end
+  rescue CSV::MalformedCSVError => e
+    data_io.seek(ack_position)
+    @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => data)
+    yield LogStash::Event.new("message" => data_io.read, "tags" => ["_csvparsefailure"])
   end
 
   def encode(event)

@colinsurprenant (Contributor Author)

@yaauie
+1 input_orientation.

I don't think we actually need to re-add line breaks. You probably saw that in logstash-plugins/logstash-codec-multiline#63, but I think a better way would be to simply use or not use the BufferedTokenizer depending on input_orientation.

  • With input_orientation => line it would consider each received chunk of data to be a complete CSV object, be it a single line (ex. file input) or a complete document (ex. http input), and use CSV::parse (or StringIO + CSV.readline as you suggested above) on it, which would correctly parse these 2 cases.
  • With input_orientation => stream it would use the BufferedTokenizer and process each line the same way as with line orientation. The only limitation would be that CSV columns with line breaks would not be supported in this mode.

Do we agree on this strategy? If we do, I'll go ahead and refactor for this, and then we can iterate on the implementation details in review.
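
For illustration only (not the PR's actual code), a minimal sketch of that strategy, assuming a hypothetical input_orientation option, the existing parse(line, &block) helper, and a BufferedTokenizer in @buffer:

def decode(data, &block)
  if @input_orientation == "stream"
    # re-assemble complete lines from possibly partial chunks, then parse each one
    @buffer.extract(data).each { |line| parse(@converter.convert(line), &block) }
  else
    # line/document-oriented: the chunk is already a complete CSV line or document
    parse(@converter.convert(data), &block)
  end
end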

@yaauie (Contributor) commented May 8, 2020

The subject of this PR is "support line delimited data"; from the specs you added, I take this to mean "when CSV#decode is passed a body representing multiple records, emit each record as an event".

This goal can be achieved without adding BufferedTokenizer by using CSV#parse or by repeatedly sending CSV#readline; we can use CSV's row_sep directive and let it do the contextual tokenizing that respects quoted values in a way BufferedTokenizer cannot.

From what I can see, the only scenario in which adding a BufferedTokenizer would be useful is the opposite: when the codec is independently given multiple fragments of a single row spread across multiple CSV#decode calls, and is expected to retain and reassemble those fragments into a single row. And because many of our inputs use BufferedTokenizers of their own and swallow line-delimiters, this introduces more edge-cases.

@colinsurprenant (Contributor Author) commented May 9, 2020

@yaauie obviously the understanding of the problem has evolved and the description and specs have not yet caught up; that's why I tried to recap my understanding of the problem and submit possible solutions.

As I tried to explain, by using the input_orientation option, which should certainly default to line, the BufferedTokenizer is not required and will not be used. The problem is that if you try to use a streaming input such as stdin or tcp, it will not work correctly unless we use the BufferedTokenizer by configuring input_orientation => stream. The other option would be to make an extra csv_lines codec, similar to the json/json_lines and plain/line codecs. My opinion is that we should eventually get rid of these duplicated codecs that serve the line-oriented and streaming input cases, and instead move toward using an input_orientation option, which could also eventually be hinted by the input plugin, which knows what type of input it is providing (and also get rid of the weird fix_streaming_codecs method).

This IMO would provide a simpler and cleaner path forward with what we have today, until we come up with a new processing framework (milling or otherwise) at some point in the future.

@colinsurprenant colinsurprenant changed the title support line delimited data [WIP] support line delimited data May 9, 2020
@colinsurprenant colinsurprenant marked this pull request as draft May 9, 2020 16:15
@colinsurprenant (Contributor Author)

  • I updated the description and converted the PR to draft/WIP.
  • I pushed modifications to introduce the input_style option and added line vs stream specific specs as well as shared examples.

Let me know if there are any objections with this plan.

@colinsurprenant (Contributor Author)

Opened elastic/logstash#11885 for the broader discussion

@colinsurprenant (Contributor Author)

This is on hold until we conclude elastic/logstash#11885
