diff --git a/server/src/main/java/org/elasticsearch/TransportVersions.java b/server/src/main/java/org/elasticsearch/TransportVersions.java index 59ad137116ffd..c9d181d4cb0b6 100644 --- a/server/src/main/java/org/elasticsearch/TransportVersions.java +++ b/server/src/main/java/org/elasticsearch/TransportVersions.java @@ -176,7 +176,7 @@ static TransportVersion def(int id) { public static final TransportVersion REMOVE_SNAPSHOT_FAILURES = def(9_002_0_00); public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED = def(9_003_0_00); public static final TransportVersion REMOVE_DESIRED_NODE_VERSION = def(9_004_0_00); - + public static final TransportVersion ESQL_DRIVER_TASK_DESCRIPTION = def(9_005_0_00); /* * STOP! READ THIS FIRST! No, really, * ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _ diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java index 78572f55cd5eb..c0d220fda5d4e 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/Driver.java @@ -52,6 +52,13 @@ public class Driver implements Releasable, Describable { private final String sessionId; + /** + * Description of the task this driver is running. This description should be + * short and meaningful as a grouping identifier. We use the phase of the + * query right now: "data", "node_reduce", "final". + */ + private final String taskDescription; + /** * The wall clock time when this driver was created in milliseconds since epoch. * Compared to {@link #startNanos} this is less accurate and is measured by a @@ -96,6 +103,10 @@ public class Driver implements Releasable, Describable { /** * Creates a new driver with a chain of operators. * @param sessionId session Id + * @param taskDescription Description of the task this driver is running. This + * description should be short and meaningful as a grouping + * identifier. We use the phase of the query right now: + * "data", "node_reduce", "final". * @param driverContext the driver context * @param source source operator * @param intermediateOperators the chain of operators to execute @@ -105,6 +116,7 @@ public class Driver implements Releasable, Describable { */ public Driver( String sessionId, + String taskDescription, long startTime, long startNanos, DriverContext driverContext, @@ -116,6 +128,7 @@ public Driver( Releasable releasable ) { this.sessionId = sessionId; + this.taskDescription = taskDescription; this.startTime = startTime; this.startNanos = startNanos; this.driverContext = driverContext; @@ -129,6 +142,7 @@ public Driver( this.status = new AtomicReference<>( new DriverStatus( sessionId, + taskDescription, startTime, System.currentTimeMillis(), 0, @@ -150,6 +164,7 @@ public Driver( * @param releasable a {@link Releasable} to invoked once the chain of operators has run to completion */ public Driver( + String taskDescription, DriverContext driverContext, SourceOperator source, List intermediateOperators, @@ -158,6 +173,7 @@ public Driver( ) { this( "unset", + taskDescription, System.currentTimeMillis(), System.nanoTime(), driverContext, @@ -485,6 +501,7 @@ public DriverProfile profile() { throw new IllegalStateException("can only get profile from finished driver"); } return new DriverProfile( + status.taskDescription(), status.started(), status.lastUpdated(), finishNanos - startNanos, @@ -531,6 +548,7 @@ private void updateStatus(long extraCpuNanos, int extraIterations, DriverStatus. return new DriverStatus( sessionId, + taskDescription, startTime, now, prev.cpuNanos() + extraCpuNanos, diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java index 59ecdde230413..38fb298a7cffa 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverProfile.java @@ -27,6 +27,13 @@ * Profile results from a single {@link Driver}. */ public class DriverProfile implements Writeable, ChunkedToXContentObject { + /** + * Description of the task this driver is running. This description should be + * short and meaningful as a grouping identifier. We use the phase of the + * query right now: "data", "node_reduce", "final". + */ + private final String taskDescription; + /** * Millis since epoch when the driver started. */ @@ -62,6 +69,7 @@ public class DriverProfile implements Writeable, ChunkedToXContentObject { private final DriverSleeps sleeps; public DriverProfile( + String taskDescription, long startMillis, long stopMillis, long tookNanos, @@ -70,6 +78,7 @@ public DriverProfile( List operators, DriverSleeps sleeps ) { + this.taskDescription = taskDescription; this.startMillis = startMillis; this.stopMillis = stopMillis; this.tookNanos = tookNanos; @@ -80,6 +89,7 @@ public DriverProfile( } public DriverProfile(StreamInput in) throws IOException { + this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION) ? in.readString() : ""; if (in.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { this.startMillis = in.readVLong(); this.stopMillis = in.readVLong(); @@ -102,6 +112,9 @@ public DriverProfile(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION)) { + out.writeString(taskDescription); + } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0)) { out.writeVLong(startMillis); out.writeVLong(stopMillis); @@ -115,6 +128,13 @@ public void writeTo(StreamOutput out) throws IOException { sleeps.writeTo(out); } + /** + * Description of the task this driver is running. + */ + public String taskDescription() { + return taskDescription; + } + /** * Millis since epoch when the driver started. */ @@ -169,6 +189,7 @@ public DriverSleeps sleeps() { @Override public Iterator toXContentChunked(ToXContent.Params params) { return Iterators.concat(ChunkedToXContentHelper.startObject(), Iterators.single((b, p) -> { + b.field("task_description", taskDescription); b.timestampFieldsFromUnixEpochMillis("start_millis", "start", startMillis); b.timestampFieldsFromUnixEpochMillis("stop_millis", "stop", stopMillis); b.field("took_nanos", tookNanos); @@ -197,7 +218,8 @@ public boolean equals(Object o) { return false; } DriverProfile that = (DriverProfile) o; - return startMillis == that.startMillis + return taskDescription.equals(that.taskDescription) + && startMillis == that.startMillis && stopMillis == that.stopMillis && tookNanos == that.tookNanos && cpuNanos == that.cpuNanos @@ -208,7 +230,7 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps); + return Objects.hash(taskDescription, startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps); } @Override diff --git a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java index 42e3908231206..87537755bba3d 100644 --- a/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java +++ b/x-pack/plugin/esql/compute/src/main/java/org/elasticsearch/compute/operator/DriverStatus.java @@ -42,6 +42,11 @@ public class DriverStatus implements Task.Status { */ private final String sessionId; + /** + * Description of the task this driver is running. + */ + private final String taskDescription; + /** * Milliseconds since epoch when this driver started. */ @@ -83,6 +88,7 @@ public class DriverStatus implements Task.Status { DriverStatus( String sessionId, + String taskDescription, long started, long lastUpdated, long cpuTime, @@ -93,6 +99,7 @@ public class DriverStatus implements Task.Status { DriverSleeps sleeps ) { this.sessionId = sessionId; + this.taskDescription = taskDescription; this.started = started; this.lastUpdated = lastUpdated; this.cpuNanos = cpuTime; @@ -105,6 +112,7 @@ public class DriverStatus implements Task.Status { public DriverStatus(StreamInput in) throws IOException { this.sessionId = in.readString(); + this.taskDescription = in.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION) ? in.readString() : ""; this.started = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readLong() : 0; this.lastUpdated = in.readLong(); this.cpuNanos = in.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0) ? in.readVLong() : 0; @@ -122,6 +130,9 @@ public DriverStatus(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { out.writeString(sessionId); + if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_DRIVER_TASK_DESCRIPTION)) { + out.writeString(taskDescription); + } if (out.getTransportVersion().onOrAfter(TransportVersions.V_8_14_0)) { out.writeLong(started); } @@ -150,6 +161,15 @@ public String sessionId() { return sessionId; } + /** + * Description of the task this driver is running. This description should be + * short and meaningful as a grouping identifier. We use the phase of the + * query right now: "data", "node_reduce", "final". + */ + public String taskDescription() { + return taskDescription; + } + /** * When this {@link Driver} was started. */ @@ -211,7 +231,8 @@ public List activeOperators() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(); - builder.field("sessionId", sessionId); + builder.field("session_id", sessionId); + builder.field("task_description", taskDescription); builder.field("started", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(started)); builder.field("last_updated", DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.formatMillis(lastUpdated)); builder.field("cpu_nanos", cpuNanos); @@ -240,6 +261,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; DriverStatus that = (DriverStatus) o; return sessionId.equals(that.sessionId) + && taskDescription.equals(that.taskDescription) && started == that.started && lastUpdated == that.lastUpdated && cpuNanos == that.cpuNanos @@ -252,7 +274,18 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(sessionId, started, lastUpdated, cpuNanos, iterations, status, completedOperators, activeOperators, sleeps); + return Objects.hash( + sessionId, + taskDescription, + started, + lastUpdated, + cpuNanos, + iterations, + status, + completedOperators, + activeOperators, + sleeps + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java index a5061b8cf6d32..41b319be6c5fa 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/OperatorTests.java @@ -123,7 +123,7 @@ public void testQueryOperator() throws IOException { } }); DriverContext driverContext = driverContext(); - drivers.add(new Driver(driverContext, factory.get(driverContext), List.of(), docCollector, () -> {})); + drivers.add(new Driver("test", driverContext, factory.get(driverContext), List.of(), docCollector, () -> {})); } OperatorTestCase.runDriver(drivers); Set expectedDocIds = searchForDocIds(reader, query); @@ -215,6 +215,7 @@ public String toString() { ) ); Driver driver = new Driver( + "test", driverContext, luceneOperatorFactory(reader, new MatchAllDocsQuery(), LuceneOperator.NO_LIMIT).get(driverContext), operators, @@ -248,6 +249,7 @@ public void testLimitOperator() { DriverContext driverContext = driverContext(); try ( var driver = new Driver( + "test", driverContext, new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100), List.of((new LimitOperator.Factory(limit)).get(driverContext)), @@ -335,6 +337,7 @@ public void testHashLookup() { var actualPrimeOrds = new ArrayList<>(); try ( var driver = new Driver( + "test", driverContext, new SequenceLongBlockSourceOperator(driverContext.blockFactory(), values, 100), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java index 3eaf85c27e596..cea6b6a2a85a9 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/AggregatorFunctionTestCase.java @@ -111,6 +111,7 @@ public final void testIgnoresNulls() { try ( Driver d = new Driver( + "test", driverContext, new NullInsertingSourceOperator(new CannedSourceOperator(input.iterator()), blockFactory), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java index 5bd9ecc931cf2..67dcf4e78d13f 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctIntAggregatorFunctionTests.java @@ -65,6 +65,7 @@ public void testRejectsDouble() { BlockFactory blockFactory = driverContext.blockFactory(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionTests.java index 70662efae688f..b136d302ccfbd 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/CountDistinctLongAggregatorFunctionTests.java @@ -66,6 +66,7 @@ public void testRejectsDouble() { BlockFactory blockFactory = driverContext.blockFactory(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java index 003dc415c6194..4d94d4d2e0296 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumDoubleAggregatorFunctionTests.java @@ -53,6 +53,7 @@ public void testOverflowSucceeds() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(Double.MAX_VALUE - 1, 2)), List.of(simple().get(driverContext)), @@ -71,6 +72,7 @@ public void testSummationAccuracy() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new SequenceDoubleBlockSourceOperator( driverContext.blockFactory(), @@ -100,6 +102,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(values)), List.of(simple().get(driverContext)), @@ -122,6 +125,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)), List.of(simple().get(driverContext)), @@ -141,6 +145,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceDoubleBlockSourceOperator(driverContext.blockFactory(), DoubleStream.of(largeValues)), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumFloatAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumFloatAggregatorFunctionTests.java index 521c1e261cc62..c7a9fb75404f8 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumFloatAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumFloatAggregatorFunctionTests.java @@ -53,6 +53,7 @@ public void testOverflowSucceeds() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new SequenceFloatBlockSourceOperator(driverContext.blockFactory(), Stream.of(Float.MAX_VALUE - 1, 2f)), List.of(simple().get(driverContext)), @@ -71,6 +72,7 @@ public void testSummationAccuracy() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new SequenceFloatBlockSourceOperator( driverContext.blockFactory(), @@ -100,6 +102,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceFloatBlockSourceOperator(driverContext.blockFactory(), Stream.of(values)), List.of(simple().get(driverContext)), @@ -122,6 +125,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceFloatBlockSourceOperator(driverContext.blockFactory(), Stream.of(largeValues)), List.of(simple().get(driverContext)), @@ -141,6 +145,7 @@ public void testSummationAccuracy() { driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceFloatBlockSourceOperator(driverContext.blockFactory(), Stream.of(largeValues)), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntAggregatorFunctionTests.java index 8c5e4430128b7..365b9cc75e01c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumIntAggregatorFunctionTests.java @@ -52,6 +52,7 @@ public void testRejectsDouble() { BlockFactory blockFactory = driverContext.blockFactory(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionTests.java index 00cdbedef54d6..4821c72229d88 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/SumLongAggregatorFunctionTests.java @@ -51,6 +51,7 @@ public void testOverflowFails() { DriverContext driverContext = driverContext(); try ( Driver d = new Driver( + "test", driverContext, new SequenceLongBlockSourceOperator(driverContext.blockFactory(), LongStream.of(Long.MAX_VALUE - 1, 2)), List.of(simple().get(driverContext)), @@ -68,6 +69,7 @@ public void testRejectsDouble() { BlockFactory blockFactory = driverContext.blockFactory(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(Iterators.single(new Page(blockFactory.newDoubleArrayVector(new double[] { 1.0 }, 1).asBlock()))), List.of(simple().get(driverContext)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java index 914d29bb8ba25..d0a1fc1e29590 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizeBlockHashTests.java @@ -416,6 +416,7 @@ public void testCategorize_withDriver() { List intermediateOutput = new ArrayList<>(); Driver driver = new Driver( + "test", driverContext, new LocalSourceOperator(input1), List.of( @@ -436,6 +437,7 @@ public void testCategorize_withDriver() { runDriver(driver); driver = new Driver( + "test", driverContext, new LocalSourceOperator(input2), List.of( @@ -458,6 +460,7 @@ public void testCategorize_withDriver() { List finalOutput = new ArrayList<>(); driver = new Driver( + "test", driverContext, new CannedSourceOperator(intermediateOutput.iterator()), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizePackedValuesBlockHashTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizePackedValuesBlockHashTests.java index 5f868f51f06e2..17f41e27703f3 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizePackedValuesBlockHashTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/aggregation/blockhash/CategorizePackedValuesBlockHashTests.java @@ -137,6 +137,7 @@ public void testCategorize_withDriver() { List intermediateOutput = new ArrayList<>(); Driver driver = new Driver( + "test", driverContext, new LocalSourceOperator(input1), List.of( @@ -154,6 +155,7 @@ public void testCategorize_withDriver() { runDriver(driver); driver = new Driver( + "test", driverContext, new LocalSourceOperator(input2), List.of( @@ -173,6 +175,7 @@ public void testCategorize_withDriver() { List finalOutput = new ArrayList<>(); driver = new Driver( + "test", driverContext, new CannedSourceOperator(intermediateOutput.iterator()), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java index 1f5b5bf9b9337..61c7582c74245 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneCountOperatorTests.java @@ -151,7 +151,7 @@ private void testCount(Supplier contexts, int size, int limit) { int taskConcurrency = between(1, 8); for (int i = 0; i < taskConcurrency; i++) { DriverContext ctx = contexts.get(); - drivers.add(new Driver(ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); + drivers.add(new Driver("test", ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); } OperatorTestCase.runDriver(drivers); assertThat(results.size(), lessThanOrEqualTo(taskConcurrency)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java index b65da5aba7588..f6fba20a28889 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMaxOperatorTestCase.java @@ -166,7 +166,7 @@ private void testMax(Supplier contexts, int size, int limit) { int taskConcurrency = between(1, 8); for (int i = 0; i < taskConcurrency; i++) { DriverContext ctx = contexts.get(); - drivers.add(new Driver(ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); + drivers.add(new Driver("test", ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); } OperatorTestCase.runDriver(drivers); assertThat(results.size(), lessThanOrEqualTo(taskConcurrency)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java index f57bbd8c5ddb5..3033efa50f373 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneMinOperatorTestCase.java @@ -166,7 +166,7 @@ private void testMin(Supplier contexts, int size, int limit) { int taskConcurrency = between(1, 8); for (int i = 0; i < taskConcurrency; i++) { DriverContext ctx = contexts.get(); - drivers.add(new Driver(ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); + drivers.add(new Driver("test", ctx, factory.get(ctx), List.of(), new TestResultPageSinkOperator(results::add), () -> {})); } OperatorTestCase.runDriver(drivers); assertThat(results.size(), lessThanOrEqualTo(taskConcurrency)); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryExpressionEvaluatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryExpressionEvaluatorTests.java index 54b33732aa425..4a628d596f142 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryExpressionEvaluatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneQueryExpressionEvaluatorTests.java @@ -210,6 +210,7 @@ private List runQuery(Set values, Query query, boolean shuffleDocs operators.add(new EvalOperator(blockFactory, luceneQueryEvaluator)); List results = new ArrayList<>(); Driver driver = new Driver( + "test", driverContext, luceneOperatorFactory(reader, new MatchAllDocsQuery(), LuceneOperator.NO_LIMIT, scoring).get(driverContext), operators, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java index b7114bb4e9b54..574f9b25ff146 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneSourceOperatorTests.java @@ -160,7 +160,7 @@ private void testSimple(DriverContext ctx, int size, int limit) { List results = new ArrayList<>(); OperatorTestCase.runDriver( - new Driver(ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) + new Driver("test", ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) ); OperatorTestCase.assertDriverContext(ctx); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java index 20af40bcc6840..3af21ba37d088 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorScoringTests.java @@ -127,7 +127,7 @@ protected void testSimple(DriverContext ctx, int size, int limit) { List results = new ArrayList<>(); OperatorTestCase.runDriver( - new Driver(ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) + new Driver("test", ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) ); OperatorTestCase.assertDriverContext(ctx); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java index a6d652d499d84..92eaa78eedcd7 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/LuceneTopNSourceOperatorTests.java @@ -187,7 +187,7 @@ protected void testSimple(DriverContext ctx, int size, int limit) { List results = new ArrayList<>(); OperatorTestCase.runDriver( - new Driver(ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) + new Driver("test", ctx, factory.get(ctx), List.of(readS.get(ctx)), new TestResultPageSinkOperator(results::add), () -> {}) ); OperatorTestCase.assertDriverContext(ctx); diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java index feba401d445e7..934fbcc0b897e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/TimeSeriesSortedSourceOperatorTests.java @@ -174,6 +174,7 @@ record Doc(int host, long timestamp, long metric) {} var metricField = new NumberFieldMapper.NumberFieldType("metric", NumberFieldMapper.NumberType.LONG); OperatorTestCase.runDriver( new Driver( + "test", driverContext, timeSeriesFactory.get(driverContext), List.of(ValuesSourceReaderOperatorTests.factory(reader, metricField, ElementType.LONG).get(driverContext)), @@ -248,6 +249,7 @@ public void testMatchNone() throws Exception { List results = new ArrayList<>(); OperatorTestCase.runDriver( new Driver( + "test", driverContext, timeSeriesFactory.get(driverContext), List.of(), @@ -306,6 +308,7 @@ List runDriver(int limit, int maxPageSize, boolean forceMerge, int numTime var hostnameField = new KeywordFieldMapper.KeywordFieldType("hostname"); OperatorTestCase.runDriver( new Driver( + "test", ctx, timeSeriesFactory.get(ctx), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java index 910541607d83f..32164c7954dda 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValueSourceReaderTypeConversionTests.java @@ -1299,6 +1299,7 @@ public void testWithNulls() throws IOException { var vsShardContext = new ValuesSourceReaderOperator.ShardContext(reader(indexKey), () -> SourceLoader.FROM_STORED_SOURCE); try ( Driver driver = new Driver( + "test", driverContext, luceneFactory.get(driverContext), List.of( @@ -1376,6 +1377,7 @@ public void testNullsShared() { int[] pages = new int[] { 0 }; try ( Driver d = new Driver( + "test", driverContext, simpleInput(driverContext, 10), List.of( @@ -1497,6 +1499,7 @@ protected final List drive(List operators, Iterator input, boolean success = false; try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(input), operators, @@ -1524,6 +1527,7 @@ public static void runDriver(List drivers) { for (int i = 0; i < dummyDrivers; i++) { drivers.add( new Driver( + "test", "dummy-session", 0, 0, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java index 2661ff665831f..07a66a473f3b1 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/lucene/ValuesSourceReaderOperatorTests.java @@ -1307,6 +1307,7 @@ public void testWithNulls() throws IOException { ); try ( Driver driver = new Driver( + "test", driverContext, luceneFactory.get(driverContext), List.of( @@ -1409,6 +1410,7 @@ public void testNullsShared() { int[] pages = new int[] { 0 }; try ( Driver d = new Driver( + "test", driverContext, simpleInput(driverContext.blockFactory(), 10), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java index f017fed16cc96..e94864b9530bc 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/AsyncOperatorTests.java @@ -165,7 +165,14 @@ public void doClose() { } }); PlainActionFuture future = new PlainActionFuture<>(); - Driver driver = new Driver(driverContext, sourceOperator, intermediateOperators, outputOperator, () -> assertFalse(it.hasNext())); + Driver driver = new Driver( + "test", + driverContext, + sourceOperator, + intermediateOperators, + outputOperator, + () -> assertFalse(it.hasNext()) + ); Driver.start(threadPool.getThreadContext(), threadPool.executor(ESQL_TEST_EXECUTOR), driver, between(1, 10000), future); future.actionGet(); Releasables.close(localBreaker); @@ -295,7 +302,7 @@ protected void doClose() { }; SinkOperator outputOperator = new PageConsumerOperator(Page::releaseBlocks); PlainActionFuture future = new PlainActionFuture<>(); - Driver driver = new Driver(driverContext, sourceOperator, List.of(asyncOperator), outputOperator, localBreaker); + Driver driver = new Driver("test", driverContext, sourceOperator, List.of(asyncOperator), outputOperator, localBreaker); Driver.start(threadPool.getThreadContext(), threadPool.executor(ESQL_TEST_EXECUTOR), driver, between(1, 1000), future); assertBusy(() -> assertTrue(future.isDone())); if (failed.get()) { diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java index 27083ea0fcd13..a39aa10af5f31 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverProfileTests.java @@ -27,6 +27,7 @@ public class DriverProfileTests extends AbstractWireSerializingTestCase { public void testToXContent() { DriverProfile status = new DriverProfile( + "test", 123413220000L, 123413243214L, 10012, @@ -44,6 +45,7 @@ public void testToXContent() { ); assertThat(Strings.toString(status, true, true), equalTo(""" { + "task_description" : "test", "start" : "1973-11-29T09:27:00.000Z", "start_millis" : 123413220000, "stop" : "1973-11-29T09:27:23.214Z", @@ -101,6 +103,7 @@ protected Writeable.Reader instanceReader() { @Override protected DriverProfile createTestInstance() { return new DriverProfile( + DriverStatusTests.randomTaskDescription(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), @@ -113,6 +116,7 @@ protected DriverProfile createTestInstance() { @Override protected DriverProfile mutateInstance(DriverProfile instance) throws IOException { + String taskDescription = instance.taskDescription(); long startMillis = instance.startMillis(); long stopMillis = instance.stopMillis(); long tookNanos = instance.tookNanos(); @@ -120,17 +124,18 @@ protected DriverProfile mutateInstance(DriverProfile instance) throws IOExceptio long iterations = instance.iterations(); var operators = instance.operators(); var sleeps = instance.sleeps(); - switch (between(0, 6)) { - case 0 -> startMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong); - case 1 -> stopMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong); - case 2 -> tookNanos = randomValueOtherThan(tookNanos, ESTestCase::randomNonNegativeLong); - case 3 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong); - case 4 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong); - case 5 -> operators = randomValueOtherThan(operators, DriverStatusTests::randomOperatorStatuses); - case 6 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps); + switch (between(0, 7)) { + case 0 -> taskDescription = randomValueOtherThan(taskDescription, DriverStatusTests::randomTaskDescription); + case 1 -> startMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong); + case 2 -> stopMillis = randomValueOtherThan(startMillis, ESTestCase::randomNonNegativeLong); + case 3 -> tookNanos = randomValueOtherThan(tookNanos, ESTestCase::randomNonNegativeLong); + case 4 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong); + case 5 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong); + case 6 -> operators = randomValueOtherThan(operators, DriverStatusTests::randomOperatorStatuses); + case 7 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps); default -> throw new UnsupportedOperationException(); } - return new DriverProfile(startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps); + return new DriverProfile(taskDescription, startMillis, stopMillis, tookNanos, cpuNanos, iterations, operators, sleeps); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java index b46d9f3f4add7..83deb57a3ba7c 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverStatusTests.java @@ -32,6 +32,7 @@ public class DriverStatusTests extends AbstractWireSerializingTestCase instanceReader() { protected DriverStatus createTestInstance() { return new DriverStatus( randomSessionId(), + randomTaskDescription(), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), @@ -129,6 +132,10 @@ private String randomSessionId() { return RandomStrings.randomAsciiLettersOfLengthBetween(random(), 1, 15); } + public static String randomTaskDescription() { + return RandomStrings.randomAsciiLettersOfLength(random(), 5); + } + private DriverStatus.Status randomStatus() { return randomFrom(DriverStatus.Status.values()); } @@ -150,6 +157,7 @@ private static DriverStatus.OperatorStatus randomOperatorStatus() { @Override protected DriverStatus mutateInstance(DriverStatus instance) throws IOException { var sessionId = instance.sessionId(); + var taskDescription = instance.taskDescription(); long started = instance.started(); long lastUpdated = instance.lastUpdated(); long cpuNanos = instance.cpuNanos(); @@ -158,19 +166,31 @@ protected DriverStatus mutateInstance(DriverStatus instance) throws IOException var completedOperators = instance.completedOperators(); var activeOperators = instance.activeOperators(); var sleeps = instance.sleeps(); - switch (between(0, 8)) { + switch (between(0, 9)) { case 0 -> sessionId = randomValueOtherThan(sessionId, this::randomSessionId); - case 1 -> started = randomValueOtherThan(started, ESTestCase::randomNonNegativeLong); - case 2 -> lastUpdated = randomValueOtherThan(lastUpdated, ESTestCase::randomNonNegativeLong); - case 3 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong); - case 4 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong); - case 5 -> status = randomValueOtherThan(status, this::randomStatus); - case 6 -> completedOperators = randomValueOtherThan(completedOperators, DriverStatusTests::randomOperatorStatuses); - case 7 -> activeOperators = randomValueOtherThan(activeOperators, DriverStatusTests::randomOperatorStatuses); - case 8 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps); + case 1 -> taskDescription = randomValueOtherThan(taskDescription, DriverStatusTests::randomTaskDescription); + case 2 -> started = randomValueOtherThan(started, ESTestCase::randomNonNegativeLong); + case 3 -> lastUpdated = randomValueOtherThan(lastUpdated, ESTestCase::randomNonNegativeLong); + case 4 -> cpuNanos = randomValueOtherThan(cpuNanos, ESTestCase::randomNonNegativeLong); + case 5 -> iterations = randomValueOtherThan(iterations, ESTestCase::randomNonNegativeLong); + case 6 -> status = randomValueOtherThan(status, this::randomStatus); + case 7 -> completedOperators = randomValueOtherThan(completedOperators, DriverStatusTests::randomOperatorStatuses); + case 8 -> activeOperators = randomValueOtherThan(activeOperators, DriverStatusTests::randomOperatorStatuses); + case 9 -> sleeps = randomValueOtherThan(sleeps, DriverSleepsTests::randomDriverSleeps); default -> throw new UnsupportedOperationException(); } - return new DriverStatus(sessionId, started, lastUpdated, cpuNanos, iterations, status, completedOperators, activeOperators, sleeps); + return new DriverStatus( + sessionId, + taskDescription, + started, + lastUpdated, + cpuNanos, + iterations, + status, + completedOperators, + activeOperators, + sleeps + ); } @Override diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java index e715b94bc55e5..48a566994b2f5 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/DriverTests.java @@ -67,6 +67,7 @@ public void testProfileAndStatusFinishInOneRound() { Driver driver = new Driver( "unset", + "test", startEpoch, startNanos, driverContext, @@ -116,6 +117,7 @@ public void testProfileAndStatusOneIterationAtATime() { Driver driver = new Driver( "unset", + "test", startEpoch, startNanos, driverContext, @@ -166,6 +168,7 @@ public void testProfileAndStatusTimeout() { Driver driver = new Driver( "unset", + "test", startEpoch, startNanos, driverContext, @@ -231,7 +234,7 @@ public void testThreadContext() throws Exception { WarningsOperator warning1 = new WarningsOperator(threadPool); WarningsOperator warning2 = new WarningsOperator(threadPool); CyclicBarrier allPagesProcessed = new CyclicBarrier(2); - Driver driver = new Driver(driverContext, new CannedSourceOperator(inPages.iterator()) { + Driver driver = new Driver("test", driverContext, new CannedSourceOperator(inPages.iterator()) { @Override public Page getOutput() { assertRunningWithRegularUser(threadPool); @@ -315,7 +318,7 @@ public void close() { } }); - Driver driver = new Driver(driverContext, sourceOperator, List.of(delayOperator), sinkOperator, () -> {}); + Driver driver = new Driver("test", driverContext, sourceOperator, List.of(delayOperator), sinkOperator, () -> {}); ThreadContext threadContext = threadPool.getThreadContext(); PlainActionFuture future = new PlainActionFuture<>(); @@ -336,7 +339,7 @@ public void testResumeOnEarlyFinish() throws Exception { var sinkHandler = new ExchangeSinkHandler(driverContext.blockFactory(), between(1, 5), System::currentTimeMillis); var sourceOperator = new ExchangeSourceOperator(sourceHandler.createExchangeSource()); var sinkOperator = new ExchangeSinkOperator(sinkHandler.createExchangeSink(() -> {}), Function.identity()); - Driver driver = new Driver(driverContext, sourceOperator, List.of(), sinkOperator, () -> {}); + Driver driver = new Driver("test", driverContext, sourceOperator, List.of(), sinkOperator, () -> {}); PlainActionFuture future = new PlainActionFuture<>(); Driver.start(threadPool.getThreadContext(), threadPool.executor("esql"), driver, between(1, 1000), future); assertBusy( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java index 744121a3807c3..6b036dea5f749 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/ForkingOperatorTestCase.java @@ -68,6 +68,7 @@ public final void testInitialFinal() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(input.iterator()), List.of(simpleWithMode(AggregatorMode.INITIAL).get(driverContext), simpleWithMode(AggregatorMode.FINAL).get(driverContext)), @@ -89,6 +90,7 @@ public final void testManyInitialFinal() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(partials.iterator()), List.of(simpleWithMode(AggregatorMode.FINAL).get(driverContext)), @@ -110,6 +112,7 @@ public final void testInitialIntermediateFinal() { try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(input.iterator()), List.of( @@ -142,6 +145,7 @@ public final void testManyInitialManyPartialFinal() { List results = new ArrayList<>(); try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(intermediates.iterator()), List.of(simpleWithMode(AggregatorMode.FINAL).get(driverContext)), @@ -240,6 +244,7 @@ List createDriversForInput(List input, List results, boolean DriverContext driver1Context = driverContext(); drivers.add( new Driver( + "test", driver1Context, new CannedSourceOperator(pages.iterator()), List.of( @@ -257,6 +262,7 @@ List createDriversForInput(List input, List results, boolean DriverContext driver2Context = driverContext(); drivers.add( new Driver( + "test", driver2Context, new ExchangeSourceOperator(sourceExchanger.createExchangeSource()), List.of( diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java index afd4695db932f..b960a12e6f90e 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/TimeSeriesAggregationOperatorTests.java @@ -303,6 +303,7 @@ public void close() { List results = new ArrayList<>(); OperatorTestCase.runDriver( new Driver( + "test", ctx, sourceOperatorFactory.get(ctx), CollectionUtils.concatLists(intermediateOperators, List.of(intialAgg, intermediateAgg, finalAgg)), diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java index d5c6d196a1b99..2edf156f92da1 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/exchange/ExchangeServiceTests.java @@ -300,6 +300,7 @@ Set runConcurrentTest( DriverContext dc = driverContext(); Driver d = new Driver( "test-session:1", + "test", 0, 0, dc, @@ -318,6 +319,7 @@ Set runConcurrentTest( DriverContext dc = driverContext(); Driver d = new Driver( "test-session:2", + "test", 0, 0, dc, diff --git a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java index e63e8b63d6ee9..49d91df556d14 100644 --- a/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java +++ b/x-pack/plugin/esql/compute/src/test/java/org/elasticsearch/compute/operator/topn/TopNOperatorTests.java @@ -543,6 +543,7 @@ public void testCollectAllValues() { List> actualTop = new ArrayList<>(); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(new Page(blocks.toArray(Block[]::new))).iterator()), List.of( @@ -633,6 +634,7 @@ public void testCollectAllValues_RandomMultiValues() { List> actualTop = new ArrayList<>(); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(new Page(blocks.toArray(Block[]::new))).iterator()), List.of( @@ -668,6 +670,7 @@ private List> topNTwoColumns( List> outputValues = new ArrayList<>(); try ( Driver driver = new Driver( + "test", driverContext, new TupleBlockSourceOperator(driverContext.blockFactory(), inputValues, randomIntBetween(1, 1000)), List.of( @@ -938,6 +941,7 @@ private void assertSortingOnMV( int topCount = randomIntBetween(1, values.size()); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(page).iterator()), List.of( @@ -1112,6 +1116,7 @@ public void testIPSortingSingleValue() throws UnknownHostException { List> actual = new ArrayList<>(); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(new Page(builder.build())).iterator()), List.of( @@ -1239,6 +1244,7 @@ private void assertIPSortingOnMultiValues( DriverContext driverContext = driverContext(); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(new Page(builder.build())).iterator()), List.of( @@ -1327,6 +1333,7 @@ public void testZeroByte() { DriverContext driverContext = driverContext(); try ( Driver driver = new Driver( + "test", driverContext, new CannedSourceOperator(List.of(new Page(blocks.toArray(Block[]::new))).iterator()), List.of( @@ -1367,6 +1374,7 @@ public void testErrorBeforeFullyDraining() { DriverContext driverContext = driverContext(); try ( Driver driver = new Driver( + "test", driverContext, new SequenceLongBlockSourceOperator(driverContext.blockFactory(), LongStream.range(0, docCount)), List.of( diff --git a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java index a46dca4ae38cf..d9fca11ecdcf2 100644 --- a/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java +++ b/x-pack/plugin/esql/compute/test/src/main/java/org/elasticsearch/compute/test/OperatorTestCase.java @@ -190,6 +190,7 @@ protected final List oneDriverPerPageList(Iterator> source, Sup List in = source.next(); try ( Driver d = new Driver( + "test", driverContext(), new CannedSourceOperator(in.iterator()), operators.get(), @@ -264,6 +265,7 @@ protected final List drive(List operators, Iterator input, boolean success = false; try ( Driver d = new Driver( + "test", driverContext, new CannedSourceOperator(input), operators, @@ -291,6 +293,7 @@ public static void runDriver(List drivers) { for (int i = 0; i < dummyDrivers; i++) { drivers.add( new Driver( + "test", "dummy-session", 0, 0, diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java index 601ce819224b5..58c82d800954c 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/RestEsqlIT.java @@ -40,8 +40,10 @@ import static org.elasticsearch.test.ListMatcher.matchesList; import static org.elasticsearch.test.MapMatcher.assertMap; import static org.elasticsearch.test.MapMatcher.matchesMap; +import static org.hamcrest.Matchers.any; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.either; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.hasItem; @@ -287,7 +289,6 @@ public void testProfile() throws IOException { equalTo(List.of(List.of(499.5d))) ); - List> signatures = new ArrayList<>(); @SuppressWarnings("unchecked") List> profiles = (List>) ((Map) result.get("profile")).get("drivers"); for (Map p : profiles) { @@ -299,26 +300,34 @@ public void testProfile() throws IOException { for (Map o : operators) { sig.add(checkOperatorProfile(o)); } - signatures.add(sig); + String taskDescription = p.get("task_description").toString(); + switch (taskDescription) { + case "data" -> assertMap( + sig, + matchesList().item("LuceneSourceOperator") + .item("ValuesSourceReaderOperator") + .item("AggregationOperator") + .item("ExchangeSinkOperator") + ); + case "node_reduce" -> assertThat( + sig, + either(matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator")).or( + matchesList().item("ExchangeSourceOperator").item("AggregationOperator").item("ExchangeSinkOperator") + ) + ); + case "final" -> assertMap( + sig, + matchesList().item("ExchangeSourceOperator") + .item("AggregationOperator") + .item("ProjectOperator") + .item("LimitOperator") + .item("EvalOperator") + .item("ProjectOperator") + .item("OutputOperator") + ); + default -> throw new IllegalArgumentException("can't match " + taskDescription); + } } - var readProfile = matchesList().item("LuceneSourceOperator") - .item("ValuesSourceReaderOperator") - .item("AggregationOperator") - .item("ExchangeSinkOperator"); - var mergeProfile = matchesList().item("ExchangeSourceOperator") - .item("AggregationOperator") - .item("ProjectOperator") - .item("LimitOperator") - .item("EvalOperator") - .item("ProjectOperator") - .item("OutputOperator"); - var emptyReduction = matchesList().item("ExchangeSourceOperator").item("ExchangeSinkOperator"); - var reduction = matchesList().item("ExchangeSourceOperator").item("AggregationOperator").item("ExchangeSinkOperator"); - assertThat( - signatures, - Matchers.either(containsInAnyOrder(readProfile, reduction, mergeProfile)) - .or(containsInAnyOrder(readProfile, emptyReduction, mergeProfile)) - ); } public void testProfileOrdinalsGroupingOperator() throws IOException { @@ -391,6 +400,7 @@ public void testInlineStatsProfile() throws IOException { } signatures.add(sig); } + // TODO adapt this to use task_description once this is reenabled assertThat( signatures, containsInAnyOrder( @@ -491,10 +501,10 @@ public void testForceSleepsProfile() throws IOException { MapMatcher sleepMatcher = matchesMap().entry("reason", "exchange empty") .entry("sleep_millis", greaterThan(0L)) .entry("wake_millis", greaterThan(0L)); - if (operators.contains("LuceneSourceOperator")) { - assertMap(sleeps, matchesMap().entry("counts", Map.of()).entry("first", List.of()).entry("last", List.of())); - } else if (operators.contains("ExchangeSourceOperator")) { - if (operators.contains("ExchangeSinkOperator")) { + String taskDescription = p.get("task_description").toString(); + switch (taskDescription) { + case "data" -> assertMap(sleeps, matchesMap().entry("counts", Map.of()).entry("first", List.of()).entry("last", List.of())); + case "node_reduce" -> { assertMap(sleeps, matchesMap().entry("counts", matchesMap().entry("exchange empty", greaterThan(0))).extraOk()); @SuppressWarnings("unchecked") List> first = (List>) sleeps.get("first"); @@ -506,8 +516,8 @@ public void testForceSleepsProfile() throws IOException { for (Map s : last) { assertMap(s, sleepMatcher); } - - } else { + } + case "final" -> { assertMap( sleeps, matchesMap().entry("counts", matchesMap().entry("exchange empty", 1)) @@ -515,14 +525,14 @@ public void testForceSleepsProfile() throws IOException { .entry("last", List.of(sleepMatcher)) ); } - } else { - fail("unknown signature: " + operators); + default -> throw new IllegalArgumentException("unknown task: " + taskDescription); } } } private MapMatcher commonProfile() { - return matchesMap().entry("start_millis", greaterThan(0L)) + return matchesMap().entry("task_description", any(String.class)) + .entry("start_millis", greaterThan(0L)) .entry("stop_millis", greaterThan(0L)) .entry("iterations", greaterThan(0L)) .entry("cpu_nanos", greaterThan(0L)) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java index 85c03ce7860d3..b15e4cfe739f0 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/EsqlActionTaskIT.java @@ -38,6 +38,7 @@ import org.elasticsearch.transport.TransportService; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.hamcrest.Matcher; import org.junit.Before; import java.io.IOException; @@ -75,9 +76,6 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase { private static final Logger LOGGER = LogManager.getLogger(EsqlActionTaskIT.class); - private String READ_DESCRIPTION; - private String MERGE_DESCRIPTION; - private String REDUCE_DESCRIPTION; private boolean nodeLevelReduction; /** @@ -89,21 +87,6 @@ public class EsqlActionTaskIT extends AbstractPausableIntegTestCase { public void setup() { assumeTrue("requires query pragmas", canUseQueryPragmas()); nodeLevelReduction = randomBoolean(); - READ_DESCRIPTION = """ - \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = 2147483647, scoreMode = COMPLETE_NO_SCORES] - \\_ValuesSourceReaderOperator[fields = [pause_me]] - \\_AggregationOperator[mode = INITIAL, aggs = sum of longs] - \\_ExchangeSinkOperator""".replace("pageSize()", Integer.toString(pageSize())); - MERGE_DESCRIPTION = """ - \\_ExchangeSourceOperator[] - \\_AggregationOperator[mode = FINAL, aggs = sum of longs] - \\_ProjectOperator[projection = [0]] - \\_LimitOperator[limit = 1000] - \\_OutputOperator[columns = [sum(pause_me)]]"""; - REDUCE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n" - + (nodeLevelReduction ? "\\_AggregationOperator[mode = INTERMEDIATE, aggs = sum of longs]\n" : "") - + "\\_ExchangeSinkOperator"; - } public void testTaskContents() throws Exception { @@ -120,9 +103,11 @@ public void testTaskContents() throws Exception { for (TaskInfo task : foundTasks) { DriverStatus status = (DriverStatus) task.status(); assertThat(status.sessionId(), not(emptyOrNullString())); + String taskDescription = status.taskDescription(); for (DriverStatus.OperatorStatus o : status.activeOperators()) { logger.info("status {}", o); if (o.operator().startsWith("LuceneSourceOperator[maxPageSize = " + pageSize())) { + assertThat(taskDescription, equalTo("data")); LuceneSourceOperator.Status oStatus = (LuceneSourceOperator.Status) o.status(); assertThat(oStatus.processedSlices(), lessThanOrEqualTo(oStatus.totalSlices())); assertThat(oStatus.processedQueries(), equalTo(Set.of("*:*"))); @@ -142,6 +127,7 @@ public void testTaskContents() throws Exception { continue; } if (o.operator().equals("ValuesSourceReaderOperator[fields = [pause_me]]")) { + assertThat(taskDescription, equalTo("data")); ValuesSourceReaderOperator.Status oStatus = (ValuesSourceReaderOperator.Status) o.status(); assertMap( oStatus.readersBuilt(), @@ -152,6 +138,7 @@ public void testTaskContents() throws Exception { continue; } if (o.operator().equals("ExchangeSourceOperator")) { + assertThat(taskDescription, either(equalTo("node_reduce")).or(equalTo("final"))); ExchangeSourceOperator.Status oStatus = (ExchangeSourceOperator.Status) o.status(); assertThat(oStatus.pagesWaiting(), greaterThanOrEqualTo(0)); assertThat(oStatus.pagesEmitted(), greaterThanOrEqualTo(0)); @@ -159,6 +146,7 @@ public void testTaskContents() throws Exception { continue; } if (o.operator().equals("ExchangeSinkOperator")) { + assertThat(taskDescription, either(equalTo("data")).or(equalTo("node_reduce"))); ExchangeSinkOperator.Status oStatus = (ExchangeSinkOperator.Status) o.status(); assertThat(oStatus.pagesReceived(), greaterThanOrEqualTo(0)); exchangeSinks++; @@ -169,6 +157,29 @@ public void testTaskContents() throws Exception { assertThat(valuesSourceReaders, equalTo(1)); assertThat(exchangeSinks, greaterThanOrEqualTo(1)); assertThat(exchangeSources, equalTo(2)); + assertThat( + dataTasks(foundTasks).get(0).description(), + equalTo( + """ + \\_LuceneSourceOperator[sourceStatus] + \\_ValuesSourceReaderOperator[fields = [pause_me]] + \\_AggregationOperator[mode = INITIAL, aggs = sum of longs] + \\_ExchangeSinkOperator""".replace( + "sourceStatus", + "dataPartitioning = SHARD, maxPageSize = " + pageSize() + ", limit = 2147483647, scoreMode = COMPLETE_NO_SCORES" + ) + ) + ); + assertThat( + nodeReduceTasks(foundTasks).get(0).description(), + nodeLevelReduceDescriptionMatcher(foundTasks, "\\_AggregationOperator[mode = INTERMEDIATE, aggs = sum of longs]\n") + ); + assertThat(coordinatorTasks(foundTasks).get(0).description(), equalTo(""" + \\_ExchangeSourceOperator[] + \\_AggregationOperator[mode = FINAL, aggs = sum of longs] + \\_ProjectOperator[projection = [0]] + \\_LimitOperator[limit = 1000] + \\_OutputOperator[columns = [sum(pause_me)]]""")); } finally { scriptPermits.release(numberOfDocs()); try (EsqlQueryResponse esqlResponse = response.get()) { @@ -181,7 +192,7 @@ public void testCancelRead() throws Exception { ActionFuture response = startEsql(); try { List infos = getTasksStarting(); - TaskInfo running = infos.stream().filter(t -> t.description().equals(READ_DESCRIPTION)).findFirst().get(); + TaskInfo running = infos.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("data")).findFirst().get(); cancelTask(running.taskId()); assertCancelled(response); } finally { @@ -193,7 +204,7 @@ public void testCancelMerge() throws Exception { ActionFuture response = startEsql(); try { List infos = getTasksStarting(); - TaskInfo running = infos.stream().filter(t -> t.description().equals(MERGE_DESCRIPTION)).findFirst().get(); + TaskInfo running = infos.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("final")).findFirst().get(); cancelTask(running.taskId()); assertCancelled(response); } finally { @@ -277,8 +288,8 @@ private List getTasksStarting() throws Exception { for (TaskInfo task : tasks) { assertThat(task.action(), equalTo(DriverTaskRunner.ACTION_NAME)); DriverStatus status = (DriverStatus) task.status(); - logger.info("task {} {}", task.description(), status); - assertThat(task.description(), anyOf(equalTo(READ_DESCRIPTION), equalTo(MERGE_DESCRIPTION), equalTo(REDUCE_DESCRIPTION))); + logger.info("task {} {} {}", status.taskDescription(), task.description(), status); + assertThat(status.taskDescription(), anyOf(equalTo("data"), equalTo("node_reduce"), equalTo("final"))); /* * Accept tasks that are either starting or have gone * immediately async. The coordinating task is likely @@ -302,8 +313,8 @@ private List getTasksRunning() throws Exception { for (TaskInfo task : tasks) { assertThat(task.action(), equalTo(DriverTaskRunner.ACTION_NAME)); DriverStatus status = (DriverStatus) task.status(); - assertThat(task.description(), anyOf(equalTo(READ_DESCRIPTION), equalTo(MERGE_DESCRIPTION), equalTo(REDUCE_DESCRIPTION))); - if (task.description().equals(READ_DESCRIPTION)) { + assertThat(status.taskDescription(), anyOf(equalTo("data"), equalTo("node_reduce"), equalTo("final"))); + if (status.taskDescription().equals("data")) { assertThat(status.status(), equalTo(DriverStatus.Status.RUNNING)); } else { assertThat(status.status(), equalTo(DriverStatus.Status.ASYNC)); @@ -328,23 +339,26 @@ private List getDriverTasks() throws Exception { .get() .getTasks(); assertThat(tasks, hasSize(equalTo(3))); - List readTasks = tasks.stream().filter(t -> t.description().equals(READ_DESCRIPTION)).toList(); - List mergeTasks = tasks.stream().filter(t -> t.description().equals(MERGE_DESCRIPTION)).toList(); - assertThat(readTasks, hasSize(1)); - assertThat(mergeTasks, hasSize(1)); - // node-level reduction is disabled when the target data node is also the coordinator - if (readTasks.get(0).node().equals(mergeTasks.get(0).node())) { - REDUCE_DESCRIPTION = """ - \\_ExchangeSourceOperator[] - \\_ExchangeSinkOperator"""; - } - List reduceTasks = tasks.stream().filter(t -> t.description().equals(REDUCE_DESCRIPTION)).toList(); - assertThat(reduceTasks, hasSize(1)); + assertThat(dataTasks(tasks), hasSize(1)); + assertThat(nodeReduceTasks(tasks), hasSize(1)); + assertThat(coordinatorTasks(tasks), hasSize(1)); foundTasks.addAll(tasks); }); return foundTasks; } + private List dataTasks(List tasks) { + return tasks.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("data")).toList(); + } + + private List nodeReduceTasks(List tasks) { + return tasks.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("node_reduce")).toList(); + } + + private List coordinatorTasks(List tasks) { + return tasks.stream().filter(t -> ((DriverStatus) t.status()).taskDescription().equals("final")).toList(); + } + private void assertCancelled(ActionFuture response) throws Exception { Exception e = expectThrows(Exception.class, response); Throwable cancelException = ExceptionsHelper.unwrap(e, TaskCancelledException.class); @@ -477,30 +491,41 @@ protected void doRun() throws Exception { } public void testTaskContentsForTopNQuery() throws Exception { - READ_DESCRIPTION = ("\\_LuceneTopNSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = 1000, " - + "scoreMode = TOP_DOCS, " - + "sorts = [{\"pause_me\":{\"order\":\"asc\",\"missing\":\"_last\",\"unmapped_type\":\"long\"}}]]\n" - + "\\_ValuesSourceReaderOperator[fields = [pause_me]]\n" - + "\\_ProjectOperator[projection = [1]]\n" - + "\\_ExchangeSinkOperator").replace("pageSize()", Integer.toString(pageSize())); - MERGE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n" - + "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], " - + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n" - + "\\_ProjectOperator[projection = [0]]\n" - + "\\_OutputOperator[columns = [pause_me]]"; - REDUCE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n" - + (nodeLevelReduction - ? "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], " - + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n" - : "") - + "\\_ExchangeSinkOperator"; - ActionFuture response = startEsql("from test | sort pause_me | keep pause_me"); try { getTasksStarting(); logger.info("unblocking script"); scriptPermits.release(pageSize()); - getTasksRunning(); + List tasks = getTasksRunning(); + String sortStatus = """ + [{"pause_me":{"order":"asc","missing":"_last","unmapped_type":"long"}}]"""; + String sourceStatus = "dataPartitioning = SHARD, maxPageSize = " + + pageSize() + + ", limit = 1000, scoreMode = TOP_DOCS, sorts = " + + sortStatus; + assertThat(dataTasks(tasks).get(0).description(), equalTo(""" + \\_LuceneTopNSourceOperator[sourceStatus] + \\_ValuesSourceReaderOperator[fields = [pause_me]] + \\_ProjectOperator[projection = [1]] + \\_ExchangeSinkOperator""".replace("sourceStatus", sourceStatus))); + assertThat( + nodeReduceTasks(tasks).get(0).description(), + nodeLevelReduceDescriptionMatcher( + tasks, + "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], " + + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n" + ) + ); + assertThat( + coordinatorTasks(tasks).get(0).description(), + equalTo( + "\\_ExchangeSourceOperator[]\n" + + "\\_TopNOperator[count=1000, elementTypes=[LONG], encoders=[DefaultSortable], " + + "sortOrders=[SortOrder[channel=0, asc=true, nullsFirst=false]]]\n" + + "\\_ProjectOperator[projection = [0]]\n" + + "\\_OutputOperator[columns = [pause_me]]" + ) + ); } finally { // each scripted field "emit" is called by LuceneTopNSourceOperator and by ValuesSourceReaderOperator scriptPermits.release(2 * numberOfDocs()); @@ -512,26 +537,26 @@ public void testTaskContentsForTopNQuery() throws Exception { public void testTaskContentsForLimitQuery() throws Exception { String limit = Integer.toString(randomIntBetween(pageSize() + 1, 2 * numberOfDocs())); - READ_DESCRIPTION = """ - \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = limit(), scoreMode = COMPLETE_NO_SCORES] - \\_ValuesSourceReaderOperator[fields = [pause_me]] - \\_ProjectOperator[projection = [1]] - \\_ExchangeSinkOperator""".replace("pageSize()", Integer.toString(pageSize())).replace("limit()", limit); - MERGE_DESCRIPTION = """ - \\_ExchangeSourceOperator[] - \\_LimitOperator[limit = limit()] - \\_ProjectOperator[projection = [0]] - \\_OutputOperator[columns = [pause_me]]""".replace("limit()", limit); - REDUCE_DESCRIPTION = ("\\_ExchangeSourceOperator[]\n" - + (nodeLevelReduction ? "\\_LimitOperator[limit = limit()]\n" : "") - + "\\_ExchangeSinkOperator").replace("limit()", limit); - ActionFuture response = startEsql("from test | keep pause_me | limit " + limit); try { getTasksStarting(); logger.info("unblocking script"); scriptPermits.release(pageSize() - prereleasedDocs); - getTasksRunning(); + List tasks = getTasksRunning(); + assertThat(dataTasks(tasks).get(0).description(), equalTo(""" + \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = limit(), scoreMode = COMPLETE_NO_SCORES] + \\_ValuesSourceReaderOperator[fields = [pause_me]] + \\_ProjectOperator[projection = [1]] + \\_ExchangeSinkOperator""".replace("pageSize()", Integer.toString(pageSize())).replace("limit()", limit))); + assertThat( + nodeReduceTasks(tasks).get(0).description(), + nodeLevelReduceDescriptionMatcher(tasks, "\\_LimitOperator[limit = " + limit + "]\n") + ); + assertThat(coordinatorTasks(tasks).get(0).description(), equalTo(""" + \\_ExchangeSourceOperator[] + \\_LimitOperator[limit = limit()] + \\_ProjectOperator[projection = [0]] + \\_OutputOperator[columns = [pause_me]]""".replace("limit()", limit))); } finally { scriptPermits.release(numberOfDocs()); try (EsqlQueryResponse esqlResponse = response.get()) { @@ -541,27 +566,35 @@ public void testTaskContentsForLimitQuery() throws Exception { } public void testTaskContentsForGroupingStatsQuery() throws Exception { - READ_DESCRIPTION = """ - \\_LuceneSourceOperator[dataPartitioning = SHARD, maxPageSize = pageSize(), limit = 2147483647, scoreMode = COMPLETE_NO_SCORES] - \\_ValuesSourceReaderOperator[fields = [foo]] - \\_OrdinalsGroupingOperator(aggs = max of longs) - \\_ExchangeSinkOperator""".replace("pageSize()", Integer.toString(pageSize())); - MERGE_DESCRIPTION = """ - \\_ExchangeSourceOperator[] - \\_HashAggregationOperator[mode = , aggs = max of longs] - \\_ProjectOperator[projection = [1, 0]] - \\_LimitOperator[limit = 1000] - \\_OutputOperator[columns = [max(foo), pause_me]]"""; - REDUCE_DESCRIPTION = "\\_ExchangeSourceOperator[]\n" - + (nodeLevelReduction ? "\\_HashAggregationOperator[mode = , aggs = max of longs]\n" : "") - + "\\_ExchangeSinkOperator"; - ActionFuture response = startEsql("from test | stats max(foo) by pause_me"); try { getTasksStarting(); logger.info("unblocking script"); scriptPermits.release(pageSize()); - getTasksRunning(); + List tasks = getTasksRunning(); + String sourceStatus = "dataPartitioning = SHARD, maxPageSize = pageSize(), limit = 2147483647, scoreMode = COMPLETE_NO_SCORES" + .replace("pageSize()", Integer.toString(pageSize())); + assertThat( + dataTasks(tasks).get(0).description(), + equalTo( + """ + \\_LuceneSourceOperator[sourceStatus] + \\_ValuesSourceReaderOperator[fields = [foo]] + \\_OrdinalsGroupingOperator(aggs = max of longs) + \\_ExchangeSinkOperator""".replace("sourceStatus", sourceStatus) + + ) + ); + assertThat( + nodeReduceTasks(tasks).get(0).description(), + nodeLevelReduceDescriptionMatcher(tasks, "\\_HashAggregationOperator[mode = , aggs = max of longs]\n") + ); + assertThat(coordinatorTasks(tasks).get(0).description(), equalTo(""" + \\_ExchangeSourceOperator[] + \\_HashAggregationOperator[mode = , aggs = max of longs] + \\_ProjectOperator[projection = [1, 0]] + \\_LimitOperator[limit = 1000] + \\_OutputOperator[columns = [max(foo), pause_me]]""")); } finally { scriptPermits.release(numberOfDocs()); try (EsqlQueryResponse esqlResponse = response.get()) { @@ -572,6 +605,13 @@ public void testTaskContentsForGroupingStatsQuery() throws Exception { } } + private Matcher nodeLevelReduceDescriptionMatcher(List tasks, String nodeReduce) { + boolean matchNodeReduction = nodeLevelReduction + // If the data node and the coordinator are the same node then we don't reduce aggs in it. + && false == dataTasks(tasks).get(0).node().equals(coordinatorTasks(tasks).get(0).node()); + return equalTo("\\_ExchangeSourceOperator[]\n" + (matchNodeReduction ? nodeReduce : "") + "\\_ExchangeSinkOperator"); + } + @Override protected Collection> nodePlugins() { return CollectionUtils.appendToCopy(super.nodePlugins(), MockTransportService.TestPlugin.class); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java index 15bbc06836def..1bbcc46c0555f 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/LookupFromIndexIT.java @@ -227,6 +227,7 @@ private void runLookup(DataType keyType, PopulateIndices populateIndices) throws DriverContext driverContext = driverContext(); try ( var driver = new Driver( + "test", driverContext, source.get(driverContext), List.of(reader.get(driverContext), lookup.get(driverContext)), diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java index cb2582db2ad33..d4d9372214753 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/enrich/AbstractLookupService.java @@ -327,6 +327,7 @@ private void doLookup(T request, CancellableTask task, ActionListener releasables.add(outputOperator); Driver driver = new Driver( "enrich-lookup:" + request.sessionId, + "enrich", System.currentTimeMillis(), System.nanoTime(), driverContext, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java index aa24ea113cb48..c4985b029cfcd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlanner.java @@ -169,7 +169,7 @@ public LocalExecutionPlanner( /** * turn the given plan into a list of drivers to execute */ - public LocalExecutionPlan plan(FoldContext foldCtx, PhysicalPlan localPhysicalPlan) { + public LocalExecutionPlan plan(String taskDescription, FoldContext foldCtx, PhysicalPlan localPhysicalPlan) { var context = new LocalExecutionPlannerContext( new ArrayList<>(), new Holder<>(DriverParallelism.SINGLE), @@ -190,7 +190,7 @@ public LocalExecutionPlan plan(FoldContext foldCtx, PhysicalPlan localPhysicalPl final TimeValue statusInterval = configuration.pragmas().statusInterval(); context.addDriverFactory( new DriverFactory( - new DriverSupplier(context.bigArrays, context.blockFactory, physicalOperation, statusInterval, settings), + new DriverSupplier(taskDescription, context.bigArrays, context.blockFactory, physicalOperation, statusInterval, settings), context.driverParallelism().get() ) ); @@ -831,6 +831,7 @@ int pageSize(Integer estimatedRowSize) { } record DriverSupplier( + String taskDescription, BigArrays bigArrays, BlockFactory blockFactory, PhysicalOperation physicalOperation, @@ -857,6 +858,7 @@ public Driver apply(String sessionId) { success = true; return new Driver( sessionId, + taskDescription, System.currentTimeMillis(), System.nanoTime(), driverContext, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java index 19ed77405daa2..6c4a76dd0e38c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ClusterComputeHandler.java @@ -206,6 +206,7 @@ void runComputeOnRemoteCluster( parentTask, new ComputeContext( localSessionId, + "remote_reduce", clusterAlias, List.of(), configuration, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java index 82943d23581fd..86af106ea7e42 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeContext.java @@ -19,6 +19,7 @@ record ComputeContext( String sessionId, + String taskDescription, String clusterAlias, List searchContexts, Configuration configuration, diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 71c2a65037e9a..2d01616fe90c4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -155,6 +155,7 @@ public void execute( } var computeContext = new ComputeContext( newChildSession(sessionId), + "single", LOCAL_CLUSTER, List.of(), configuration, @@ -226,6 +227,7 @@ public void execute( rootTask, new ComputeContext( sessionId, + "final", LOCAL_CLUSTER, List.of(), configuration, @@ -394,7 +396,7 @@ public SourceProvider createSourceProvider() { // the planner will also set the driver parallelism in LocalExecutionPlanner.LocalExecutionPlan (used down below) // it's doing this in the planning of EsQueryExec (the source of the data) // see also EsPhysicalOperationProviders.sourcePhysicalOperation - LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.foldCtx(), plan); + LocalExecutionPlanner.LocalExecutionPlan localExecutionPlan = planner.plan(context.taskDescription(), context.foldCtx(), plan); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Local execution plan:\n{}", localExecutionPlan.describe()); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index 7020932819421..40a87fca4dc25 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -372,6 +372,7 @@ public void onFailure(Exception e) { assert ThreadPool.assertCurrentThreadPool(ThreadPool.Names.SEARCH, ESQL_WORKER_THREAD_POOL_NAME); var computeContext = new ComputeContext( sessionId, + "data", clusterAlias, searchContexts, configuration, @@ -435,6 +436,7 @@ private void runComputeOnDataNode( task, new ComputeContext( request.sessionId(), + "node_reduce", request.clusterAlias(), List.of(), request.configuration(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java index bae20bb9b26d3..3d03c008d9261 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/CsvTests.java @@ -639,6 +639,7 @@ void executeSubPlan( // replace fragment inside the coordinator plan List drivers = new ArrayList<>(); LocalExecutionPlan coordinatorNodeExecutionPlan = executionPlanner.plan( + "final", foldCtx, new OutputExec(coordinatorPlan, collectedPages::add) ); @@ -660,7 +661,7 @@ void executeSubPlan( throw new AssertionError("expected no failure", e); }) ); - LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan(foldCtx, csvDataNodePhysicalPlan); + LocalExecutionPlan dataNodeExecutionPlan = executionPlanner.plan("data", foldCtx, csvDataNodePhysicalPlan); drivers.addAll(dataNodeExecutionPlan.createDrivers(getTestName())); Randomness.shuffle(drivers); diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java index ebfe1c8147073..cc4e70632d678 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseProfileTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.esql.action; +import com.carrotsearch.randomizedtesting.generators.RandomStrings; + import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.compute.data.BlockWritables; @@ -49,6 +51,7 @@ private List randomDriverProfiles() { private DriverProfile randomDriverProfile() { return new DriverProfile( + RandomStrings.randomAsciiLettersOfLength(random(), 5), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(), diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java index 4fdb4a7bf042b..065495cbad937 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/action/EsqlQueryResponseTests.java @@ -723,6 +723,7 @@ public void testProfileXContent() { new EsqlQueryResponse.Profile( List.of( new DriverProfile( + "test", 1723489812649L, 1723489819929L, 20021, @@ -757,6 +758,7 @@ public void testProfileXContent() { "profile" : { "drivers" : [ { + "task_description" : "test", "start_millis" : 1723489812649, "stop_millis" : 1723489819929, "took_nanos" : 20021, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java index 78aaf1f354723..f7bdf52315fec 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/optimizer/PhysicalPlanOptimizerTests.java @@ -7604,7 +7604,7 @@ private LocalExecutionPlanner.LocalExecutionPlan physicalOperationsFromPhysicalP List.of() ); - return planner.plan(FoldContext.small(), plan); + return planner.plan("test", FoldContext.small(), plan); } private List> findFieldNamesInLookupJoinDescription(LocalExecutionPlanner.LocalExecutionPlan physicalOperations) { diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java index e1e606a6e84b1..7e5143d5a3ac0 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/planner/LocalExecutionPlannerTests.java @@ -84,6 +84,7 @@ public void closeIndex() throws IOException { public void testLuceneSourceOperatorHugeRowSize() throws IOException { int estimatedRowSize = randomEstimatedRowSize(estimatedRowSizeIsHuge); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( + "test", FoldContext.small(), new EsQueryExec( Source.EMPTY, @@ -110,6 +111,7 @@ public void testLuceneTopNSourceOperator() throws IOException { EsQueryExec.FieldSort sort = new EsQueryExec.FieldSort(sortField, Order.OrderDirection.ASC, Order.NullsPosition.LAST); Literal limit = new Literal(Source.EMPTY, 10, DataType.INTEGER); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( + "test", FoldContext.small(), new EsQueryExec( Source.EMPTY, @@ -136,6 +138,7 @@ public void testLuceneTopNSourceOperatorDistanceSort() throws IOException { EsQueryExec.GeoDistanceSort sort = new EsQueryExec.GeoDistanceSort(sortField, Order.OrderDirection.ASC, 1, -1); Literal limit = new Literal(Source.EMPTY, 10, DataType.INTEGER); LocalExecutionPlanner.LocalExecutionPlan plan = planner().plan( + "test", FoldContext.small(), new EsQueryExec( Source.EMPTY, diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java index 7db3216d1736d..f4deaa45f1f87 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/plugin/ComputeListenerTests.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.esql.plugin; +import com.carrotsearch.randomizedtesting.generators.RandomStrings; + import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.support.PlainActionFuture; @@ -62,6 +64,7 @@ private List randomProfiles() { for (int i = 0; i < numProfiles; i++) { profiles.add( new DriverProfile( + RandomStrings.randomAsciiLettersOfLength(random(), 5), randomNonNegativeLong(), randomNonNegativeLong(), randomNonNegativeLong(),