Skip to content

Commit e44b2fb

Browse files
authored
Merge branch 'main' into add-data-stream-lifecycle-retention-to-templates
2 parents 1b545ea + 1e67573 commit e44b2fb

File tree

245 files changed

+9951
-2692
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

245 files changed

+9951
-2692
lines changed

.coderabbit.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json
22
reviews:
3+
# Do not post "Review skipped" on PRs without the labels below
4+
review_status: false
35
auto_review:
46
labels:
57
- "Team:Delivery"
Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.benchmark.compute.operator;
11+
12+
import org.apache.lucene.util.BytesRef;
13+
import org.elasticsearch.benchmark.Utils;
14+
import org.elasticsearch.common.breaker.NoopCircuitBreaker;
15+
import org.elasticsearch.common.util.BigArrays;
16+
import org.elasticsearch.compute.data.Block;
17+
import org.elasticsearch.compute.data.BlockFactory;
18+
import org.elasticsearch.compute.data.Page;
19+
import org.elasticsearch.xpack.esql.core.expression.Attribute;
20+
import org.elasticsearch.xpack.esql.core.expression.FieldAttribute;
21+
import org.elasticsearch.xpack.esql.core.tree.Source;
22+
import org.elasticsearch.xpack.esql.core.type.DataType;
23+
import org.elasticsearch.xpack.esql.core.type.EsField;
24+
import org.elasticsearch.xpack.esql.datasources.CloseableIterator;
25+
import org.elasticsearch.xpack.esql.datasources.ParallelParsingCoordinator;
26+
import org.elasticsearch.xpack.esql.datasources.spi.SegmentableFormatReader;
27+
import org.elasticsearch.xpack.esql.datasources.spi.SourceMetadata;
28+
import org.elasticsearch.xpack.esql.datasources.spi.StorageObject;
29+
import org.elasticsearch.xpack.esql.datasources.spi.StoragePath;
30+
import org.openjdk.jmh.annotations.Benchmark;
31+
import org.openjdk.jmh.annotations.BenchmarkMode;
32+
import org.openjdk.jmh.annotations.Fork;
33+
import org.openjdk.jmh.annotations.Level;
34+
import org.openjdk.jmh.annotations.Measurement;
35+
import org.openjdk.jmh.annotations.Mode;
36+
import org.openjdk.jmh.annotations.OutputTimeUnit;
37+
import org.openjdk.jmh.annotations.Param;
38+
import org.openjdk.jmh.annotations.Scope;
39+
import org.openjdk.jmh.annotations.Setup;
40+
import org.openjdk.jmh.annotations.State;
41+
import org.openjdk.jmh.annotations.TearDown;
42+
import org.openjdk.jmh.annotations.Warmup;
43+
import org.openjdk.jmh.infra.Blackhole;
44+
45+
import java.io.ByteArrayInputStream;
46+
import java.io.IOException;
47+
import java.io.InputStream;
48+
import java.nio.ByteBuffer;
49+
import java.nio.charset.StandardCharsets;
50+
import java.time.Instant;
51+
import java.util.List;
52+
import java.util.Map;
53+
import java.util.concurrent.ExecutorService;
54+
import java.util.concurrent.Executors;
55+
import java.util.concurrent.TimeUnit;
56+
57+
/**
58+
* Benchmarks parallel parsing throughput with varying parallelism levels.
59+
* Measures the coordinator overhead and scaling characteristics.
60+
*/
61+
@Warmup(iterations = 3)
62+
@Measurement(iterations = 5)
63+
@BenchmarkMode(Mode.AverageTime)
64+
@OutputTimeUnit(TimeUnit.MILLISECONDS)
65+
@State(Scope.Thread)
66+
@Fork(1)
67+
public class ParallelParsingBenchmark {
68+
69+
static {
70+
Utils.configureBenchmarkLogging();
71+
}
72+
73+
private static final BlockFactory BLOCK_FACTORY = BlockFactory.builder(BigArrays.NON_RECYCLING_INSTANCE)
74+
.breaker(new NoopCircuitBreaker("bench"))
75+
.build();
76+
77+
private static final List<Attribute> SCHEMA = List.of(
78+
new FieldAttribute(Source.EMPTY, "line", new EsField("line", DataType.KEYWORD, Map.of(), false, EsField.TimeSeriesFieldType.NONE))
79+
);
80+
81+
@Param({ "1", "2", "4" })
82+
public int parallelism;
83+
84+
@Param({ "50000", "200000" })
85+
public int lineCount;
86+
87+
private byte[] fileData;
88+
private ExecutorService executor;
89+
90+
@Setup(Level.Trial)
91+
public void setup() {
92+
StringBuilder sb = new StringBuilder();
93+
for (int i = 0; i < lineCount; i++) {
94+
sb.append("line-").append(String.format("%08d", i)).append(",value-").append(i % 1000).append("\n");
95+
}
96+
fileData = sb.toString().getBytes(StandardCharsets.UTF_8);
97+
executor = Executors.newFixedThreadPool(Math.max(parallelism, 1));
98+
}
99+
100+
@TearDown(Level.Trial)
101+
public void tearDown() {
102+
executor.shutdown();
103+
try {
104+
executor.awaitTermination(10, TimeUnit.SECONDS);
105+
} catch (InterruptedException e) {
106+
Thread.currentThread().interrupt();
107+
}
108+
}
109+
110+
@Benchmark
111+
public void parallelParse(Blackhole bh) throws Exception {
112+
StorageObject obj = new InMemoryStorageObject(fileData);
113+
BenchLineReader reader = new BenchLineReader();
114+
115+
try (
116+
CloseableIterator<Page> iter = ParallelParsingCoordinator.parallelRead(
117+
reader,
118+
obj,
119+
List.of("line"),
120+
1000,
121+
parallelism,
122+
executor,
123+
SCHEMA
124+
)
125+
) {
126+
while (iter.hasNext()) {
127+
Page page = iter.next();
128+
bh.consume(page.getPositionCount());
129+
page.releaseBlocks();
130+
}
131+
}
132+
}
133+
134+
private static class BenchLineReader implements SegmentableFormatReader {
135+
136+
@Override
137+
public long findNextRecordBoundary(InputStream stream) throws IOException {
138+
long consumed = 0;
139+
byte[] buf = new byte[8192];
140+
int bytesRead;
141+
while ((bytesRead = stream.read(buf, 0, buf.length)) > 0) {
142+
for (int i = 0; i < bytesRead; i++) {
143+
consumed++;
144+
if (buf[i] == '\n') {
145+
return consumed;
146+
}
147+
}
148+
}
149+
return -1;
150+
}
151+
152+
@Override
153+
public long minimumSegmentSize() {
154+
return 1;
155+
}
156+
157+
@Override
158+
public SourceMetadata metadata(StorageObject object) {
159+
return null;
160+
}
161+
162+
@Override
163+
public CloseableIterator<Page> read(StorageObject object, List<String> projectedColumns, int batchSize) throws IOException {
164+
return readSplit(object, projectedColumns, batchSize, false, true, SCHEMA);
165+
}
166+
167+
@Override
168+
public CloseableIterator<Page> readSplit(
169+
StorageObject object,
170+
List<String> projectedColumns,
171+
int batchSize,
172+
boolean skipFirstLine,
173+
boolean lastSplit,
174+
List<Attribute> resolvedAttributes
175+
) throws IOException {
176+
final byte[] data;
177+
try (InputStream stream = object.newStream()) {
178+
data = stream.readAllBytes();
179+
}
180+
181+
return new CloseableIterator<>() {
182+
private int pos = 0;
183+
private Page nextPage = null;
184+
185+
{
186+
// Skip first line if needed
187+
if (skipFirstLine) {
188+
while (pos < data.length && data[pos] != '\n') {
189+
pos++;
190+
}
191+
if (pos < data.length) {
192+
pos++; // skip the \n itself
193+
}
194+
}
195+
}
196+
197+
@Override
198+
public boolean hasNext() {
199+
if (nextPage != null) {
200+
return true;
201+
}
202+
nextPage = readBatch();
203+
return nextPage != null;
204+
}
205+
206+
@Override
207+
public Page next() {
208+
if (hasNext() == false) {
209+
throw new java.util.NoSuchElementException();
210+
}
211+
Page p = nextPage;
212+
nextPage = null;
213+
return p;
214+
}
215+
216+
private Page readBatch() {
217+
if (pos >= data.length) {
218+
return null;
219+
}
220+
int count = 0;
221+
// Find up to batchSize lines
222+
int[] lineStarts = new int[batchSize];
223+
int[] lineLengths = new int[batchSize];
224+
225+
while (count < batchSize && pos < data.length) {
226+
int lineStart = pos;
227+
// Find end of line
228+
while (pos < data.length && data[pos] != '\n') {
229+
pos++;
230+
}
231+
int lineLen = pos - lineStart;
232+
if (pos < data.length) {
233+
pos++; // skip \n
234+
}
235+
if (lineLen == 0) {
236+
continue; // skip empty lines
237+
}
238+
lineStarts[count] = lineStart;
239+
lineLengths[count] = lineLen;
240+
count++;
241+
}
242+
243+
if (count == 0) {
244+
return null;
245+
}
246+
247+
// Build BytesRef views directly into the buffer - zero copy
248+
try (var builder = BLOCK_FACTORY.newBytesRefBlockBuilder(count)) {
249+
BytesRef ref = new BytesRef();
250+
ref.bytes = data;
251+
for (int i = 0; i < count; i++) {
252+
ref.offset = lineStarts[i];
253+
ref.length = lineLengths[i];
254+
builder.appendBytesRef(ref);
255+
}
256+
Block block = builder.build();
257+
return new Page(count, block);
258+
}
259+
}
260+
261+
@Override
262+
public void close() {
263+
// data buffer will be GC'd
264+
}
265+
};
266+
}
267+
268+
@Override
269+
public String formatName() {
270+
return "bench-line";
271+
}
272+
273+
@Override
274+
public List<String> fileExtensions() {
275+
return List.of(".txt");
276+
}
277+
278+
@Override
279+
public void close() {}
280+
}
281+
282+
private static class InMemoryStorageObject implements StorageObject {
283+
private final byte[] data;
284+
285+
InMemoryStorageObject(byte[] data) {
286+
this.data = data;
287+
}
288+
289+
@Override
290+
public InputStream newStream() {
291+
return new ByteArrayInputStream(data);
292+
}
293+
294+
@Override
295+
public InputStream newStream(long position, long length) {
296+
return new ByteArrayInputStream(data, (int) position, (int) length);
297+
}
298+
299+
@Override
300+
public int readBytes(long position, ByteBuffer target) {
301+
int pos = (int) position;
302+
if (pos >= data.length) {
303+
return -1;
304+
}
305+
int len = Math.min(target.remaining(), data.length - pos);
306+
target.put(data, pos, len);
307+
return len;
308+
}
309+
310+
@Override
311+
public long length() {
312+
return data.length;
313+
}
314+
315+
@Override
316+
public Instant lastModified() {
317+
return Instant.EPOCH;
318+
}
319+
320+
@Override
321+
public boolean exists() {
322+
return true;
323+
}
324+
325+
@Override
326+
public StoragePath path() {
327+
return StoragePath.of("mem://bench");
328+
}
329+
}
330+
}

