Skip to content

Commit 72678e8

Browse files
authored
Add cluster and node name to driver profile output (#123470)
1 parent 4269c73 commit 72678e8

File tree

15 files changed

+157
-84
lines changed

15 files changed

+157
-84
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ static TransportVersion def(int id) {
204204
public static final TransportVersion VOYAGE_AI_INTEGRATION_ADDED = def(9_014_0_00);
205205
public static final TransportVersion BYTE_SIZE_VALUE_ALWAYS_USES_BYTES = def(9_015_0_00);
206206
public static final TransportVersion ESQL_SERIALIZE_SOURCE_FUNCTIONS_WARNINGS = def(9_016_0_00);
207+
public static final TransportVersion ESQL_DRIVER_NODE_DESCRIPTION = def(9_017_0_00);
207208

208209
/*
209210
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,8 @@ public class Driver implements Releasable, Describable {
117117
public Driver(
118118
String sessionId,
119119
String taskDescription,
120+
String clusterName,
121+
String nodeName,
120122
long startTime,
121123
long startNanos,
122124
DriverContext driverContext,
@@ -143,6 +145,8 @@ public Driver(
143145
new DriverStatus(
144146
sessionId,
145147
taskDescription,
148+
clusterName,
149+
nodeName,
146150
startTime,
147151
System.currentTimeMillis(),
148152
0,
@@ -471,6 +475,8 @@ public DriverProfile profile() {
471475
}
472476
return new DriverProfile(
473477
status.taskDescription(),
478+
status.clusterName(),
479+
status.nodeName(),
474480
status.started(),
475481
status.lastUpdated(),
476482
finishNanos - startNanos,
@@ -518,6 +524,8 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus.
518524
return new DriverStatus(
519525
sessionId,
520526
taskDescription,
527+
prev.clusterName(),
528+
prev.nodeName(),
521529
startTime,
522530
now,
523531
prev.cpuNanos() + extraCpuNanos,

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727
*
2828
* @param taskDescription Description of the task this driver is running. This description should be short and meaningful
2929
* as a grouping identifier. We use the phase of the query right now: "data", "node_reduce", "final".
30+
* @param clusterName The name of the cluster this driver is running on.
31+
* @param nodeName The name of the node this driver is running on.
3032
* @param startMillis Millis since epoch when the driver started.
3133
* @param stopMillis Millis since epoch when the driver stopped.
3234
* @param tookNanos Nanos between creation and completion of the {@link Driver}.
@@ -36,6 +38,8 @@
3638
*/
3739
public record DriverProfile(
3840
String taskDescription,
41+
String clusterName,
42+
String nodeName,
3943
long startMillis,
4044
long stopMillis,
4145
long tookNanos,
@@ -49,6 +53,8 @@ public static DriverProfile readFrom(StreamInput in) throws IOException {
4953
return new DriverProfile(
5054
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION)
5155
|| in.getTransportVersion().isPatchFrom(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90) ? in.readString() : "",
56+
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_NODE_DESCRIPTION) ? in.readString() : "",
57+
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_NODE_DESCRIPTION) ? in.readString() : "",
5258
in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readVLong() : 0,
5359
in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0) ? in.readVLong() : 0,
5460
in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0,
@@ -65,6 +71,10 @@ public void writeTo(StreamOutput out) throws IOException {
6571
|| out.getTransportVersion().isPatchFrom(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90)) {
6672
out.writeString(taskDescription);
6773
}
74+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_NODE_DESCRIPTION)) {
75+
out.writeString(clusterName);
76+
out.writeString(nodeName);
77+
}
6878
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) {
6979
out.writeVLong(startMillis);
7080
out.writeVLong(stopMillis);
@@ -82,6 +92,8 @@ public void writeTo(StreamOutput out) throws IOException {
8292
public Iterator<? extends ToXContent> toXContentChunked(ToXContent.Params params) {
8393
return Iterators.concat(ChunkedToXContentHelper.startObject(), Iterators.single((b, p) -> {
8494
b.field("task_description", taskDescription);
95+
b.field("cluster_name", clusterName);
96+
b.field("node_name", nodeName);
8597
b.timestampFieldsFromUnixEpochMillis("start_millis", "start", startMillis);
8698
b.timestampFieldsFromUnixEpochMillis("stop_millis", "stop", stopMillis);
8799
b.field("took_nanos", tookNanos);

x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
*
2929
* @param sessionId The session for this driver.
3030
* @param taskDescription Description of the task this driver is running.
31+
* @param clusterName The name of the cluster this driver is running on.
32+
* @param nodeName The name of the node this driver is running on.
3133
* @param started When this {@link Driver} was started.
3234
* @param lastUpdated When this status was generated.
3335
* @param cpuNanos Nanos this {@link Driver} has been running on the cpu. Does not include async or waiting time.
@@ -39,6 +41,8 @@
3941
public record DriverStatus(
4042
String sessionId,
4143
String taskDescription,
44+
String clusterName,
45+
String nodeName,
4246
long started,
4347
long lastUpdated,
4448
long cpuNanos,
@@ -60,6 +64,8 @@ public static DriverStatus readFrom(StreamInput in) throws IOException {
6064
in.readString(),
6165
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION)
6266
|| in.getTransportVersion().isPatchFrom(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90) ? in.readString() : "",
67+
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_NODE_DESCRIPTION) ? in.readString() : "",
68+
in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_NODE_DESCRIPTION) ? in.readString() : "",
6369
in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readLong() : 0,
6470
in.readLong(),
6571
in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0,
@@ -80,6 +86,10 @@ public void writeTo(StreamOutput out) throws IOException {
8086
|| out.getTransportVersion().isPatchFrom(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION_90)) {
8187
out.writeString(taskDescription);
8288
}
89+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_NODE_DESCRIPTION)) {
90+
out.writeString(clusterName);
91+
out.writeString(nodeName);
92+
}
8393
if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) {
8494
out.writeLong(started);
8595
}
@@ -106,6 +116,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
106116
builder.startObject();
107117
builder.field("session_id", sessionId);
108118
builder.field("task_description", taskDescription);
119+
builder.field("cluster_name", clusterName);
120+
builder.field("node_name", nodeName);
109121
builder.field("started", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(started));
110122
builder.field("last_updated", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(lastUpdated));
111123
builder.field("cpu_nanos", cpuNanos);

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java

Lines changed: 32 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public class DriverProfileTests extends AbstractWireSerializingTestCase<DriverPr
2828
public void testToXContent() {
2929
DriverProfile status = new DriverProfile(
3030
"test",
31+
"elasticsearch",
32+
"node-1",
3133
123413220000L,
3234
123413243214L,
3335
10012,
@@ -46,6 +48,8 @@ public void testToXContent() {
4648
assertThat(Strings.toString(status, true, true), equalTo("""
4749
{
4850
"task_description" : "test",
51+
"cluster_name" : "elasticsearch",
52+
"node_name" : "node-1",
4953
"start" : "1973-11-29T09:27:00.000Z",
5054
"start_millis" : 123413220000,
5155
"stop" : "1973-11-29T09:27:23.214Z",
@@ -103,7 +107,9 @@ protected Writeable.Reader<DriverProfile> instanceReader() {
103107
@Override
104108
protected DriverProfile createTestInstance() {
105109
return new DriverProfile(
106-
DriverStatusTests.randomTaskDescription(),
110+
randomIdentifier(),
111+
randomIdentifier(),
112+
randomIdentifier(),
107113
randomNonNegativeLong(),
108114
randomNonNegativeLong(),
109115
randomNonNegativeLong(),
@@ -117,25 +123,40 @@ protected DriverProfile createTestInstance() {
117123
@Override
118124
protected DriverProfile mutateInstance(DriverProfile instance) throws IOException {
119125
String taskDescription = instance.taskDescription();
126+
String clusterName = instance.clusterName();
127+
String nodeName = instance.nodeName();
120128
long startMillis = instance.startMillis();
121129
long stopMillis = instance.stopMillis();
122130
long tookNanos = instance.tookNanos();
123131
long cpuNanos = instance.cpuNanos();
124132
long iterations = instance.iterations();
125133
var operators = instance.operators();
126134
var sleeps = instance.sleeps();
127-
switch (between(0, 7)) {
128-
case 0 -> taskDescription = randomValueOtherThan(taskDescription, DriverStatusTests::randomTaskDescription);
129-
case 1 -> startMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong);
130-
case 2 -> stopMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong);
131-
case 3 -> tookNanos = randomValueOtherThan(tookNanos, ESTestCase::randomNonNegativeLong);
132-
case 4 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong);
133-
case 5 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong);
134-
case 6 -> operators = randomValueOtherThan(operators, DriverStatusTests::randomOperatorStatuses);
135-
case 7 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps);
135+
switch (between(0, 9)) {
136+
case 0 -> taskDescription = randomValueOtherThan(taskDescription, DriverStatusTests::randomIdentifier);
137+
case 1 -> clusterName = randomValueOtherThan(clusterName, DriverStatusTests::randomIdentifier);
138+
case 2 -> nodeName = randomValueOtherThan(nodeName, DriverStatusTests::randomIdentifier);
139+
case 3 -> startMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong);
140+
case 4 -> stopMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong);
141+
case 5 -> tookNanos = randomValueOtherThan(tookNanos, ESTestCase::randomNonNegativeLong);
142+
case 6 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong);
143+
case 7 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong);
144+
case 8 -> operators = randomValueOtherThan(operators, DriverStatusTests::randomOperatorStatuses);
145+
case 9 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps);
136146
default -> throw new UnsupportedOperationException();
137147
}
138-
return new DriverProfile(taskDescription, startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps);
148+
return new DriverProfile(
149+
taskDescription,
150+
clusterName,
151+
nodeName,
152+
startMillis,
153+
stopMillis,
154+
tookNanos,
155+
cpuNanos,
156+
iterations,
157+
operators,
158+
sleeps
159+
);
139160
}
140161

141162
@Override

x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77

88
package org.elasticsearch.compute.operator;
99

10-
import com.carrotsearch.randomizedtesting.generators.RandomStrings;
11-
1210
import org.elasticsearch.common.Strings;
1311
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1412
import org.elasticsearch.common.io.stream.Writeable;
@@ -33,6 +31,8 @@ public void testToXContent() {
3331
DriverStatus status = new DriverStatus(
3432
"ABC:123",
3533
"test",
34+
"elasticsearch",
35+
"node-1",
3636
123413220000L,
3737
123413243214L,
3838
123213L,
@@ -53,6 +53,8 @@ public void testToXContent() {
5353
{
5454
"session_id" : "ABC:123",
5555
"task_description" : "test",
56+
"cluster_name" : "elasticsearch",
57+
"node_name" : "node-1",
5658
"started" : "1973-11-29T09:27:00.000Z",
5759
"last_updated" : "1973-11-29T09:27:23.214Z",
5860
"cpu_nanos" : 123213,
@@ -115,8 +117,10 @@ protected Writeable.Reader<DriverStatus> instanceReader() {
115117
@Override
116118
protected DriverStatus createTestInstance() {
117119
return new DriverStatus(
118-
randomSessionId(),
119-
randomTaskDescription(),
120+
randomIdentifier(),
121+
randomIdentifier(),
122+
randomIdentifier(),
123+
randomIdentifier(),
120124
randomNonNegativeLong(),
121125
randomNonNegativeLong(),
122126
randomNonNegativeLong(),
@@ -128,14 +132,6 @@ protected DriverStatus createTestInstance() {
128132
);
129133
}
130134

131-
private String randomSessionId() {
132-
return RandomStrings.randomAsciiLettersOfLengthBetween(random(), 1, 15);
133-
}
134-
135-
public static String randomTaskDescription() {
136-
return RandomStrings.randomAsciiLettersOfLength(random(), 5);
137-
}
138-
139135
private DriverStatus.Status randomStatus() {
140136
return randomFrom(DriverStatus.Status.values());
141137
}
@@ -158,6 +154,8 @@ private static OperatorStatus randomOperatorStatus() {
158154
protected DriverStatus mutateInstance(DriverStatus instance) throws IOException {
159155
var sessionId = instance.sessionId();
160156
var taskDescription = instance.taskDescription();
157+
var clusterName = instance.clusterName();
158+
var nodeName = instance.nodeName();
161159
long started = instance.started();
162160
long lastUpdated = instance.lastUpdated();
163161
long cpuNanos = instance.cpuNanos();
@@ -166,22 +164,26 @@ protected DriverStatus mutateInstance(DriverStatus instance) throws IOException
166164
var completedOperators = instance.completedOperators();
167165
var activeOperators = instance.activeOperators();
168166
var sleeps = instance.sleeps();
169-
switch (between(0, 9)) {
170-
case 0 -> sessionId = randomValueOtherThan(sessionId, this::randomSessionId);
171-
case 1 -> taskDescription = randomValueOtherThan(taskDescription, DriverStatusTests::randomTaskDescription);
172-
case 2 -> started = randomValueOtherThan(started, ESTestCase::randomNonNegativeLong);
173-
case 3 -> lastUpdated = randomValueOtherThan(lastUpdated, ESTestCase::randomNonNegativeLong);
174-
case 4 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong);
175-
case 5 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong);
176-
case 6 -> status = randomValueOtherThan(status, this::randomStatus);
177-
case 7 -> completedOperators = randomValueOtherThan(completedOperators, DriverStatusTests::randomOperatorStatuses);
178-
case 8 -> activeOperators = randomValueOtherThan(activeOperators, DriverStatusTests::randomOperatorStatuses);
179-
case 9 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps);
167+
switch (between(0, 11)) {
168+
case 0 -> sessionId = randomValueOtherThan(sessionId, ESTestCase::randomIdentifier);
169+
case 1 -> taskDescription = randomValueOtherThan(taskDescription, ESTestCase::randomIdentifier);
170+
case 2 -> clusterName = randomValueOtherThan(clusterName, ESTestCase::randomIdentifier);
171+
case 3 -> nodeName = randomValueOtherThan(nodeName, ESTestCase::randomIdentifier);
172+
case 4 -> started = randomValueOtherThan(started, ESTestCase::randomNonNegativeLong);
173+
case 5 -> lastUpdated = randomValueOtherThan(lastUpdated, ESTestCase::randomNonNegativeLong);
174+
case 6 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong);
175+
case 7 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong);
176+
case 8 -> status = randomValueOtherThan(status, this::randomStatus);
177+
case 9 -> completedOperators = randomValueOtherThan(completedOperators, DriverStatusTests::randomOperatorStatuses);
178+
case 10 -> activeOperators = randomValueOtherThan(activeOperators, DriverStatusTests::randomOperatorStatuses);
179+
case 11 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps);
180180
default -> throw new UnsupportedOperationException();
181181
}
182182
return new DriverStatus(
183183
sessionId,
184184
taskDescription,
185+
clusterName,
186+
nodeName,
185187
started,
186188
lastUpdated,
187189
cpuNanos,

0 commit comments

Comments
 (0)