Skip to content

Commit ed40220

Browse files
authored
Implement BufferedTokenizer to return an iterable that can verify the size limit for every token emitted (#17229)
Reimplements `BufferedTokenizer` using pure Java classes instead of JRuby runtime classes; `BufferedTokenizerExt` is now a thin shell around this new class. The principal method `extract`, which dices the data by separator, now returns an `Iterable` that applies the size-limit check, instead of returning a `RubyArray`. The returned iterable is backed by a `DataSplitter` iterator, which accumulates data in a `StringBuilder` and dices it by separator. This iterator moves the size-limit check to the read side (previously it happened on the write side); however, it implements logic to avoid accumulating further segments once the size limit has already been exceeded during writing. To remain compliant with usage patterns that expect an `empty?` method on the object returned from `extract`, like [this](https://github.com/logstash-plugins/logstash-input-file/blob/55a4a7099f05f29351672417036c1342850c7adc/lib/filewatch/watched_file.rb#L250), the `extract` method of `BufferedTokenizerExt` returns a custom `Iterable` adapter class providing such a method.
1 parent 443e527 commit ed40220

File tree

11 files changed

+628
-403
lines changed

11 files changed

+628
-403
lines changed
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch B.V. licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.logstash.benchmark;
21+
22+
import org.logstash.common.BufferedTokenizer;
23+
import org.openjdk.jmh.annotations.Benchmark;
24+
import org.openjdk.jmh.annotations.BenchmarkMode;
25+
import org.openjdk.jmh.annotations.Fork;
26+
import org.openjdk.jmh.annotations.Level;
27+
import org.openjdk.jmh.annotations.Measurement;
28+
import org.openjdk.jmh.annotations.Mode;
29+
import org.openjdk.jmh.annotations.OutputTimeUnit;
30+
import org.openjdk.jmh.annotations.Scope;
31+
import org.openjdk.jmh.annotations.Setup;
32+
import org.openjdk.jmh.annotations.State;
33+
import org.openjdk.jmh.annotations.Warmup;
34+
import org.openjdk.jmh.infra.Blackhole;
35+
36+
import java.util.concurrent.TimeUnit;
37+
38+
39+
@Warmup(iterations = 3, time = 100, timeUnit = TimeUnit.MILLISECONDS)
40+
@Measurement(iterations = 10, time = 3000, timeUnit = TimeUnit.MILLISECONDS)
41+
@Fork(1)
42+
@BenchmarkMode(Mode.Throughput)
43+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
44+
@State(Scope.Thread)
45+
public class BufferedTokenizerBenchmark {
46+
47+
private BufferedTokenizer sut;
48+
private String singleTokenPerFragment;
49+
private String multipleTokensPerFragment;
50+
private String multipleTokensSpreadMultipleFragments_1;
51+
private String multipleTokensSpreadMultipleFragments_2;
52+
private String multipleTokensSpreadMultipleFragments_3;
53+
54+
@Setup(Level.Invocation)
55+
public void setUp() {
56+
sut = new BufferedTokenizer();
57+
singleTokenPerFragment = "a".repeat(512) + "\n";
58+
59+
multipleTokensPerFragment = "a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(512) + "\n";
60+
61+
multipleTokensSpreadMultipleFragments_1 = "a".repeat(512) + "\n" + "b".repeat(512) + "\n" + "c".repeat(256);
62+
multipleTokensSpreadMultipleFragments_2 = "c".repeat(256) + "\n" + "d".repeat(512) + "\n" + "e".repeat(256);
63+
multipleTokensSpreadMultipleFragments_3 = "f".repeat(256) + "\n" + "g".repeat(512) + "\n" + "h".repeat(512) + "\n";
64+
}
65+
66+
@Benchmark
67+
public final void onlyOneTokenPerFragment(Blackhole blackhole) {
68+
Iterable<String> tokens = sut.extract(singleTokenPerFragment);
69+
tokens.forEach(blackhole::consume);
70+
blackhole.consume(tokens);
71+
}
72+
73+
@Benchmark
74+
public final void multipleTokenPerFragment(Blackhole blackhole) {
75+
Iterable<String> tokens = sut.extract(multipleTokensPerFragment);
76+
tokens.forEach(blackhole::consume);
77+
blackhole.consume(tokens);
78+
}
79+
80+
@Benchmark
81+
public final void multipleTokensCrossingMultipleFragments(Blackhole blackhole) {
82+
Iterable<String> tokens = sut.extract(multipleTokensSpreadMultipleFragments_1);
83+
tokens.forEach(t -> {});
84+
blackhole.consume(tokens);
85+
86+
tokens = sut.extract(multipleTokensSpreadMultipleFragments_2);
87+
tokens.forEach(t -> {});
88+
blackhole.consume(tokens);
89+
90+
tokens = sut.extract(multipleTokensSpreadMultipleFragments_3);
91+
tokens.forEach(blackhole::consume);
92+
blackhole.consume(tokens);
93+
}
94+
}

logstash-core/benchmarks/src/main/java/org/logstash/benchmark/BufferedTokenizerExtBenchmark.java

Lines changed: 0 additions & 83 deletions
This file was deleted.

logstash-core/spec/logstash/util/buftok_spec.rb

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,27 +20,37 @@
2020
describe FileWatch::BufferedTokenizer do
2121
subject { FileWatch::BufferedTokenizer.new }
2222

23+
24+
# A matcher that ensures the result of BufferedTokenizer#extract "quacks like" an expected ruby Array in two respects:
25+
# - #empty? -> boolean: true indicates that the _next_ Enumerable#each will emit zero items.
26+
# - #entries -> Array: the ordered entries
27+
def emit_exactly(expected_array)
28+
# note: order matters; Iterator#each and the methods that delegate to it consume the iterator
29+
have_attributes(:empty? => expected_array.empty?,
30+
:entries => expected_array.entries) # consumes iterator, must be done last
31+
end
32+
2333
it "should tokenize a single token" do
24-
expect(subject.extract("foo\n")).to eq(["foo"])
34+
expect(subject.extract("foo\n")).to emit_exactly(["foo"])
2535
end
2636

2737
it "should merge multiple token" do
28-
expect(subject.extract("foo")).to eq([])
29-
expect(subject.extract("bar\n")).to eq(["foobar"])
38+
expect(subject.extract("foo")).to emit_exactly([])
39+
expect(subject.extract("bar\n")).to emit_exactly(["foobar"])
3040
end
3141

3242
it "should tokenize multiple token" do
33-
expect(subject.extract("foo\nbar\n")).to eq(["foo", "bar"])
43+
expect(subject.extract("foo\nbar\n")).to emit_exactly(["foo", "bar"])
3444
end
3545

3646
it "should ignore empty payload" do
37-
expect(subject.extract("")).to eq([])
38-
expect(subject.extract("foo\nbar")).to eq(["foo"])
47+
expect(subject.extract("")).to emit_exactly([])
48+
expect(subject.extract("foo\nbar")).to emit_exactly(["foo"])
3949
end
4050

4151
it "should tokenize empty payload with newline" do
42-
expect(subject.extract("\n")).to eq([""])
43-
expect(subject.extract("\n\n\n")).to eq(["", "", ""])
52+
expect(subject.extract("\n")).to emit_exactly([""])
53+
expect(subject.extract("\n\n\n")).to emit_exactly(["", "", ""])
4454
end
4555

4656
describe 'flush' do
@@ -83,12 +93,12 @@
8393
let(:delimiter) { "||" }
8494

8595
it "should tokenize multiple token" do
86-
expect(subject.extract("foo||b|r||")).to eq(["foo", "b|r"])
96+
expect(subject.extract("foo||b|r||")).to emit_exactly(["foo", "b|r"])
8797
end
8898

8999
it "should ignore empty payload" do
90-
expect(subject.extract("")).to eq([])
91-
expect(subject.extract("foo||bar")).to eq(["foo"])
100+
expect(subject.extract("")).to emit_exactly([])
101+
expect(subject.extract("foo||bar")).to emit_exactly(["foo"])
92102
end
93103
end
94104
end

logstash-core/src/main/java/org/logstash/RubyUtil.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.logstash.ackedqueue.ext.JRubyWrappedAckedQueueExt;
3535
import org.logstash.common.AbstractDeadLetterQueueWriterExt;
3636
import org.logstash.common.BufferedTokenizerExt;
37+
import org.logstash.common.BufferedTokenizer;
3738
import org.logstash.config.ir.compiler.AbstractFilterDelegatorExt;
3839
import org.logstash.config.ir.compiler.AbstractOutputDelegatorExt;
3940
import org.logstash.config.ir.compiler.FilterDelegatorExt;

0 commit comments

Comments
 (0)