Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
## 1.0.1
- Fixed parsing of line-delimited data [#8](https://github.com/logstash-plugins/logstash-codec-csv/pull/8)
## 1.0.0
- Fixed dependencies to work with logstash v6 and up. Overhauled to match features of the CSV Filter. Improved spec coverage [#4](https://github.com/logstash-plugins/logstash-codec-csv/pull/4)
## 0.1.5
Expand Down
9 changes: 9 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ The csv codec takes CSV data, parses it and passes it along.
| <<plugins-{type}s-{plugin}-charset>> |<<string,string>>, one of `["ASCII-8BIT", "UTF-8", "US-ASCII", "Big5", "Big5-HKSCS", "Big5-UAO", "CP949", "Emacs-Mule", "EUC-JP", "EUC-KR", "EUC-TW", "GB2312", "GB18030", "GBK", "ISO-8859-1", "ISO-8859-2", "ISO-8859-3", "ISO-8859-4", "ISO-8859-5", "ISO-8859-6", "ISO-8859-7", "ISO-8859-8", "ISO-8859-9", "ISO-8859-10", "ISO-8859-11", "ISO-8859-13", "ISO-8859-14", "ISO-8859-15", "ISO-8859-16", "KOI8-R", "KOI8-U", "Shift_JIS", "UTF-16BE", "UTF-16LE", "UTF-32BE", "UTF-32LE", "Windows-31J", "Windows-1250", "Windows-1251", "Windows-1252", "IBM437", "IBM737", "IBM775", "CP850", "IBM852", "CP852", "IBM855", "CP855", "IBM857", "IBM860", "IBM861", "IBM862", "IBM863", "IBM864", "IBM865", "IBM866", "IBM869", "Windows-1258", "GB1988", "macCentEuro", "macCroatian", "macCyrillic", "macGreek", "macIceland", "macRoman", "macRomania", "macThai", "macTurkish", "macUkraine", "CP950", "CP951", "IBM037", "stateless-ISO-2022-JP", "eucJP-ms", "CP51932", "EUC-JIS-2004", "GB12345", "ISO-2022-JP", "ISO-2022-JP-2", "CP50220", "CP50221", "Windows-1256", "Windows-1253", "Windows-1255", "Windows-1254", "TIS-620", "Windows-874", "Windows-1257", "MacJapanese", "UTF-7", "UTF8-MAC", "UTF-16", "UTF-32", "UTF8-DoCoMo", "SJIS-DoCoMo", "UTF8-KDDI", "SJIS-KDDI", "ISO-2022-JP-KDDI", "stateless-ISO-2022-JP-KDDI", "UTF8-SoftBank", "SJIS-SoftBank", "BINARY", "CP437", "CP737", "CP775", "IBM850", "CP857", "CP860", "CP861", "CP862", "CP863", "CP864", "CP865", "CP866", "CP869", "CP1258", "Big5-HKSCS:2008", "ebcdic-cp-us", "eucJP", "euc-jp-ms", "EUC-JISX0213", "eucKR", "eucTW", "EUC-CN", "eucCN", "CP936", "ISO2022-JP", "ISO2022-JP2", "ISO8859-1", "ISO8859-2", "ISO8859-3", "ISO8859-4", "ISO8859-5", "ISO8859-6", "CP1256", "ISO8859-7", "CP1253", "ISO8859-8", "CP1255", "ISO8859-9", "CP1254", "ISO8859-10", "ISO8859-11", "CP874", "ISO8859-13", "CP1257", "ISO8859-14", "ISO8859-15", "ISO8859-16", "CP878", "MacJapan", "ASCII", "ANSI_X3.4-1968", "646", "CP65000", "CP65001", 
"UTF-8-MAC", "UTF-8-HFS", "UCS-2BE", "UCS-4BE", "UCS-4LE", "CP932", "csWindows31J", "SJIS", "PCK", "CP1250", "CP1251", "CP1252", "external", "locale"]`|No
| <<plugins-{type}s-{plugin}-columns>> |<<array,array>>|No
| <<plugins-{type}s-{plugin}-convert>> |<<hash,hash>>|No
| <<plugins-{type}s-{plugin}-delimiter>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-include_headers>> |<<boolean,boolean>>|No
| <<plugins-{type}s-{plugin}-quote_char>> |<<string,string>>|No
| <<plugins-{type}s-{plugin}-separator>> |<<string,string>>|No
Expand Down Expand Up @@ -102,6 +103,14 @@ Possible conversions are: `integer`, `float`, `date`, `date_time`, `boolean`
}
}

[id="plugins-{type}s-{plugin}-delimiter"]
===== `delimiter`

* Value type is <<string,string>>
* Default value is `"\n"`

Define the line delimiter. It is only used to split incoming data into lines when `input_type` is set to `stream`.

[id="plugins-{type}s-{plugin}-include_headers"]
===== `include_headers`

Expand Down
82 changes: 58 additions & 24 deletions lib/logstash/codecs/csv.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# encoding: utf-8
require "logstash/codecs/base"
require "logstash/util/charset"
require "logstash/util/buftok"
require "csv"

