forked from fluent/fluentd
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparser_json.rb
More file actions
105 lines (89 loc) · 3.05 KB
/
parser_json.rb
File metadata and controls
105 lines (89 loc) · 3.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#
# Fluentd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
require 'fluent/plugin/parser'
require 'fluent/time'
require 'fluent/oj_options'
require 'yajl'
require 'json'
module Fluent
module Plugin
class JSONParser < Parser
Plugin.register_parser('json', self)
config_set_default :time_key, 'time'
desc 'Set JSON parser'
config_param :json_parser, :enum, list: [:oj, :yajl, :json], default: :oj
# The Yajl library defines a default buffer size of 8KiB when parsing
# from IO streams, so maintain this for backwards-compatibility.
# https://www.rubydoc.info/github/brianmario/yajl-ruby/Yajl%2FParser:parse
desc 'Set the buffer size that Yajl will use when parsing streaming input'
config_param :stream_buffer_size, :integer, default: 8192
config_set_default :time_type, :float
def configure(conf)
if conf.has_key?('time_format')
conf['time_type'] ||= 'string'
end
super
@load_proc, @error_class = configure_json_parser(@json_parser)
end
def configure_json_parser(name)
case name
when :oj
return [Oj.method(:load), Oj::ParseError] if Fluent::OjOptions.available?
log&.info "Oj is not installed, and failing back to JSON for json parser"
configure_json_parser(:json)
when :json then [JSON.method(:parse), JSON::ParserError]
when :yajl then [Yajl.method(:load), Yajl::ParseError]
else
raise "BUG: unknown json parser specified: #{name}"
end
end
def parse(text)
parsed_json = @load_proc.call(text)
if parsed_json.is_a?(Hash)
time, record = parse_one_record(parsed_json)
yield time, record
elsif parsed_json.is_a?(Array)
parsed_json.each do |record|
unless record.is_a?(Hash)
yield nil, nil
next
end
time, parsed_record = parse_one_record(record)
yield time, parsed_record
end
else
yield nil, nil
end
rescue @error_class, EncodingError # EncodingError is for oj 3.x or later
yield nil, nil
end
def parse_one_record(record)
time = parse_time(record)
convert_values(time, record)
end
def parser_type
:text
end
def parse_io(io)
y = Yajl::Parser.new
y.on_parse_complete = ->(record){
yield(parse_time(record), record)
}
y.parse(io, @stream_buffer_size)
end
end
end
end