Skip to content

Commit b7ab8f8

Browse files
authored
ESQL: Add row counts to profile results (#120134)
Closes #119969 - Rename "pages_in/out" to "pages_received/emitted", to standardize the name along most operators - **There are still "pages_processed" operators**, maybe it would make sense to also rename those? - Add "pages_received/emitted" to TopN operator, as it was missing that - Added "rows_received/emitted" to most operators - Added a test to ensure all operators with status provide those metrics
1 parent 1f182e7 commit b7ab8f8

File tree

37 files changed

+846
-191
lines changed

37 files changed

+846
-191
lines changed

benchmarks/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import org.elasticsearch.gradle.internal.test.TestUtil
2+
import org.elasticsearch.gradle.OS
23

34
/*
45
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
@@ -77,7 +78,7 @@ tasks.register("copyPainless", Copy) {
7778
}
7879

7980
tasks.named("run").configure {
80-
executable = "${buildParams.runtimeJavaHome.get()}/bin/java"
81+
executable = "${buildParams.runtimeJavaHome.get()}/bin/java" + (OS.current() == OS.WINDOWS ? '.exe' : '')
8182
args << "-Dplugins.dir=${buildDir}/plugins" << "-Dtests.index=${buildDir}/index"
8283
dependsOn "copyExpression", "copyPainless", configurations.nativeLib
8384
systemProperty 'es.nativelibs.path', TestUtil.getTestLibraryPath(file("../libs/native/libraries/build/platform/").toString())

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ static TransportVersion def(int id) {
156156
public static final TransportVersion REPLACE_FAILURE_STORE_OPTIONS_WITH_SELECTOR_SYNTAX = def(8_821_00_0);
157157
public static final TransportVersion ELASTIC_INFERENCE_SERVICE_UNIFIED_CHAT_COMPLETIONS_INTEGRATION = def(8_822_00_0);
158158
public static final TransportVersion KQL_QUERY_TECH_PREVIEW = def(8_823_00_0);
159+
public static final TransportVersion ESQL_PROFILE_ROWS_PROCESSED = def(8_824_00_0);
159160

160161
/*
161162
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneCountOperator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,6 @@ protected Page getCheckedOutput() throws IOException {
138138
Page page = null;
139139
// emit only one page
140140
if (remainingDocs <= 0 && pagesEmitted == 0) {
141-
pagesEmitted++;
142141
LongBlock count = null;
143142
BooleanBlock seen = null;
144143
try {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneMinMaxOperator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -151,7 +151,6 @@ public void collect(int doc) throws IOException {
151151
Page page = null;
152152
// emit only one page
153153
if (remainingDocs <= 0 && pagesEmitted == 0) {
154-
pagesEmitted++;
155154
Block result = null;
156155
BooleanBlock seen = null;
157156
try {

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneOperator.java

Lines changed: 31 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,10 @@ public abstract class LuceneOperator extends SourceOperator {
6868
long processingNanos;
6969
int pagesEmitted;
7070
boolean doneCollecting;
71+
/**
72+
* Count of rows this operator has emitted.
73+
*/
74+
private long rowsEmitted;
7175

7276
protected LuceneOperator(BlockFactory blockFactory, int maxPageSize, LuceneSliceQueue sliceQueue) {
7377
this.blockFactory = blockFactory;
@@ -115,7 +119,12 @@ public final int limit() {
115119
@Override
116120
public final Page getOutput() {
117121
try {
118-
return getCheckedOutput();
122+
Page page = getCheckedOutput();
123+
if (page != null) {
124+
pagesEmitted++;
125+
rowsEmitted += page.getPositionCount();
126+
}
127+
return page;
119128
} catch (IOException ioe) {
120129
throw new UncheckedIOException(ioe);
121130
}
@@ -252,6 +261,7 @@ public static class Status implements Operator.Status {
252261
private final int sliceMin;
253262
private final int sliceMax;
254263
private final int current;
264+
private final long rowsEmitted;
255265

256266
private Status(LuceneOperator operator) {
257267
processedSlices = operator.processedSlices;
@@ -276,6 +286,7 @@ private Status(LuceneOperator operator) {
276286
current = scorer.position;
277287
}
278288
pagesEmitted = operator.pagesEmitted;
289+
rowsEmitted = operator.rowsEmitted;
279290
}
280291

281292
Status(
@@ -288,7 +299,8 @@ private Status(LuceneOperator operator) {
288299
int pagesEmitted,
289300
int sliceMin,
290301
int sliceMax,
291-
int current
302+
int current,
303+
long rowsEmitted
292304
) {
293305
this.processedSlices = processedSlices;
294306
this.processedQueries = processedQueries;
@@ -300,6 +312,7 @@ private Status(LuceneOperator operator) {
300312
this.sliceMin = sliceMin;
301313
this.sliceMax = sliceMax;
302314
this.current = current;
315+
this.rowsEmitted = rowsEmitted;
303316
}
304317

305318
Status(StreamInput in) throws IOException {
@@ -318,6 +331,11 @@ private Status(LuceneOperator operator) {
318331
sliceMin = in.readVInt();
319332
sliceMax = in.readVInt();
320333
current = in.readVInt();
334+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
335+
rowsEmitted = in.readVLong();
336+
} else {
337+
rowsEmitted = 0;
338+
}
321339
}
322340

323341
@Override
@@ -336,6 +354,9 @@ public void writeTo(StreamOutput out) throws IOException {
336354
out.writeVInt(sliceMin);
337355
out.writeVInt(sliceMax);
338356
out.writeVInt(current);
357+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
358+
out.writeVLong(rowsEmitted);
359+
}
339360
}
340361

341362
@Override
@@ -383,6 +404,10 @@ public int current() {
383404
return current;
384405
}
385406

407+
public long rowsEmitted() {
408+
return rowsEmitted;
409+
}
410+
386411
@Override
387412
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
388413
builder.startObject();
@@ -399,6 +424,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
399424
builder.field("slice_min", sliceMin);
400425
builder.field("slice_max", sliceMax);
401426
builder.field("current", current);
427+
builder.field("rows_emitted", rowsEmitted);
402428
return builder.endObject();
403429
}
404430

@@ -416,12 +442,13 @@ public boolean equals(Object o) {
416442
&& pagesEmitted == status.pagesEmitted
417443
&& sliceMin == status.sliceMin
418444
&& sliceMax == status.sliceMax
419-
&& current == status.current;
445+
&& current == status.current
446+
&& rowsEmitted == status.rowsEmitted;
420447
}
421448

422449
@Override
423450
public int hashCode() {
424-
return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current);
451+
return Objects.hash(processedSlices, sliceIndex, totalSlices, pagesEmitted, sliceMin, sliceMax, current, rowsEmitted);
425452
}
426453

427454
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneSourceOperator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,6 @@ public Page getCheckedOutput() throws IOException {
175175
}
176176
Page page = null;
177177
if (currentPagePos >= minPageSize || remainingDocs <= 0 || scorer.isDone()) {
178-
pagesEmitted++;
179178
IntBlock shard = null;
180179
IntBlock leaf = null;
181180
IntVector docs = null;

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperator.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,6 @@ private Page emit(boolean startEmitting) {
240240
Releasables.closeExpectNoException(shard, segments, docs, docBlock, scores);
241241
}
242242
}
243-
pagesEmitted++;
244243
return page;
245244
}
246245

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperator.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -546,8 +546,8 @@ public String toString() {
546546
}
547547

548548
@Override
549-
protected Status status(long processNanos, int pagesProcessed) {
550-
return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed);
549+
protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
550+
return new Status(new TreeMap<>(readersBuilt), processNanos, pagesProcessed, rowsReceived, rowsEmitted);
551551
}
552552

553553
public static class Status extends AbstractPageMappingOperator.Status {
@@ -559,8 +559,8 @@ public static class Status extends AbstractPageMappingOperator.Status {
559559

560560
private final Map<String, Integer> readersBuilt;
561561

562-
Status(Map<String, Integer> readersBuilt, long processNanos, int pagesProcessed) {
563-
super(processNanos, pagesProcessed);
562+
Status(Map<String, Integer> readersBuilt, long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
563+
super(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
564564
this.readersBuilt = readersBuilt;
565565
}
566566

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/AbstractPageMappingOperator.java

Lines changed: 45 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,14 @@ public abstract class AbstractPageMappingOperator implements Operator {
3737
* Count of pages that have been processed by this operator.
3838
*/
3939
private int pagesProcessed;
40+
/**
41+
* Count of rows this operator has received.
42+
*/
43+
private long rowsReceived;
44+
/**
45+
* Count of rows this operator has emitted.
46+
*/
47+
private long rowsEmitted;
4048

4149
protected abstract Page process(Page page);
4250

@@ -52,6 +60,7 @@ public final boolean needsInput() {
5260
public final void addInput(Page page) {
5361
assert prev == null : "has pending input page";
5462
prev = page;
63+
rowsReceived += page.getPositionCount();
5564
}
5665

5766
@Override
@@ -75,18 +84,21 @@ public final Page getOutput() {
7584
long start = System.nanoTime();
7685
Page p = process(prev);
7786
pagesProcessed++;
87+
if (p != null) {
88+
rowsEmitted += p.getPositionCount();
89+
}
7890
processNanos += System.nanoTime() - start;
7991
prev = null;
8092
return p;
8193
}
8294

8395
@Override
8496
public final Status status() {
85-
return status(processNanos, pagesProcessed);
97+
return status(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
8698
}
8799

88-
protected Status status(long processNanos, int pagesProcessed) {
89-
return new Status(processNanos, pagesProcessed);
100+
protected Status status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
101+
return new Status(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
90102
}
91103

92104
@Override
@@ -105,15 +117,26 @@ public static class Status implements Operator.Status {
105117

106118
private final long processNanos;
107119
private final int pagesProcessed;
120+
private final long rowsReceived;
121+
private final long rowsEmitted;
108122

109-
public Status(long processNanos, int pagesProcessed) {
123+
public Status(long processNanos, int pagesProcessed, long rowsReceived, long rowsEmitted) {
110124
this.processNanos = processNanos;
111125
this.pagesProcessed = pagesProcessed;
126+
this.rowsReceived = rowsReceived;
127+
this.rowsEmitted = rowsEmitted;
112128
}
113129

114130
protected Status(StreamInput in) throws IOException {
115131
processNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0;
116132
pagesProcessed = in.readVInt();
133+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
134+
rowsReceived = in.readVLong();
135+
rowsEmitted = in.readVLong();
136+
} else {
137+
rowsReceived = 0;
138+
rowsEmitted = 0;
139+
}
117140
}
118141

119142
@Override
@@ -122,6 +145,10 @@ public void writeTo(StreamOutput out) throws IOException {
122145
out.writeVLong(processNanos);
123146
}
124147
out.writeVInt(pagesProcessed);
148+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_PROFILE_ROWS_PROCESSED)) {
149+
out.writeVLong(rowsReceived);
150+
out.writeVLong(rowsEmitted);
151+
}
125152
}
126153

127154
@Override
@@ -133,6 +160,14 @@ public int pagesProcessed() {
133160
return pagesProcessed;
134161
}
135162

163+
public long rowsReceived() {
164+
return rowsReceived;
165+
}
166+
167+
public long rowsEmitted() {
168+
return rowsEmitted;
169+
}
170+
136171
public long processNanos() {
137172
return processNanos;
138173
}
@@ -153,20 +188,23 @@ protected final XContentBuilder innerToXContent(XContentBuilder builder) throws
153188
if (builder.humanReadable()) {
154189
builder.field("process_time", TimeValue.timeValueNanos(processNanos));
155190
}
156-
return builder.field("pages_processed", pagesProcessed);
191+
return builder.field("pages_processed", pagesProcessed).field("rows_received", rowsReceived).field("rows_emitted", rowsEmitted);
157192
}
158193

159194
@Override
160195
public boolean equals(Object o) {
161196
if (this == o) return true;
162197
if (o == null || getClass() != o.getClass()) return false;
163198
Status status = (Status) o;
164-
return processNanos == status.processNanos && pagesProcessed == status.pagesProcessed;
199+
return processNanos == status.processNanos
200+
&& pagesProcessed == status.pagesProcessed
201+
&& rowsReceived == status.rowsReceived
202+
&& rowsEmitted == status.rowsEmitted;
165203
}
166204

167205
@Override
168206
public int hashCode() {
169-
return Objects.hash(processNanos, pagesProcessed);
207+
return Objects.hash(processNanos, pagesProcessed, rowsReceived, rowsEmitted);
170208
}
171209

172210
@Override

0 commit comments

Comments
 (0)