Changes from 4 commits
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,5 @@
## 1.0.1
- Fixed parsing of line-delimited data [#8](https://github.com/logstash-plugins/logstash-codec-csv/pull/8)
## 1.0.0
- Fixed dependencies to work with logstash v6 and up. Overhauled to match features of the CSV Filter. Improved spec coverage [#4](https://github.com/logstash-plugins/logstash-codec-csv/pull/4)
## 0.1.5
9 changes: 9 additions & 0 deletions docs/index.asciidoc
@@ -33,6 +33,7 @@ The csv codec takes CSV data, parses it and passes it along.
| <<plugins-{type}s-{plugin}-charset>> |<<string,string>>, one of `["ASCII-8BIT", "UTF-8", "US-ASCII", "Big5", "Big5-HKSCS", "Big5-UAO", "CP949", "Emacs-Mule", "EUC-JP", "EUC-KR", "EUC-TW", "GB2312", "GB18030", "GBK", "ISO-8859-1", "ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "KOI8-R", "KOI8-U", "Shift_JIS", "UTF-16BE", "UTF-16LE", "UTF-32BE", "UTF-32LE", "Windows-31J", "Windows-1250", "Windows-1251", "Windows-1252", "IBM437", "IBM737", "IBM775", "CP850", "IBM852", "CP852", "IBM855", "CP855", "IBM857", "IBM860", "IBM861", "IBM862", "IBM863", "IBM864", "IBM865", "IBM866", "IBM869", "Windows-1258", "GB1988", "macCentEuro", "macCroatian", "macCyrillic", "macGreek", "macIceland", "macRoman", "macRomania", "macThai", "macTurkish", "macUkraine", "CP950", "CP951", "IBM037", "stateless-ISO-2022-JP", "eucJP-ms", "CP51932", "EUC-JIS-2004", "GB12345", "ISO-2022-JP", "ISO-2022-JP-2", "CP50220", "CP50221", "Windows-1256", "Windows-1253", "Windows-1255", "Windows-1254", "TIS-620", "Windows-874", "Windows-1257", "MacJapanese", "UTF-7", "UTF8-MAC", "UTF-16", "UTF-32", "UTF8-DoCoMo", "SJIS-DoCoMo", "UTF8-KDDI", "SJIS-KDDI", "ISO-2022-JP-KDDI", "stateless-ISO-2022-JP-KDDI", "UTF8-SoftBank", "SJIS-SoftBank", "BINARY", "CP437", "CP737", "CP775", "IBM850", "CP857", "CP860", "CP861", "CP862", "CP863", "CP864", "CP865", "CP866", "CP869", "CP1258", "Big5-HKSCS:2008", "ebcdic-cp-us", "eucJP", "euc-jp-ms", "EUC-JISX0213", "eucKR", "eucTW", "EUC-CN", "eucCN", "CP936", "ISO2022-JP", "ISO2022-JP2", "ISO8859-1", "ISO8859-2", "ISO8859-3", "ISO8859-4", "ISO8859-5", "ISO8859-6", "CP1256", "ISO8859-7", "CP1253", "ISO8859-8", "CP1255", "ISO8859-9", "CP1254", "ISO8859-10", "ISO8859-11", "CP874", "ISO8859-13", "CP1257", "ISO8859-14", "ISO8859-15", "ISO8859-16", "CP878", "MacJapan", "ASCII", "ANSI_X3.4-1968", "646", "CP65000", "CP65001", 
"UTF-8-MAC", "UTF-8-HFS", "UCS-2BE", "UCS-4BE", "UCS-4LE", "CP932", "csWindows31J", "SJIS", "PCK", "CP1250", "CP1251", "CP1252", "external", "locale"]`|No
| <<plugins-{type}s-{plugin}-columns>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-convert>> |<<hash,hash>>|No
| <<plugins-{type}s-{plugin}-delimiter>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-include_headers>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-quote_char>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-separator>> |<<string,string>>|No
@@ -102,6 +103,14 @@ Possible conversions are: `integer`, `float`, `date`, `date_time`, `boolean`
}
}

[id="plugins-{type}s-{plugin}-delimiter"]
===== `delimiter`

* Value type is <<string,string>>
* Default value is `"\n"`

Define the line delimiter.

[id="plugins-{type}s-{plugin}-include_headers"]
===== `include_headers`

61 changes: 41 additions & 20 deletions lib/logstash/codecs/csv.rb
@@ -1,6 +1,7 @@
# encoding: utf-8
require "logstash/codecs/base"
require "logstash/util/charset"
require "logstash/util/buftok"
require "csv"

class LogStash::Codecs::CSV < LogStash::Codecs::Base
@@ -42,6 +43,9 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
# Defaults to false.
config :autodetect_column_names, :validate => :boolean, :default => false

# Define the line delimiter
config :delimiter, :validate => :string, :default => "\n"

# Define a set of datatype conversions to be applied to columns.
# Possible conversions are integer, float, date, date_time, boolean
#
@@ -88,6 +92,7 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
CONVERTERS.freeze

def register
@buffer = FileWatch::BufferedTokenizer.new(@delimiter)
Contributor: Using the buffered tokenizer is tricky here, because CSV data can have literal newlines as long as they are quoted. By pre-tokenizing before handing off to the CSV parser that has the context, we effectively prevent legal literal newlines from being used.

Additionally, input plugins that already use the BufferedTokenizer (e.g. File Input) will strip the newlines from their input before passing off each entry to the codec.

I think another approach would be to use CSV::parse instead of CSV::parse_line, and then iterate over the resulting entries.

Contributor Author: TIL line breaks inside CSV values 😮

Contributor: CSV in practice is a giant mess, because it developed organically, without a formal specification, over the course of decades. When one product encountered an edge case, it came up with a solution to its problem, but often solved it in a way that introduced new and weirder edge cases (e.g., with no agreed-upon escape sequence, some solved the comma- and newline-in-field problem by adding quoting, which made quote characters magical and precludes our ability to pre-tokenize records).

Ruby's CSV implementation is pretty robust to the variety of data under the CSV umbrella, giving us options like row_sep to control the record delimiter, col_sep to control the field delimiter, quote_char to control how it understands quoted sequences, etc.

I included a recommendation in my previous review to use CSV::parse instead of CSV::parse_line, because it is capable of handling multiple entries but otherwise remains the same (the parse_line variant simply ignores any additional records). We can still pipe the "delimiter" option through to CSV's row_sep parameter, and it should handle quoted literal row separators for us.

Contributor Author: Yes, I did test CSV::parse, and it correctly handles both line breaks inside columns and line breaks at end of line. It does not, however, cover streaming input scenarios, where the BufferedTokenizer is useful but breaks the line-breaks-inside-columns case. Will recap and continue the discussion in the main thread.
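The quoted-newline hazard described in this thread is easy to demonstrate with Ruby's standard csv library (a standalone sketch, not the codec's code):

```ruby
require "csv"

data = "id,note\n1,\"line one\nline two\"\n2,plain\n"

# Naive pre-tokenizing on "\n" cuts the quoted record in half:
pieces = data.split("\n")
# pieces.length == 4, with the quoted field broken across two pieces

# CSV.parse has the quoting context, so the literal newline survives
# and every record is returned:
rows = CSV.parse(data)
# rows[1] == ["1", "line one\nline two"]

# CSV.parse_line returns only the first record and ignores the rest:
first = CSV.parse_line(data)
# first == ["id", "note"]
```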

@converter = LogStash::Util::Charset.new(@charset)
@converter.logger = @logger

@@ -108,10 +113,41 @@ def register
@logger.debug? && @logger.debug("CSV parsing options", :col_sep => @separator, :quote_char => @quote_char)
end

def decode(data)
data = @converter.convert(data)
def decode(data, &block)
@buffer.extract(data).each do |line|
parse(@converter.convert(line), &block)
end
end

def encode(event)
if @include_headers
csv_data = CSV.generate_line(select_keys(event), :col_sep => @separator, :quote_char => @quote_char, :headers => true)
@on_event.call(event, csv_data)

# output headers only once per codec lifecycle
@include_headers = false
end

csv_data = CSV.generate_line(select_values(event), :col_sep => @separator, :quote_char => @quote_char)
@on_event.call(event, csv_data)
end

def flush(&block)
remainder = @buffer.flush
if !remainder.empty?
parse(@converter.convert(remainder), &block)
end
end

private

def parse(line, &block)
begin
values = CSV.parse_line(data, :col_sep => @separator, :quote_char => @quote_char)
values = CSV.parse_line(line, :col_sep => @separator, :quote_char => @quote_char)
Contributor: What about something like:

CSV.parse(line, :col_sep => @separator, :quote_char => @quote_char).each do |values|
  next if values.nil?

  ## same implementation, using `next` instead of `return` when finding column names
end
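A runnable sketch of that suggestion (standalone; plain Ruby CSV without the codec's full option handling, and the helper name `each_record` is illustrative):

```ruby
require "csv"

# Yield every record CSV.parse finds, skipping blank rows, instead of
# taking only the first record as CSV.parse_line does.
def each_record(data, col_sep: ",", quote_char: '"')
  CSV.parse(data, :col_sep => col_sep, :quote_char => quote_char).each do |values|
    next if values.nil? || values.empty?
    yield values
  end
end

rows = []
each_record("a,b\n\"x\ny\",c\n") { |values| rows << values }
# rows == [["a", "b"], ["x\ny", "c"]]
```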

return if values.nil?

if (@autodetect_column_names && @columns.empty?)
@columns = values
@@ -131,26 +167,11 @@ def decode(data)

yield LogStash::Event.new(decoded)
rescue CSV::MalformedCSVError => e
@logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => data)
yield LogStash::Event.new("message" => data, "tags" => ["_csvparsefailure"])
end
end

def encode(event)
if @include_headers
csv_data = CSV.generate_line(select_keys(event), :col_sep => @separator, :quote_char => @quote_char, :headers => true)
@on_event.call(event, csv_data)

# output headers only once per codec lifecycle
@include_headers = false
@logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => line)
yield LogStash::Event.new("message" => line, "tags" => ["_csvparsefailure"])
end

csv_data = CSV.generate_line(select_values(event), :col_sep => @separator, :quote_char => @quote_char)
@on_event.call(event, csv_data)
end

private

def select_values(event)
if @columns.empty?
event.to_hash.values
2 changes: 1 addition & 1 deletion logstash-codec-csv.gemspec
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-codec-csv'
s.version = '1.0.0'
s.version = '1.0.1'
s.licenses = ['Apache License (2.0)']
s.summary = "The csv codec takes CSV data, parses it and passes it along"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
56 changes: 43 additions & 13 deletions spec/codecs/csv_spec.rb
@@ -22,6 +22,44 @@
end
end

describe "multiple lines" do
let(:data) { "big,bird\nsesame,street\nfoo,bar\n" }

it "returns events from CSV data" do
events = []
codec.decode(data) {|event| events << event}
expect(events.size).to eq(3)
expect(events[0].get("column1")).to eq("big")
expect(events[0].get("column2")).to eq("bird")
expect(events[1].get("column1")).to eq("sesame")
expect(events[1].get("column2")).to eq("street")
expect(events[2].get("column1")).to eq("foo")
expect(events[2].get("column2")).to eq("bar")
end
end

describe "empty lines" do
let(:data) { "big,bird\n\n\nsesame,street\nfoo,bar\n\n\n" }

it "returns events from CSV data" do
events = []
codec.decode(data) {|event| events << event}
expect(events.size).to eq(3)
end
end

describe "flush" do
let(:data) { "big,bird\nsesame,street" }

it "returns events from CSV data" do
events = []
codec.decode(data) {|event| events << event}
expect(events.size).to eq(1)
codec.flush {|event| events << event}
expect(events.size).to eq(2)
end
end

describe "given column names" do
let(:doc) { "big,bird,sesame street" }
let(:config) do
@@ -37,9 +75,7 @@
end

context "parse csv skipping empty columns" do

let(:data) { "val1,,val3" }

let(:config) do
{ "skip_empty_columns" => true,
"columns" => ["custom1", "custom2", "custom3"] }
@@ -55,11 +91,12 @@
end

context "parse csv without autogeneration of names" do

let(:data) { "val1,val2,val3" }
let(:config) do
{ "autogenerate_column_names" => false,
"columns" => ["custom1", "custom2"] }
{
"autogenerate_column_names" => false,
"columns" => ["custom1", "custom2"]
}
end

it "extract all the values" do
@@ -70,12 +107,10 @@
end
end
end

end

describe "custom separator" do
let(:data) { "big,bird;sesame street" }

let(:config) do
{ "separator" => ";" }
end
@@ -92,7 +127,7 @@
let(:data) { "big,bird,'sesame street'" }

let(:config) do
{ "quote_char" => "'"}
{ "quote_char" => "'" }
end

it "return an event from CSV data" do
@@ -133,15 +168,12 @@
end

describe "having headers" do

let(:data) do
[ "size,animal,movie", "big,bird,sesame street"]
end

let(:new_data) do
[ "host,country,city", "example.com,germany,berlin"]
end

let(:config) do
{ "autodetect_column_names" => true }
end
@@ -157,7 +189,6 @@
end

describe "using field convertion" do

let(:config) do
{ "convert" => { "column1" => "integer", "column3" => "boolean" } }
end
@@ -172,7 +203,6 @@
end

context "when using column names" do

let(:config) do
{ "convert" => { "custom1" => "integer", "custom3" => "boolean" },
"columns" => ["custom1", "custom2", "custom3"] }