class LogStash::Codecs::CSV < LogStash::Codecs::Base
Expand Down Expand Up @@ -42,6 +43,9 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
# Defaults to false.
config :autodetect_column_names, :validate => :boolean, :default => false

# Define the line delimiter
config :delimiter, :validate => :string, :default => "\n"

# Define a set of datatype conversions to be applied to columns.
# Possible conversions are integer, float, date, date_time, boolean
#
Expand All @@ -58,6 +62,12 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
# "CP1252".
config :charset, :validate => ::Encoding.name_list, :default => "UTF-8"

# The kind of input that the associated input plugin provides. Inputs such as the file or http
# input plugins, which hand complete chunks of data (lines or documents) to the codec, need the
# `line` input type. Inputs such as stdin or tcp, where a record can be split across several
# data chunks, need the `stream` input type.
config :input_type, :validate => ["line", "stream"], :default => "line"

CONVERTERS = {
:integer => lambda do |value|
CSV::Converters[:integer].call(value)
Expand Down Expand Up @@ -88,6 +98,10 @@ class LogStash::Codecs::CSV < LogStash::Codecs::Base
CONVERTERS.freeze

def register
@streaming = @input_type == "stream"
if @streaming
@buffer = FileWatch::BufferedTokenizer.new(@delimiter)
end
@converter = LogStash::Util::Charset.new(@charset)
@converter.logger = @logger

Expand All @@ -108,31 +122,13 @@ def register
@logger.debug? && @logger.debug("CSV parsing options", :col_sep => @separator, :quote_char => @quote_char)
end

def decode(data)
data = @converter.convert(data)
begin
values = CSV.parse_line(data, :col_sep => @separator, :quote_char => @quote_char)

if (@autodetect_column_names && @columns.empty?)
@columns = values
@logger.debug? && @logger.debug("Auto detected the following columns", :columns => @columns.inspect)
return
def decode(data, &block)
if @streaming
@buffer.extract(data).each do |line|
parse(@converter.convert(line), &block)
end

decoded = {}
values.each_index do |i|
unless (@skip_empty_columns && (values[i].nil? || values[i].empty?))
unless ignore_field?(i)
field_name = @columns[i] || "column#{i + 1}"
decoded[field_name] = transform(field_name, values[i])
end
end
end

yield LogStash::Event.new(decoded)
rescue CSV::MalformedCSVError => e
@logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => data)
yield LogStash::Event.new("message" => data, "tags" => ["_csvparsefailure"])
else
parse(@converter.convert(data), &block)
end
end

Expand All @@ -149,8 +145,46 @@ def encode(event)
@on_event.call(event, csv_data)
end

# Emits whatever data is still held in the stream buffer.
#
# Does nothing unless the codec was registered in streaming mode. Otherwise the
# tokenizer's leftover text (data received after the last delimiter) is
# charset-converted and handed to #parse, which yields the resulting events to
# the given block. An empty leftover produces no events.
def flush(&block)
  return unless @streaming

  leftover = @buffer.flush
  parse(@converter.convert(leftover), &block) unless leftover.empty?
end

private

# Parses +line+ as CSV and yields one LogStash::Event per non-empty row.
#
# When column auto-detection is enabled and no columns are known yet, the first
# non-empty row is stored as the column names and produces no event.
#
# If the text is not valid CSV, the failure is logged and a single fallback
# event is yielded that carries the raw text in "message" and the
# "_csvparsefailure" tag.
#
# @param line [String] CSV text to parse; may contain more than one row
# @yield [LogStash::Event] each decoded row, or the fallback event on failure
def parse(line, &block)
  begin
    # CSV.parse raises on malformed input before any row is handed out, so the
    # rescue has to wrap this call; a rescue inside the per-row block would
    # never see the error.
    rows = CSV.parse(line, :col_sep => @separator, :quote_char => @quote_char)
  rescue CSV::MalformedCSVError => e
    @logger.error("CSV parse failure. Falling back to plain-text", :error => e, :data => line)
    yield LogStash::Event.new("message" => line, "tags" => ["_csvparsefailure"])
    return
  end

  rows.each do |values|
    # Blank lines come back as empty rows; they carry no data.
    next if values.nil? || values.empty?

    if @autodetect_column_names && @columns.empty?
      # First usable row is the header: remember it and emit nothing for it.
      @columns = values
      @logger.debug? && @logger.debug("Auto detected the following columns", :columns => @columns.inspect)
      next
    end

    decoded = {}
    values.each_with_index do |value, i|
      next if @skip_empty_columns && (value.nil? || value.empty?)
      next if ignore_field?(i)

      # Columns without a configured/detected name get a positional one.
      field_name = @columns[i] || "column#{i + 1}"
      decoded[field_name] = transform(field_name, value)
    end

    yield LogStash::Event.new(decoded)
  end
end

def select_values(event)
if @columns.empty?
event.to_hash.values
Expand Down
2 changes: 1 addition & 1 deletion logstash-codec-csv.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-codec-csv'
s.version = '1.0.0'
s.version = '1.0.1'
s.licenses = ['Apache License (2.0)']
s.summary = "The csv codec take CSV data, parses it and passes it away"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
Loading