Skip to content

Commit e6ef491

Browse files
committed
parser_json: Add stream_buffer_size config param
Allow configuration of the size of the buffer that Yajl uses when parsing streaming input. The advantage of this is that when using `out_exec_filter`, and parsing as JSON, it's now possible to configure this plugin to avoid having to wait for 8092 bytes of data to be parsed before events are emitted. Configuration in the `out_exec_filter` tests has been modified to use this parameter, as it shaves 60 seconds off the test run time. Signed-off-by: Ben Wheatley <contact@benwh.com>
1 parent fe59adc commit e6ef491

File tree

4 files changed

+38
-1
lines changed

4 files changed

+38
-1
lines changed

lib/fluent/plugin/out_exec_filter.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ class ExecFilterOutput < Output
9595
COMPAT_PARSE_PARAMS = {
9696
'out_format' => '@type',
9797
'out_keys' => 'keys',
98+
'out_stream_buffer_size' => 'stream_buffer_size',
9899
}
99100
COMPAT_EXTRACT_PARAMS = {
100101
'out_tag_key' => 'tag_key',

lib/fluent/plugin/parser_json.rb

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ class JSONParser < Parser
3030
desc 'Set JSON parser'
3131
config_param :json_parser, :enum, list: [:oj, :yajl, :json], default: :oj
3232

33+
# The Yajl library defines a default buffer size of 8092 when parsing
34+
# from IO streams, so maintain this for backwards-compatibility.
35+
# https://www.rubydoc.info/github/brianmario/yajl-ruby/Yajl%2FParser:parse
36+
config_param :stream_buffer_size, :integer, default: 8092
37+
3338
config_set_default :time_type, :float
3439

3540
def configure(conf)
@@ -81,7 +86,7 @@ def parse_io(io, &block)
8186
y.on_parse_complete = ->(record){
8287
block.call(parse_time(record), record)
8388
}
84-
y.parse(io)
89+
y.parse(io, @stream_buffer_size)
8590
end
8691
end
8792
end

test/plugin/test_out_exec_filter.rb

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ def create_driver(conf)
328328
</format>
329329
<parse>
330330
@type json
331+
stream_buffer_size 1
331332
</parse>
332333
<extract>
333334
tag_key tag
@@ -338,6 +339,7 @@ def create_driver(conf)
338339
command cat
339340
in_keys message
340341
out_format json
342+
out_stream_buffer_size 1
341343
time_key time
342344
tag_key tag
343345
]
@@ -372,6 +374,7 @@ def create_driver(conf)
372374
</format>
373375
<parse>
374376
@type json
377+
stream_buffer_size 1
375378
</parse>
376379
<extract>
377380
tag_key tag
@@ -382,6 +385,7 @@ def create_driver(conf)
382385
command cat
383386
in_keys message
384387
out_format json
388+
out_stream_buffer_size 1
385389
time_key time
386390
tag_key tag
387391
]
@@ -414,6 +418,7 @@ def create_driver(conf)
414418
</format>
415419
<parse>
416420
@type json
421+
stream_buffer_size 1
417422
</parse>
418423
<extract>
419424
tag_key tag
@@ -426,6 +431,7 @@ def create_driver(conf)
426431
command cat
427432
in_keys message
428433
out_format json
434+
out_stream_buffer_size 1
429435
time_key time
430436
time_format %d/%b/%Y %H:%M:%S.%N %z
431437
tag_key tag

test/plugin/test_parser_json.rb

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,4 +111,29 @@ def test_parse_with_keep_time_key_without_time_format(data)
111111
assert_equal text, record['time']
112112
end
113113
end
114+
115+
data('yajl' => 'yajl')
116+
def test_yajl_parse_io_with_buffer_smaller_than_input(data)
117+
parser = Fluent::Test::Driver::Parser.new(Fluent::Plugin::JSONParser)
118+
parser.configure(
119+
'keep_time_key' => 'true',
120+
'json_parser' => data,
121+
'stream_buffer_size' => 1,
122+
)
123+
text = "100"
124+
125+
waiting(5) do
126+
rd, wr = IO.pipe
127+
wr.write "{\"time\":\"#{text}\"}"
128+
129+
parser.instance.parse_io(rd) do |time, record|
130+
assert_equal text.to_i, time.sec
131+
assert_equal text, record['time']
132+
133+
# Once a record has been received the 'write' end of the pipe must be
134+
# closed, otherwise the test will block waiting for more input.
135+
wr.close
136+
end
137+
end
138+
end
114139
end

0 commit comments

Comments
 (0)