docs/changelog/140217.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
pr: 140217
2+
summary: "ESQL: Support intra-row field references in ROW command"
3+
area: ES|QL
4+
type: enhancement
5+
issues:
6+
- 140119

docs/changelog/141050.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
area: "ES|QL"
2+
issues: []
3+
pr: 141050
4+
summary: Add Views Security Model
5+
type: enhancement

docs/changelog/143460.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
area: ES|QL
2+
issues: [141920, 141925]
3+
pr: 143460
4+
summary: Prevent pushdown of unmapped fields in filters and sorts
5+
type: "bug"

docs/changelog/143567.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
area: Inference
2+
issues: []
3+
pr: 143567
4+
summary: "[Inference API] Update authorized endpoints when their fingerprint or version\
5+
\ changed"
6+
type: enhancement

docs/changelog/143673.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 143673
2+
summary: Add mode and codec fields to Stack Monitoring index template
3+
area: Monitoring
4+
type: enhancement
5+
issues: []

docs/changelog/143696.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
area: ES|QL
2+
issues: []
3+
pr: 143696
4+
summary: Enable distributed pipeline breakers for external sources via `FragmentExec`
5+
type: enhancement

docs/changelog/143700.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
area: ES|QL
2+
issues: []
3+
pr: 143700
4+
summary: Buffer reuse in `ParquetStorageObjectAdapter` and `StorageObject`
5+
type: enhancement

docs/changelog/143703.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
area: ES|QL
2+
issues: []
3+
pr: 143703
4+
summary: Add positional `readBytes` API to `StorageObject` SPI
5+
type: enhancement

0 commit comments

Comments
 (0)