diff --git a/CHANGELOG.md b/CHANGELOG.md index f7b229f..2166042 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,5 @@ +## 1.0.1 + - Fixed parsing of line delimited data [#8](https://github.com/logstash-plugins/logstash-codec-csv/pull/8) ## 1.0.0 - Fixed dependencies to work with logstash v6 and up. Overhauled to match features of the CSV Filter. Improved spec coverage [#4](https://github.com/logstash-plugins/logstash-codec-csv/pull/4) ## 0.1.5 diff --git a/docs/index.asciidoc b/docs/index.asciidoc index a2aba1e..67eb268 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -33,6 +33,7 @@ The csv codec takes CSV data, parses it and passes it along. | <> |<>, one of `["ASCII-8BIT", "UTF-8", "US-ASCII", "Big5", "Big5-HKSCS", "Big5-UAO", "CP949", "Emacs-Mule", "EUC-JP", "EUC-KR", "EUC-TW", "GB2312", "GB18030", "GBK", "ISO-8859-1", "ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "KOI8-R", "KOI8-U", "Shift_JIS", "UTF-16BE", "UTF-16LE", "UTF-32BE", "UTF-32LE", "Windows-31J", "Windows-1250", "Windows-1251", "Windows-1252", "IBM437", "IBM737", "IBM775", "CP850", "IBM852", "CP852", "IBM855", "CP855", "IBM857", "IBM860", "IBM861", "IBM862", "IBM863", "IBM864", "IBM865", "IBM866", "IBM869", "Windows-1258", "GB1988", "macCentEuro", "macCroatian", "macCyrillic", "macGreek", "macIceland", "macRoman", "macRomania", "macThai", "macTurkish", "macUkraine", "CP950", "CP951", "IBM037", "stateless-ISO-2022-JP", "eucJP-ms", "CP51932", "EUC-JIS-2004", "GB12345", "ISO-2022-JP", "ISO-2022-JP-2", "CP50220", "CP50221", "Windows-1256", "Windows-1253", "Windows-1255", "Windows-1254", "TIS-620", "Windows-874", "Windows-1257", "MacJapanese", "UTF-7", "UTF8-MAC", "UTF-16", "UTF-32", "UTF8-DoCoMo", "SJIS-DoCoMo", "UTF8-KDDI", "SJIS-KDDI", "ISO-2022-JP-KDDI", "stateless-ISO-2022-JP-KDDI", "UTF8-SoftBank", "SJIS-SoftBank", "BINARY", "CP437", "CP737", "CP775", "IBM850", "CP857", "CP860", "CP861", "CP862", "CP863", "CP864", "CP865", "CP866", "CP869", "CP1258", "Big5-HKSCS:2008", "ebcdic-cp-us", "eucJP", "euc-jp-ms", "EUC-JISX0213", "eucKR", "eucTW", "EUC-CN", "eucCN", "CP936", "ISO2022-JP", "ISO2022-JP2", "ISO8859-1", "ISO8859-2", "ISO8859-3", "ISO8859-4", "ISO8859-5", "ISO8859-6", "CP1256", "ISO8859-7", "CP1253", "ISO8859-8", "CP1255", "ISO8859-9", "CP1254", "ISO8859-10", "ISO8859-11", "CP874", "ISO8859-13", "CP1257", "ISO8859-14", "ISO8859-15", "ISO8859-16", "CP878", "MacJapan", "ASCII", "ANSI_X3.4-1968", "646", "CP65000", "CP65001", "UTF-8-MAC", "UTF-8-HFS", "UCS-2BE", "UCS-4BE", "UCS-4LE", "CP932", "csWindows31J", "SJIS", "PCK", "CP1250", "CP1251", "CP1252", "external", "locale"]`|No | <> |<>|No | <> |<>|No +| <> |<>|No | <> |<>|No | <> |<>|No | <> |<>|No @@ -102,6 +103,14 @@ Possible conversions are: `integer`, `float`, `date`, `date_time`, `boolean` } } +[id="plugins-{type}s-{plugin}-delimiter"] +===== `delimiter` + + * Value type is <> + * Default value is `"\n"` + +Define the line delimiter. + [id="plugins-{type}s-{plugin}-include_headers"] ===== `include_headers` diff --git a/lib/logstash/codecs/csv.rb b/lib/logstash/codecs/csv.rb index 07d6416..0509aaf 100644 --- a/lib/logstash/codecs/csv.rb +++ b/lib/logstash/codecs/csv.rb @@ -1,6 +1,7 @@ # encoding: utf-8 require "logstash/codecs/base" require "logstash/util/charset" +require "logstash/util/buftok" require "csv" class LogStash::Codecs::CSV < LogStash::Codecs::Base @@ -42,6 +43,9 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base # Defaults to false. config :autodetect_column_names, :validate => :boolean, :default => false + # Define the line delimiter + config :delimiter, :validate => :string, :default => "\n" + # Define a set of datatype conversions to be applied to columns. # Possible conversions are integer, float, date, date_time, boolean # @@ -58,6 +62,12 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base # "CP1252". config :charset, :validate => ::Encoding.name_list, :default => "UTF-8" + # The input type the associated input plugin is providing. Inputs such as the file or http input + # plugins which provide complete data chunks such as lines or documents to the codec need the `line` + # input type, while other inputs such as stdin or tcp where data can be incomplete across data + # chunks need to use the 'stream' input type. + config :input_type, :validate => ["line", "stream"], :default => "line" + CONVERTERS = { :integer => lambda do |value| CSV::Converters[:integer].call(value) @@ -88,6 +98,10 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base CONVERTERS.freeze def register + @streaming = @input_type == "stream" + if @streaming + @buffer = FileWatch::BufferedTokenizer.new(@delimiter) + end @converter = LogStash::Util::Charset.new(@charset) @converter.logger = @logger @@ -108,31 +122,13 @@ def register @logger.debug? && @logger.debug("CSV parsing options", :col_sep => @separator, :quote_char => @quote_char) end - def decode(data) - data = @converter.convert(data) - begin - values = CSV.parse_line(data, :col_sep => @separator, :quote_char => @quote_char) - - if (@autodetect_column_names && @columns.empty?) - @columns = values - @logger.debug? && @logger.debug("Auto detected the following columns", :columns => @columns.inspect) - return + def decode(data, &block) + if @streaming + @buffer.extract(data).each do |line| + parse(@converter.convert(line), &block) end - - decoded = {} - values.each_index do |i| - unless (@skip_empty_columns && (values[i].nil? || values[i].empty?)) - unless ignore_field?(i) - field_name = @columns[i] || "column#{i + 1}" - decoded[field_name] = transform(field_name, values[i]) - end - end - end - - yield LogStash::Event.new(decoded) - rescue CSV::MalformedCSVError => e - @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => data) - yield LogStash::Event.new("message" => data, "tags" => ["_csvparsefailure"]) + else + parse(@converter.convert(data), &block) end end @@ -149,8 +145,46 @@ def encode(event) @on_event.call(event, csv_data) end + def flush(&block) + if @streaming + remainder = @buffer.flush + if !remainder.empty? + parse(@converter.convert(remainder), &block) + end + end + end + private + def parse(line, &block) + begin + CSV.parse(line, :col_sep => @separator, :quote_char => @quote_char).each do |values| + next if values.nil? || values.empty? + + if (@autodetect_column_names && @columns.empty?) + @columns = values + @logger.debug? && @logger.debug("Auto detected the following columns", :columns => @columns.inspect) + next + end + + decoded = {} + values.each_index do |i| + unless (@skip_empty_columns && (values[i].nil? || values[i].empty?)) + unless ignore_field?(i) + field_name = @columns[i] || "column#{i + 1}" + decoded[field_name] = transform(field_name, values[i]) + end + end + end + + yield LogStash::Event.new(decoded) + rescue CSV::MalformedCSVError => e + @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => line) + yield LogStash::Event.new("message" => line, "tags" => ["_csvparsefailure"]) + end + end + end + def select_values(event) if @columns.empty? event.to_hash.values diff --git a/logstash-codec-csv.gemspec b/logstash-codec-csv.gemspec index 6418a21..6c86eda 100644 --- a/logstash-codec-csv.gemspec +++ b/logstash-codec-csv.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-codec-csv' - s.version = '1.0.0' + s.version = '1.0.1' s.licenses = ['Apache License (2.0)'] s.summary = "The csv codec take CSV data, parses it and passes it away" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/codecs/csv_spec.rb b/spec/codecs/csv_spec.rb index 898b3a7..e3bde58 100644 --- a/spec/codecs/csv_spec.rb +++ b/spec/codecs/csv_spec.rb @@ -4,192 +4,300 @@ describe LogStash::Codecs::CSV do - subject(:codec) { LogStash::Codecs::CSV.new(config) } - let(:config) { Hash.new } + subject(:codec) { LogStash::Codecs::CSV.new(base_config.merge(config)) } + let(:config) { Hash.new } before(:each) do codec.register end - describe "decode" do - let(:data) { "big,bird,sesame street" } + shared_examples "decoding tests" do + describe "decode" do - it "return an event from CSV data" do - codec.decode(data) do |event| - expect(event.get("column1")).to eq("big") - expect(event.get("column2")).to eq("bird") - expect(event.get("column3")).to eq("sesame street") + describe "single line" do + let(:data) { "big,bird,sesame street" } + + it "return an event from CSV data" do + codec.decode(data) do |event| + expect(event.get("column1")).to eq("big") + expect(event.get("column2")).to eq("bird") + expect(event.get("column3")).to eq("sesame street") + end + end end - end - describe "given column names" do - let(:doc) { "big,bird,sesame street" } - let(:config) do - { "columns" => ["first", "last", "address" ] } + describe "multiple lines" do + let(:data) { "big,bird\nsesame,street\nfoo,bar\n" } + + it "return events from CSV data" do + events = [] + codec.decode(data) {|event| events << event} + expect(events.size).to eq(3) + expect(events[0].get("column1")).to eq("big") + expect(events[0].get("column2")).to eq("bird") + expect(events[1].get("column1")).to eq("sesame") + expect(events[1].get("column2")).to eq("street") + expect(events[2].get("column1")).to eq("foo") + expect(events[2].get("column2")).to eq("bar") + end end - it "extract all the values" do - codec.decode(data) do |event| - expect(event.get("first")).to eq("big") - expect(event.get("last")).to eq("bird") - expect(event.get("address")).to eq("sesame street") + describe "empty lines" do + let(:data) { "big,bird\n\n\nsesame,street\nfoo,bar\n\n\n" } + + it "return events from CSV data" do + events = [] + codec.decode(data) {|event| events << event} + expect(events.size).to eq(3) end end - context "parse csv skipping empty columns" do + describe "flush" do + let(:data) { "big,bird\nsesame,street" } - let(:data) { "val1,,val3" } + it "return events from CSV data" do + events = [] + codec.decode(data) {|event| events << event} + codec.flush {|event| events << event} + expect(events.size).to eq(2) + end + end + describe "given column names" do + let(:data) { "big,bird,sesame street" } let(:config) do - { "skip_empty_columns" => true, - "columns" => ["custom1", "custom2", "custom3"] } + { "columns" => ["first", "last", "address" ] } end it "extract all the values" do codec.decode(data) do |event| - expect(event.get("custom1")).to eq("val1") - expect(event.to_hash).not_to include("custom2") - expect(event.get("custom3")).to eq("val3") + expect(event.get("first")).to eq("big") + expect(event.get("last")).to eq("bird") + expect(event.get("address")).to eq("sesame street") + end + end + + context "parse csv skipping empty columns" do + let(:data) { "val1,,val3" } + let(:config) do + { "skip_empty_columns" => true, + "columns" => ["custom1", "custom2", "custom3"] } + end + + it "extract all the values" do + codec.decode(data) do |event| + expect(event.get("custom1")).to eq("val1") + expect(event.to_hash).not_to include("custom2") + expect(event.get("custom3")).to eq("val3") + end end end - end - context "parse csv without autogeneration of names" do + context "parse csv without autogeneration of names" do + let(:data) { "val1,val2,val3" } + let(:config) do + { + "autogenerate_column_names" => false, + "columns" => ["custom1", "custom2"] + } + end + + it "extract all the values" do + codec.decode(data) do |event| + expect(event.get("custom1")).to eq("val1") + expect(event.get("custom2")).to eq("val2") + expect(event.get("column3")).to be_falsey + end + end + end + end - let(:data) { "val1,val2,val3" } + describe "custom separator" do + let(:data) { "big,bird;sesame street" } let(:config) do - { "autogenerate_column_names" => false, - "columns" => ["custom1", "custom2"] } + { "separator" => ";" } end - it "extract all the values" do + it "return an event from CSV data" do codec.decode(data) do |event| - expect(event.get("custom1")).to eq("val1") - expect(event.get("custom2")).to eq("val2") - expect(event.get("column3")).to be_falsey + expect(event.get("column1")).to eq("big,bird") + expect(event.get("column2")).to eq("sesame street") end end end - end - - describe "custom separator" do - let(:data) { "big,bird;sesame street" } + describe "quote char" do + let(:data) { "big,bird,'sesame street'" } - let(:config) do - { "separator" => ";" } - end + let(:config) do + { "quote_char" => "'" } + end - it "return an event from CSV data" do - codec.decode(data) do |event| - expect(event.get("column1")).to eq("big,bird") - expect(event.get("column2")).to eq("sesame street") + it "return an event from CSV data" do + codec.decode(data) do |event| + expect(event.get("column1")).to eq("big") + expect(event.get("column2")).to eq("bird") + expect(event.get("column3")).to eq("sesame street") + end end - end - end - describe "quote char" do - let(:data) { "big,bird,'sesame street'" } + context "using the default one" do + let(:data) { 'big,bird,"sesame, street"' } + let(:config) { Hash.new } - let(:config) do - { "quote_char" => "'"} - end + it "return an event from CSV data" do + codec.decode(data) do |event| + expect(event.get("column1")).to eq("big") + expect(event.get("column2")).to eq("bird") + expect(event.get("column3")).to eq("sesame, street") + end + end + end - it "return an event from CSV data" do - codec.decode(data) do |event| - expect(event.get("column1")).to eq("big") - expect(event.get("column2")).to eq("bird") - expect(event.get("column3")).to eq("sesame street") + context "using a null" do + let(:data) { 'big,bird,"sesame" street' } + let(:config) do + { "quote_char" => "\x00" } + end + + it "return an event from CSV data" do + codec.decode(data) do |event| + expect(event.get("column1")).to eq("big") + expect(event.get("column2")).to eq("bird") + expect(event.get("column3")).to eq('"sesame" street') + end + end end end - context "using the default one" do - let(:data) { 'big,bird,"sesame, street"' } - let(:config) { Hash.new } + describe "having headers" do + let(:data) do + [ "size,animal,movie", "big,bird,sesame street"] + end + let(:new_data) do + [ "host,country,city", "example.com,germany,berlin"] + end + let(:config) do + { "autodetect_column_names" => true } + end - it "return an event from CSV data" do - codec.decode(data) do |event| - expect(event.get("column1")).to eq("big") - expect(event.get("column2")).to eq("bird") - expect(event.get("column3")).to eq("sesame, street") + it "include header information when requested" do + codec.decode(data[0]) # Read the headers + codec.decode(data[1]) do |event| + expect(event.get("size")).to eq("big") + expect(event.get("animal")).to eq("bird") + expect(event.get("movie")).to eq("sesame street") end end end - context "using a null" do - let(:data) { 'big,bird,"sesame" street' } + describe "using field convertion" do let(:config) do - { "quote_char" => "\x00" } + { "convert" => { "column1" => "integer", "column3" => "boolean" } } end + let(:data) { "1234,bird,false" } - it "return an event from CSV data" do + it "get converted values to the expected type" do codec.decode(data) do |event| - expect(event.get("column1")).to eq("big") + expect(event.get("column1")).to eq(1234) expect(event.get("column2")).to eq("bird") - expect(event.get("column3")).to eq('"sesame" street') + expect(event.get("column3")).to eq(false) end end - end - end - describe "having headers" do + context "when using column names" do + let(:config) do + { "convert" => { "custom1" => "integer", "custom3" => "boolean" }, + "columns" => ["custom1", "custom2", "custom3"] } + end - let(:data) do - [ "size,animal,movie", "big,bird,sesame street"] + it "get converted values to the expected type" do + codec.decode(data) do |event| + expect(event.get("custom1")).to eq(1234) + expect(event.get("custom2")).to eq("bird") + expect(event.get("custom3")).to eq(false) + end + end + end end + end + end # shared_examples - let(:new_data) do - [ "host,country,city", "example.com,germany,berlin"] - end + describe "line input_type" do + let(:base_config) { { "input_type" => "line" } } - let(:config) do - { "autodetect_column_names" => true } - end + include_examples "decoding tests" - it "include header information when requested" do - codec.decode(data[0]) # Read the headers - codec.decode(data[1]) do |event| - expect(event.get("size")).to eq("big") - expect(event.get("animal")).to eq("bird") - expect(event.get("movie")).to eq("sesame street") - end + context "line break in column" do + let(:data) { "\"a\",\"b\",\"c\"\n1,\"text\nwith line break\",2\n2,\"foo\",3\n" } + let(:config) { { "autodetect_column_names" => true } } + + it "is supported and return events" do + events = [] + codec.decode(data) {|event| events << event} + expect(events.size).to eq(2) + expect(events[0].get("a")).to eq("1") + expect(events[0].get("b")).to eq("text\nwith line break") end end + end - describe "using field convertion" do + describe "stream input_type" do + let(:base_config) { { "input_type" => "stream" } } - let(:config) do - { "convert" => { "column1" => "integer", "column3" => "boolean" } } - end - let(:data) { "1234,bird,false" } + include_examples "decoding tests" + + context "incomplete chunks with final line break" do + let(:chunks) { ["big,bi", "rd,sesame street\n"] } - it "get converted values to the expected type" do - codec.decode(data) do |event| - expect(event.get("column1")).to eq(1234) - expect(event.get("column2")).to eq("bird") - expect(event.get("column3")).to eq(false) + it "return an event from CSV data" do + events = [] + chunks.each do |chunk| + codec.decode(chunk) { |event| events << event } end + + expect(events.size).to eq(1) + expect(events[0].get("column1")).to eq("big") + expect(events[0].get("column2")).to eq("bird") + expect(events[0].get("column3")).to eq("sesame street") end + end - context "when using column names" do + context "incomplete chunks without final line break with flush" do + let(:chunks) { ["aaa,b", "bb,ccc\ndd", "d,ee", "e,fff"] } - let(:config) do - { "convert" => { "custom1" => "integer", "custom3" => "boolean" }, - "columns" => ["custom1", "custom2", "custom3"] } + it "return an event from CSV data" do + events = [] + chunks.each do |chunk| + codec.decode(chunk) { |event| events << event } end + expect(events.size).to eq(1) - it "get converted values to the expected type" do - codec.decode(data) do |event| - expect(event.get("custom1")).to eq(1234) - expect(event.get("custom2")).to eq("bird") - expect(event.get("custom3")).to eq(false) - end - end + codec.flush { |event| events << event } + expect(events.size).to eq(2) + + expect(events[0].get("column1")).to eq("aaa") + expect(events[0].get("column2")).to eq("bbb") + expect(events[0].get("column3")).to eq("ccc") + expect(events[1].get("column1")).to eq("ddd") + expect(events[1].get("column2")).to eq("eee") + expect(events[1].get("column3")).to eq("fff") + end + end + + context "line break in column" do + let(:data) { "\"a\",\"b\",\"c\"\n1,\"text\nwith line break\",2\n2,\"foo\",3\n" } + let(:config) { { "autodetect_column_names" => true } } + + it "is unsupported" do + expect{codec.decode(data)}.to raise_error(CSV::MalformedCSVError) end end end describe "encode" do + let(:base_config) { Hash.new } + context "not including headers" do let(:event) { LogStash::Event.new({"f1" => "v1", "f2" => "v2"}) }