[WIP] support line delimited data #8
Status: Closed. colinsurprenant wants to merge 6 commits into logstash-plugins:master from colinsurprenant:multiline.
Commits (6, all by colinsurprenant):

- 314b2c5 support decoding multiple lines
- 5063f95 add docs
- cfeb539 bump version to 1.0.1
- 7ea51e5 flush spec
- fa041f3 remove debug traces
- 9d56a62 add input_type config option
```diff
@@ -1,6 +1,7 @@
 # encoding: utf-8
 require "logstash/codecs/base"
 require "logstash/util/charset"
+require "logstash/util/buftok"
 require "csv"

 class LogStash::Codecs::CSV < LogStash::Codecs::Base
@@ -42,6 +43,9 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
   # Defaults to false.
   config :autodetect_column_names, :validate => :boolean, :default => false

+  # Define the line delimiter
+  config :delimiter, :validate => :string, :default => "\n"
+
   # Define a set of datatype conversions to be applied to columns.
   # Possible conversions are integer, float, date, date_time, boolean
   #
@@ -88,6 +92,7 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
   CONVERTERS.freeze

   def register
+    @buffer = FileWatch::BufferedTokenizer.new(@delimiter)
     @converter = LogStash::Util::Charset.new(@charset)
     @converter.logger = @logger
@@ -108,10 +113,38 @@ def register
     @logger.debug? && @logger.debug("CSV parsing options", :col_sep => @separator, :quote_char => @quote_char)
   end

-  def decode(data)
-    data = @converter.convert(data)
+  def decode(data, &block)
+    @buffer.extract(data).each do |line|
+      parse(@converter.convert(line), &block)
+    end
+  end
+
+  def encode(event)
+    if @include_headers
+      csv_data = CSV.generate_line(select_keys(event), :col_sep => @separator, :quote_char => @quote_char, :headers => true)
+      @on_event.call(event, csv_data)
+
+      # output headers only once per codec lifecycle
+      @include_headers = false
+    end
+
+    csv_data = CSV.generate_line(select_values(event), :col_sep => @separator, :quote_char => @quote_char)
+    @on_event.call(event, csv_data)
+  end
+
+  def flush(&block)
+    remainder = @buffer.flush
+    if !remainder.empty?
+      parse(@converter.convert(remainder), &block)
+    end
+  end
+
+  private
+
+  def parse(line, &block)
     begin
-      values = CSV.parse_line(data, :col_sep => @separator, :quote_char => @quote_char)
+      values = CSV.parse_line(line, :col_sep => @separator, :quote_char => @quote_char)

       return if values.nil?

       if (@autodetect_column_names && @columns.empty?)
         @columns = values
@@ -131,26 +164,11 @@ def decode(data)
       yield LogStash::Event.new(decoded)
     rescue CSV::MalformedCSVError => e
-      @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => data)
-      yield LogStash::Event.new("message" => data, "tags" => ["_csvparsefailure"])
+      @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => line)
+      yield LogStash::Event.new("message" => line, "tags" => ["_csvparsefailure"])
     end
   end

-  def encode(event)
-    if @include_headers
-      csv_data = CSV.generate_line(select_keys(event), :col_sep => @separator, :quote_char => @quote_char, :headers => true)
-      @on_event.call(event, csv_data)
-
-      # output headers only once per codec lifecycle
-      @include_headers = false
-    end
-
-    csv_data = CSV.generate_line(select_values(event), :col_sep => @separator, :quote_char => @quote_char)
-    @on_event.call(event, csv_data)
-  end
-
   private

   def select_values(event)
     if @columns.empty?
       event.to_hash.values
```
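The patched `decode`/`flush` pair relies on the tokenizer's extract/flush contract: `extract` returns only complete, delimiter-terminated entries and buffers any trailing partial entry, which `flush` later drains at shutdown. A minimal sketch of that contract (`MiniTokenizer` is a hypothetical stand-in, not the actual `FileWatch::BufferedTokenizer` implementation):

```ruby
# Minimal stand-in illustrating the extract/flush contract the patched
# decode/flush methods rely on. MiniTokenizer is a hypothetical sketch,
# not the real FileWatch::BufferedTokenizer.
class MiniTokenizer
  def initialize(delimiter = "\n")
    @delimiter = delimiter
    @buffer = ""
  end

  # Return all complete entries; keep the trailing partial entry buffered.
  def extract(data)
    @buffer << data
    entries = @buffer.split(@delimiter, -1)
    @buffer = entries.pop # last element is the (possibly empty) remainder
    entries
  end

  # Drain whatever partial data remains (the codec calls this at shutdown).
  def flush
    remainder, @buffer = @buffer, ""
    remainder
  end
end

tok = MiniTokenizer.new
complete = tok.extract("a,1\nb,2\nc,")
# complete holds the two delimiter-terminated lines; "c," stays buffered
leftover = tok.flush
```

This is why the PR adds a `flush` method to the codec: without it, a final record that arrives without a trailing delimiter would stay stuck in the buffer forever.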
Using the buffered tokenizer is tricky here, because CSV data can have literal newlines as long as they are quoted. By pre-tokenizing before handing off to the CSV parser that has the context, we effectively prevent legal literal newlines from being used.
Additionally, input plugins that already use the BufferedTokenizer (e.g. File Input) will strip the newlines from their input before passing off each entry to the codec. I think another approach would be to use CSV::parse instead of CSV::parse_line, and then iterate over the resulting entries.
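The concern above can be reproduced directly with Ruby's stdlib CSV: a field may legally contain a quoted literal newline, so pre-splitting on "\n" corrupts the record, while CSV.parse keeps it intact. A small illustration (the sample record is made up):

```ruby
require "csv"

# Hypothetical record: the second field legally contains a quoted newline.
record = "a,\"line one\nline two\",c\n"

# Pre-tokenizing on "\n" cuts the quoted field in half, leaving two
# fragments that are not valid CSV rows on their own.
fragments = record.split("\n")

# CSV.parse sees the whole input and keeps the quoted newline in the field.
rows = CSV.parse(record)
```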
TIL line breaks inside CSV values 😮
CSV in practice is a giant mess, because it developed organically without a formal specification over the course of decades. When one product encountered an edge case, they came up with a solution to their problem, but often solved it in a way that introduced new and weirder edge cases (e.g., with no agreed-upon escape sequence, some solved the comma- and newline-in-field problem by adding quoting, which made quote characters magical and precludes our ability to pre-tokenize records).

Ruby's CSV implementation is pretty robust to the variety of data under the CSV umbrella, giving us options like row_sep to control the record delimiter, col_sep to control the field delimiter, quote_char to control how it understands quoted sequences, etc.

I included a recommendation in my previous review to use CSV::parse instead of CSV::parse_line, because it is capable of handling multiple entries but otherwise remains the same (the parse_line variant simply ignores any additional records). We can still pipe the "delimiter" option through to CSV's row_sep parameter, and it should handle quoted literal row separators for us.
Yes, I did test CSV::parse and it does work correctly in handling both line breaks in columns and line breaks at end of line, but it does not work for streaming input scenarios, where the BufferedTokenizer is useful but breaks the line-breaks-in-columns case. Will recap and continue discussion in the main thread.