Skip to content

Commit f723d6c

Browse files
committed
Rename driver description (#123848)
(cherry picked from commit 2fcb23a)
1 parent a943fdc commit f723d6c

File tree

10 files changed

+69
-55
lines changed

10 files changed

+69
-55
lines changed

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -53,11 +53,10 @@ public class Driver implements Releasable, Describable {
5353
private final String sessionId;
5454

5555
/**
56-
* Description of the task this driver is running. This description should be
57-
* short and meaningful as a grouping identifier. We use the phase of the
58-
* query right now: "data", "node_reduce", "final".
56+
* Description of the driver. This description should be short and meaningful as a grouping identifier.
57+
* We use the phase of the query right now: "data", "node_reduce", "final".
5958
*/
60-
private final String taskDescription;
59+
private final String shortDescription;
6160

6261
/**
6362
* The wall clock time when this driver was created in milliseconds since epoch.
@@ -103,10 +102,8 @@ public class Driver implements Releasable, Describable {
103102
/**
104103
* Creates a new driver with a chain of operators.
105104
* @param sessionId session Id
106-
* @param taskDescription Description of the task this driver is running. This
107-
* description should be short and meaningful as a grouping
108-
* identifier. We use the phase of the query right now:
109-
* "data", "node_reduce", "final".
105+
* @param shortDescription Description of the driver. This description should be short and meaningful as a grouping identifier.
106+
* We use the phase of the query right now: "data", "node_reduce", "final".
110107
* @param driverContext the driver context
111108
* @param source source operator
112109
* @param intermediateOperators the chain of operators to execute
@@ -116,7 +113,7 @@ public class Driver implements Releasable, Describable {
116113
*/
117114
public Driver(
118115
String sessionId,
119-
String taskDescription,
116+
String shortDescription,
120117
long startTime,
121118
long startNanos,
122119
DriverContext driverContext,
@@ -128,7 +125,7 @@ public Driver(
128125
Releasable releasable
129126
) {
130127
this.sessionId = sessionId;
131-
this.taskDescription = taskDescription;
128+
this.shortDescription = shortDescription;
132129
this.startTime = startTime;
133130
this.startNanos = startNanos;
134131
this.driverContext = driverContext;
@@ -142,7 +139,7 @@ public Driver(
142139
this.status = new AtomicReference<>(
143140
new DriverStatus(
144141
sessionId,
145-
taskDescription,
142+
shortDescription,
146143
startTime,
147144
System.currentTimeMillis(),
148145
0,
@@ -481,7 +478,7 @@ public DriverProfile profile() {
481478
throw new IllegalStateException("can only get profile from finished driver");
482479
}
483480
return new DriverProfile(
484-
status.taskDescription(),
481+
status.description(),
485482
status.started(),
486483
status.lastUpdated(),
487484
finishNanos - startNanos,
@@ -528,7 +525,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.
528525

529526
return new DriverStatus(
530527
sessionId,
531-
taskDescription,
528+
shortDescription,
532529
startTime,
533530
now,
534531
prev.cpuNanos() + extraCpuNanos,

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public void testToXContent() {
4545
);
4646
assertThat(Strings.toString(status, true, true), equalTo("""
4747
{
48-
"task_description" : "test",
48+
"description" : "test",
4949
"start" : "1973-11-29T09:27:00.000Z",
5050
"start_millis" : 123413220000,
5151
"stop" : "1973-11-29T09:27:23.214Z",
@@ -116,7 +116,7 @@ protected DriverProfile createTestInstance() {
116116

117117
@Override
118118
protected DriverProfile mutateInstance(DriverProfile instance) throws IOException {
119-
String taskDescription = instance.taskDescription();
119+
String shortDescription = instance.description();
120120
long startMillis = instance.startMillis();
121121
long stopMillis = instance.stopMillis();
122122
long tookNanos = instance.tookNanos();
@@ -125,7 +125,7 @@ protected DriverProfile mutateInstance(DriverProfile instance) throws IOExceptio
125125
var operators = instance.operators();
126126
var sleeps = instance.sleeps();
127127
switch (between(0, 7)) {
128-
case 0 -> taskDescription = randomValueOtherThan(taskDescription, DriverStatusTests::randomTaskDescription);
128+
case 0 -> shortDescription = randomValueOtherThan(shortDescription, DriverStatusTests::randomIdentifier);
129129
case 1 -> startMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong);
130130
case 2 -> stopMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong);
131131
case 3 -> tookNanos = randomValueOtherThan(tookNanos, ESTestCase::randomNonNegativeLong);
@@ -135,7 +135,16 @@ protected DriverProfile mutateInstance(DriverProfile instance) throws IOExceptio
135135
case 7 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps);
136136
default -> throw new UnsupportedOperationException();
137137
}
138-
return new DriverProfile(taskDescription, startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps);
138+
return new DriverProfile(
139+
shortDescription,
140+
startMillis,
141+
stopMillis,
142+
tookNanos,
143+
cpuNanos,
144+
iterations,
145+
operators,
146+
sleeps
147+
);
139148
}
140149

141150
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public void testToXContent() {
5252
assertThat(Strings.toString(status, true, true), equalTo("""
5353
{
5454
"session_id" : "ABC:123",
55-
"task_description" : "test",
55+
"description" : "test",
5656
"started" : "1973-11-29T09:27:00.000Z",
5757
"last_updated" : "1973-11-29T09:27:23.214Z",
5858
"cpu_nanos" : 123213,
@@ -157,7 +157,7 @@ private static DriverStatus.OperatorStatus randomOperatorStatus() {
157157
@Override
158158
protected DriverStatus mutateInstance(DriverStatus instance) throws IOException {
159159
var sessionId = instance.sessionId();
160-
var taskDescription = instance.taskDescription();
160+
var description = instance.description();
161161
long started = instance.started();
162162
long lastUpdated = instance.lastUpdated();
163163
long cpuNanos = instance.cpuNanos();
@@ -167,8 +167,8 @@ protected DriverStatus mutateInstance(DriverStatus instance) throws IOException
167167
var activeOperators = instance.activeOperators();
168168
var sleeps = instance.sleeps();
169169
switch (between(0, 9)) {
170-
case 0 -> sessionId = randomValueOtherThan(sessionId, this::randomSessionId);
171-
case 1 -> taskDescription = randomValueOtherThan(taskDescription, DriverStatusTests::randomTaskDescription);
170+
case 0 -> sessionId = randomValueOtherThan(sessionId, ESTestCase::randomIdentifier);
171+
case 1 -> description = randomValueOtherThan(description, ESTestCase::randomIdentifier);
172172
case 2 -> started = randomValueOtherThan(started, ESTestCase::randomNonNegativeLong);
173173
case 3 -> lastUpdated = randomValueOtherThan(lastUpdated, ESTestCase::randomNonNegativeLong);
174174
case 4 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong);
@@ -181,7 +181,7 @@ protected DriverStatus mutateInstance(DriverStatus instance) throws IOException
181181
}
182182
return new DriverStatus(
183183
sessionId,
184-
taskDescription,
184+
description,
185185
started,
186186
lastUpdated,
187187
cpuNanos,

x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -300,8 +300,8 @@ public void testProfile() throws IOException {
300300
for (Map<String, Object> o : operators) {
301301
sig.add(checkOperatorProfile(o));
302302
}
303-
String taskDescription = p.get("task_description").toString();
304-
switch (taskDescription) {
303+
String description = p.get("description").toString();
304+
switch (description) {
305305
case "data" -> assertMap(
306306
sig,
307307
matchesList().item("LuceneSourceOperator")
@@ -325,7 +325,7 @@ public void testProfile() throws IOException {
325325
.item("ProjectOperator")
326326
.item("OutputOperator")
327327
);
328-
default -> throw new IllegalArgumentException("can't match " + taskDescription);
328+
default -> throw new IllegalArgumentException("can't match " + description);
329329
}
330330
}
331331
}
@@ -400,7 +400,7 @@ public void testInlineStatsProfile() throws IOException {
400400
}
401401
signatures.add(sig);
402402
}
403-
// TODO adapt this to use task_description once this is reenabled
403+
// TODO adapt this to use description once this is re-enabled
404404
assertThat(
405405
signatures,
406406
containsInAnyOrder(
@@ -501,8 +501,8 @@ public void testForceSleepsProfile() throws IOException {
501501
MapMatcher sleepMatcher = matchesMap().entry("reason", "exchange empty")
502502
.entry("sleep_millis", greaterThan(0L))
503503
.entry("wake_millis", greaterThan(0L));
504-
String taskDescription = p.get("task_description").toString();
505-
switch (taskDescription) {
504+
String description = p.get("description").toString();
505+
switch (description) {
506506
case "data" -> assertMap(sleeps, matchesMap().entry("counts", Map.of()).entry("first", List.of()).entry("last", List.of()));
507507
case "node_reduce" -> {
508508
assertMap(sleeps, matchesMap().entry("counts", matchesMap().entry("exchange empty", greaterThan(0))).extraOk());
@@ -525,13 +525,14 @@ public void testForceSleepsProfile() throws IOException {
525525
.entry("last", List.of(sleepMatcher))
526526
);
527527
}
528-
default -> throw new IllegalArgumentException("unknown task: " + taskDescription);
528+
default -> throw new IllegalArgumentException("unknown task: " + description);
529529
}
530530
}
531531
}
532532

533533
private MapMatcher commonProfile() {
534-
return matchesMap().entry("task_description", any(String.class))
534+
return matchesMap() //
535+
.entry("description", any(String.class))
535536
.entry("start_millis", greaterThan(0L))
536537
.entry("stop_millis", greaterThan(0L))
537538
.entry("iterations", greaterThan(0L))

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/CrossClusterAsyncEnrichStopIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ public void testEnrichAfterStop() throws Exception {
9797
assertBusy(() -> {
9898
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
9999
List<TaskInfo> reduceTasks = tasks.stream()
100-
.filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce"))
100+
.filter(t -> t.status() instanceof DriverStatus ds && ds.description().equals("remote_reduce"))
101101
.toList();
102102
assertThat(reduceTasks, not(empty()));
103103
});
@@ -109,7 +109,7 @@ public void testEnrichAfterStop() throws Exception {
109109
assertBusy(() -> {
110110
List<TaskInfo> tasks = getDriverTasks(client(REMOTE_CLUSTER_2));
111111
List<TaskInfo> reduceTasks = tasks.stream()
112-
.filter(t -> t.status() instanceof DriverStatus ds && ds.taskDescription().equals("remote_reduce"))
112+
.filter(t -> t.status() instanceof DriverStatus ds && ds.description().equals("remote_reduce"))
113113
.toList();
114114
assertThat(reduceTasks, empty());
115115
});

x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -103,11 +103,11 @@ public void testTaskContents() throws Exception {
103103
for (TaskInfo task : foundTasks) {
104104
DriverStatus status = (DriverStatus) task.status();
105105
assertThat(status.sessionId(), not(emptyOrNullString()));
106-
String taskDescription = status.taskDescription();
107-
for (DriverStatus.OperatorStatus o : status.activeOperators()) {
106+
String description = status.description();
107+
for (OperatorStatus o : status.activeOperators()) {
108108
logger.info("status {}", o);
109-
if (o.operator().startsWith("LuceneSourceOperator[maxPageSize = " + pageSize())) {
110-
assertThat(taskDescription, equalTo("data"));
109+
if (o.operator().startsWith("LuceneSourceOperator[")) {
110+
assertThat(description, equalTo("data"));
111111
LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status();
112112
assertThat(oStatus.processedSlices(), lessThanOrEqualTo(oStatus.totalSlices()));
113113
assertThat(oStatus.processedQueries(), equalTo(Set.of("*:*")));
@@ -127,7 +127,7 @@ public void testTaskContents() throws Exception {
127127
continue;
128128
}
129129
if (o.operator().equals("ValuesSourceReaderOperator[fields = [pause_me]]")) {
130-
assertThat(taskDescription, equalTo("data"));
130+
assertThat(description, equalTo("data"));
131131
ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status();
132132
assertMap(
133133
oStatus.readersBuilt(),
@@ -138,15 +138,15 @@ public void testTaskContents() throws Exception {
138138
continue;
139139
}
140140
if (o.operator().equals("ExchangeSourceOperator")) {
141-
assertThat(taskDescription, either(equalTo("node_reduce")).or(equalTo("final")));
141+
assertThat(description, either(equalTo("node_reduce")).or(equalTo("final")));
142142
ExchangeSourceOperator.Status oStatus = (ExchangeSourceOperator.Status) o.status();
143143
assertThat(oStatus.pagesWaiting(), greaterThanOrEqualTo(0));
144144
assertThat(oStatus.pagesEmitted(), greaterThanOrEqualTo(0));
145145
exchangeSources++;
146146
continue;
147147
}
148148
if (o.operator().equals("ExchangeSinkOperator")) {
149-
assertThat(taskDescription, either(equalTo("data")).or(equalTo("node_reduce")));
149+
assertThat(description, either(equalTo("data")).or(equalTo("node_reduce")));
150150
ExchangeSinkOperator.Status oStatus = (ExchangeSinkOperator.Status) o.status();
151151
assertThat(oStatus.pagesReceived(), greaterThanOrEqualTo(0));
152152
exchangeSinks++;
@@ -192,7 +192,7 @@ public void testCancelRead() throws Exception {
192192
ActionFuture<EsqlQueryResponse> response = startEsql();
193193
try {
194194
List<TaskInfo> infos = getTasksStarting();
195-
TaskInfo running = infos.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("data")).findFirst().get();
195+
TaskInfo running = infos.stream().filter(t -> ((DriverStatus) t.status()).description().equals("data")).findFirst().get();
196196
cancelTask(running.taskId());
197197
assertCancelled(response);
198198
} finally {
@@ -204,7 +204,7 @@ public void testCancelMerge() throws Exception {
204204
ActionFuture<EsqlQueryResponse> response = startEsql();
205205
try {
206206
List<TaskInfo> infos = getTasksStarting();
207-
TaskInfo running = infos.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("final")).findFirst().get();
207+
TaskInfo running = infos.stream().filter(t -> ((DriverStatus) t.status()).description().equals("final")).findFirst().get();
208208
cancelTask(running.taskId());
209209
assertCancelled(response);
210210
} finally {
@@ -288,8 +288,8 @@ private List<TaskInfo> getTasksStarting() throws Exception {
288288
for (TaskInfo task : tasks) {
289289
assertThat(task.action(), equalTo(DriverTaskRunner.ACTION_NAME));
290290
DriverStatus status = (DriverStatus) task.status();
291-
logger.info("task {} {} {}", status.taskDescription(), task.description(), status);
292-
assertThat(status.taskDescription(), anyOf(equalTo("data"), equalTo("node_reduce"), equalTo("final")));
291+
logger.info("task {} {} {}", status.description(), task.description(), status);
292+
assertThat(status.description(), anyOf(equalTo("data"), equalTo("node_reduce"), equalTo("final")));
293293
/*
294294
* Accept tasks that are either starting or have gone
295295
* immediately async. The coordinating task is likely
@@ -313,8 +313,8 @@ private List<TaskInfo> getTasksRunning() throws Exception {
313313
for (TaskInfo task : tasks) {
314314
assertThat(task.action(), equalTo(DriverTaskRunner.ACTION_NAME));
315315
DriverStatus status = (DriverStatus) task.status();
316-
assertThat(status.taskDescription(), anyOf(equalTo("data"), equalTo("node_reduce"), equalTo("final")));
317-
if (status.taskDescription().equals("data")) {
316+
assertThat(status.description(), anyOf(equalTo("data"), equalTo("node_reduce"), equalTo("final")));
317+
if (status.description().equals("data")) {
318318
assertThat(status.status(), equalTo(DriverStatus.Status.RUNNING));
319319
} else {
320320
assertThat(status.status(), equalTo(DriverStatus.Status.ASYNC));
@@ -348,15 +348,15 @@ private List<TaskInfo> getDriverTasks() throws Exception {
348348
}
349349

350350
private List<TaskInfo> dataTasks(List<TaskInfo> tasks) {
351-
return tasks.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("data")).toList();
351+
return tasks.stream().filter(t -> ((DriverStatus) t.status()).description().equals("data")).toList();
352352
}
353353

354354
private List<TaskInfo> nodeReduceTasks(List<TaskInfo> tasks) {
355-
return tasks.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("node_reduce")).toList();
355+
return tasks.stream().filter(t -> ((DriverStatus) t.status()).description().equals("node_reduce")).toList();
356356
}
357357

358358
private List<TaskInfo> coordinatorTasks(List<TaskInfo> tasks) {
359-
return tasks.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("final")).toList();
359+
return tasks.stream().filter(t -> ((DriverStatus) t.status()).description().equals("final")).toList();
360360
}
361361

362362
private void assertCancelled(ActionFuture<EsqlQueryResponse> response) throws Exception {

0 commit comments

Comments
 (0)