Skip to content

Commit b544227

Browse files
authored
ESQL: Add description to status and profile (elastic#121783) (elastic#121823)
This adds a `task_description` field to `profile` output and task `status`. This looks like: ``` ... "profile" : { "drivers" : [ { "task_description" : "final", "start_millis" : 1738768795349, "stop_millis" : 1738768795405, ... "task_description" : "node_reduce", "start_millis" : 1738768795392, "stop_millis" : 1738768795406, ... "task_description" : "data", "start_millis" : 1738768795391, "stop_millis" : 1738768795404, ... ``` Previously you had to look at the signature of the operators in the driver to figure out what the driver is *doing*. You had to know enough about how ESQL works to guess. Now you can look at this description to see what the server *thinks* it is doing. No more manual classification. This will be useful when debugging failures and performance regressions because it is much easier to use `jq` to group on it: ``` | jq '.profile[] | group_by(.task_description)[]' ```
1 parent 45f6c66 commit b544227

File tree

48 files changed

+392
-157
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+392
-157
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ static TransportVersion def(int id) {
176176
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES = def(9_002_0_00);
177177
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED = def(9_003_0_00);
178178
public static final TransportVersion REMOVE_DESIRED_NODE_VERSION = def(9_004_0_00);
179-
179+
public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION = def(9_005_0_00);
180180
/*
181181
* STOP! READ THIS FIRST! No, really,
182182
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ public class Driver implements Releasable, Describable {
5252

5353
private final String sessionId;
5454

55+
/**
56+
* Description of the task this driver is running. This description should be
57+
* short and meaningful as a grouping identifier. We use the phase of the
58+
* query right now: "data", "node_reduce", "final".
59+
*/
60+
private final String taskDescription;
61+
5562
/**
5663
* The wall clock time when this driver was created in milliseconds since epoch.
5764
* Compared to {@link #startNanos} this is less accurate and is measured by a
@@ -96,6 +103,10 @@ public class Driver implements Releasable, Describable {
96103
/**
97104
* Creates a new driver with a chain of operators.
98105
* @param sessionId session Id
106+
* @param taskDescription Description of the task this driver is running. This
107+
* description should be short and meaningful as a grouping
108+
* identifier. We use the phase of the query right now:
109+
* "data", "node_reduce", "final".
99110
* @param driverContext the driver context
100111
* @param source source operator
101112
* @param intermediateOperators the chain of operators to execute
@@ -105,6 +116,7 @@ public class Driver implements Releasable, Describable {
105116
*/
106117
public Driver(
107118
String sessionId,
119+
String taskDescription,
108120
long startTime,
109121
long startNanos,
110122
DriverContext driverContext,
@@ -116,6 +128,7 @@ public Driver(
116128
Releasable releasable
117129
) {
118130
this.sessionId = sessionId;
131+
this.taskDescription = taskDescription;
119132
this.startTime = startTime;
120133
this.startNanos = startNanos;
121134
this.driverContext = driverContext;
@@ -129,6 +142,7 @@ public Driver(
129142
this.status = new AtomicReference<>(
130143
new DriverStatus(
131144
sessionId,
145+
taskDescription,
132146
startTime,
133147
System.currentTimeMillis(),
134148
0,
@@ -150,6 +164,7 @@ public Driver(
150164
* @param releasable a {@link Releasable} to be invoked once the chain of operators has run to completion
151165
*/
152166
public Driver(
167+
String taskDescription,
153168
DriverContext driverContext,
154169
SourceOperator source,
155170
List<Operator> intermediateOperators,
@@ -158,6 +173,7 @@ public Driver(
158173
) {
159174
this(
160175
"unset",
176+
taskDescription,
161177
System.currentTimeMillis(),
162178
System.nanoTime(),
163179
driverContext,
@@ -485,6 +501,7 @@ public DriverProfile profile() {
485501
throw new IllegalStateException("can only get profile from finished driver");
486502
}
487503
return new DriverProfile(
504+
status.taskDescription(),
488505
status.started(),
489506
status.lastUpdated(),
490507
finishNanos - startNanos,
@@ -531,6 +548,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.
531548

532549
return new DriverStatus(
533550
sessionId,
551+
taskDescription,
534552
startTime,
535553
now,
536554
prev.cpuNanos() + extraCpuNanos,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,13 @@
2727
* Profile results from a single {@link Driver}.
2828
*/
2929
public class DriverProfile implements Writeable, ChunkedToXContentObject {
30+
/**
31+
* Description of the task this driver is running. This description should be
32+
* short and meaningful as a grouping identifier. We use the phase of the
33+
* query right now: "data", "node_reduce", "final".
34+
*/
35+
private final String taskDescription;
36+
3037
/**
3138
* Millis since epoch when the driver started.
3239
*/
@@ -62,6 +69,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject {
6269
private final DriverSleeps sleeps;
6370

6471
public DriverProfile(
72+
String taskDescription,
6573
long startMillis,
6674
long stopMillis,
6775
long tookNanos,
@@ -70,6 +78,7 @@ public DriverProfile(
7078
List<DriverStatus.OperatorStatus> operators,
7179
DriverSleeps sleeps
7280
) {
81+
this.taskDescription = taskDescription;
7382
this.startMillis = startMillis;
7483
this.stopMillis = stopMillis;
7584
this.tookNanos = tookNanos;
@@ -80,6 +89,7 @@ public DriverProfile(
8089
}
8190

8291
public DriverProfile(StreamInput in) throws IOException {
92+
this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION) ? in.readString() : "";
8393
if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
8494
this.startMillis = in.readVLong();
8595
this.stopMillis = in.readVLong();
@@ -102,6 +112,9 @@ public DriverProfile(StreamInput in) throws IOException {
102112

103113
@Override
104114
public void writeTo(StreamOutput out) throws IOException {
115+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION)) {
116+
out.writeString(taskDescription);
117+
}
105118
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
106119
out.writeVLong(startMillis);
107120
out.writeVLong(stopMillis);
@@ -115,6 +128,13 @@ public void writeTo(StreamOutput out) throws IOException {
115128
sleeps.writeTo(out);
116129
}
117130

131+
/**
132+
* Description of the task this driver is running.
133+
*/
134+
public String taskDescription() {
135+
return taskDescription;
136+
}
137+
118138
/**
119139
* Millis since epoch when the driver started.
120140
*/
@@ -169,6 +189,7 @@ public DriverSleeps sleeps() {
169189
@Override
170190
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
171191
return Iterators.concat(ChunkedToXContentHelper.startObject(), Iterators.single((b, p) -> {
192+
b.field("task_description", taskDescription);
172193
b.timestampFieldsFromUnixEpochMillis("start_millis", "start", startMillis);
173194
b.timestampFieldsFromUnixEpochMillis("stop_millis", "stop", stopMillis);
174195
b.field("took_nanos", tookNanos);
@@ -197,7 +218,8 @@ public boolean equals(Object o) {
197218
return false;
198219
}
199220
DriverProfile that = (DriverProfile) o;
200-
return startMillis == that.startMillis
221+
return taskDescription.equals(that.taskDescription)
222+
&& startMillis == that.startMillis
201223
&& stopMillis == that.stopMillis
202224
&& tookNanos == that.tookNanos
203225
&& cpuNanos == that.cpuNanos
@@ -208,7 +230,7 @@ public boolean equals(Object o) {
208230

209231
@Override
210232
public int hashCode() {
211-
return Objects.hash(startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps);
233+
return Objects.hash(taskDescription, startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps);
212234
}
213235

214236
@Override

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ public class DriverStatus implements Task.Status {
4242
*/
4343
private final String sessionId;
4444

45+
/**
46+
* Description of the task this driver is running.
47+
*/
48+
private final String taskDescription;
49+
4550
/**
4651
* Milliseconds since epoch when this driver started.
4752
*/
@@ -83,6 +88,7 @@ public class DriverStatus implements Task.Status {
8388

8489
DriverStatus(
8590
String sessionId,
91+
String taskDescription,
8692
long started,
8793
long lastUpdated,
8894
long cpuTime,
@@ -93,6 +99,7 @@ public class DriverStatus implements Task.Status {
9399
DriverSleeps sleeps
94100
) {
95101
this.sessionId = sessionId;
102+
this.taskDescription = taskDescription;
96103
this.started = started;
97104
this.lastUpdated = lastUpdated;
98105
this.cpuNanos = cpuTime;
@@ -105,6 +112,7 @@ public class DriverStatus implements Task.Status {
105112

106113
public DriverStatus(StreamInput in) throws IOException {
107114
this.sessionId = in.readString();
115+
this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION) ? in.readString() : "";
108116
this.started = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readLong() : 0;
109117
this.lastUpdated = in.readLong();
110118
this.cpuNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0;
@@ -122,6 +130,9 @@ public DriverStatus(StreamInput in) throws IOException {
122130
@Override
123131
public void writeTo(StreamOutput out) throws IOException {
124132
out.writeString(sessionId);
133+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION)) {
134+
out.writeString(taskDescription);
135+
}
125136
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
126137
out.writeLong(started);
127138
}
@@ -150,6 +161,15 @@ public String sessionId() {
150161
return sessionId;
151162
}
152163

164+
/**
165+
* Description of the task this driver is running. This description should be
166+
* short and meaningful as a grouping identifier. We use the phase of the
167+
* query right now: "data", "node_reduce", "final".
168+
*/
169+
public String taskDescription() {
170+
return taskDescription;
171+
}
172+
153173
/**
154174
* When this {@link Driver} was started.
155175
*/
@@ -211,7 +231,8 @@ public List<OperatorStatus> activeOperators() {
211231
@Override
212232
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
213233
builder.startObject();
214-
builder.field("sessionId", sessionId);
234+
builder.field("session_id", sessionId);
235+
builder.field("task_description", taskDescription);
215236
builder.field("started", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(started));
216237
builder.field("last_updated", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(lastUpdated));
217238
builder.field("cpu_nanos", cpuNanos);
@@ -240,6 +261,7 @@ public boolean equals(Object o) {
240261
if (o == null || getClass() != o.getClass()) return false;
241262
DriverStatus that = (DriverStatus) o;
242263
return sessionId.equals(that.sessionId)
264+
&& taskDescription.equals(that.taskDescription)
243265
&& started == that.started
244266
&& lastUpdated == that.lastUpdated
245267
&& cpuNanos == that.cpuNanos
@@ -252,7 +274,18 @@ public boolean equals(Object o) {
252274

253275
@Override
254276
public int hashCode() {
255-
return Objects.hash(sessionId, started, lastUpdated, cpuNanos, iterations, status, completedOperators, activeOperators, sleeps);
277+
return Objects.hash(
278+
sessionId,
279+
taskDescription,
280+
started,
281+
lastUpdated,
282+
cpuNanos,
283+
iterations,
284+
status,
285+
completedOperators,
286+
activeOperators,
287+
sleeps
288+
);
256289
}
257290

258291
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ public void testQueryOperator() throws IOException {
123123
}
124124
});
125125
DriverContext driverContext = driverContext();
126-
drivers.add(new Driver(driverContext, factory.get(driverContext), List.of(), docCollector, () -> {}));
126+
drivers.add(new Driver("test", driverContext, factory.get(driverContext), List.of(), docCollector, () -> {}));
127127
}
128128
OperatorTestCase.runDriver(drivers);
129129
Set<Integer> expectedDocIds = searchForDocIds(reader, query);
@@ -215,6 +215,7 @@ public String toString() {
215215
)
216216
);
217217
Driver driver = new Driver(
218+
"test",
218219
driverContext,
219220
luceneOperatorFactory(reader, new MatchAllDocsQuery(), LuceneOperator.NO_LIMIT).get(driverContext),
220221
operators,
@@ -248,6 +249,7 @@ public void testLimitOperator() {
248249
DriverContext driverContext = driverContext();
249250
try (
250251
var driver = new Driver(
252+
"test",
251253
driverContext,
252254
new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
253255
List.of((new LimitOperator.Factory(limit)).get(driverContext)),
@@ -335,6 +337,7 @@ public void testHashLookup() {
335337
var actualPrimeOrds = new ArrayList<>();
336338
try (
337339
var driver = new Driver(
340+
"test",
338341
driverContext,
339342
new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100),
340343
List.of(

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ public final void testIgnoresNulls() {
111111

112112
try (
113113
Driver d = new Driver(
114+
"test",
114115
driverContext,
115116
new NullInsertingSourceOperator(new CannedSourceOperator(input.iterator()), blockFactory),
116117
List.of(simple().get(driverContext)),

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public void testRejectsDouble() {
6565
BlockFactory blockFactory = driverContext.blockFactory();
6666
try (
6767
Driver d = new Driver(
68+
"test",
6869
driverContext,
6970
new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))),
7071
List.of(simple().get(driverContext)),

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ public void testRejectsDouble() {
6666
BlockFactory blockFactory = driverContext.blockFactory();
6767
try (
6868
Driver d = new Driver(
69+
"test",
6970
driverContext,
7071
new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))),
7172
List.of(simple().get(driverContext)),

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public void testOverflowSucceeds() {
5353
List<Page> results = new ArrayList<>();
5454
try (
5555
Driver d = new Driver(
56+
"test",
5657
driverContext,
5758
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(Double.MAX_VALUE - 1, 2)),
5859
List.of(simple().get(driverContext)),
@@ -71,6 +72,7 @@ public void testSummationAccuracy() {
7172
List<Page> results = new ArrayList<>();
7273
try (
7374
Driver d = new Driver(
75+
"test",
7476
driverContext,
7577
new SequenceDoubleBlockSourceOperator(
7678
driverContext.blockFactory(),
@@ -100,6 +102,7 @@ public void testSummationAccuracy() {
100102
driverContext = driverContext();
101103
try (
102104
Driver d = new Driver(
105+
"test",
103106
driverContext,
104107
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(values)),
105108
List.of(simple().get(driverContext)),
@@ -122,6 +125,7 @@ public void testSummationAccuracy() {
122125
driverContext = driverContext();
123126
try (
124127
Driver d = new Driver(
128+
"test",
125129
driverContext,
126130
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)),
127131
List.of(simple().get(driverContext)),
@@ -141,6 +145,7 @@ public void testSummationAccuracy() {
141145
driverContext = driverContext();
142146
try (
143147
Driver d = new Driver(
148+
"test",
144149
driverContext,
145150
new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)),
146151
List.of(simple().get(driverContext)),

0 commit comments

Comments
 (0)