From 768671942ce06c95f6dddbaf6d7c18a1c4a7676d Mon Sep 17 00:00:00 2001 From: Ioana Tagirta Date: Thu, 22 May 2025 16:54:02 +0200 Subject: [PATCH] Add profile description labels for FORK sub plans --- .../xpack/esql/action/ForkIT.java | 30 +++++++++++++ .../xpack/esql/plugin/ComputeService.java | 44 +++++++++++++------ 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java index 152119ea5a7ce..f9f504e868a71 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/ForkIT.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.xpack.esql.VerificationException; import org.elasticsearch.xpack.esql.parser.ParsingException; import org.junit.Before; @@ -685,6 +686,35 @@ public void testOneSubQuery() { assertTrue(e.getMessage().contains("Fork requires at least two branches")); } + public void testProfile() { + var query = """ + FROM test + | FORK + ( WHERE content:"fox" | SORT id ) + ( WHERE content:"dog" | SORT id ) + | SORT _fork, id + | KEEP _fork, id, content + """; + + EsqlQueryRequest request = EsqlQueryRequest.syncEsqlQueryRequest(); + + request.pragmas(randomPragmas()); + request.query(query); + request.profile(true); + + try (var resp = run(request)) { + EsqlQueryResponse.Profile profile = resp.profile(); + assertNotNull(profile); + + List descriptions = profile.drivers().stream().map(DriverProfile::description).sorted().toList(); + + assertEquals( + List.of("data", "data", "main.final", "node_reduce", "node_reduce", "subplan-0.final", "subplan-1.final"), + descriptions + ); + } + } + private void createAndPopulateIndex() { var indexName = "test"; var client = client().admin().indices(); diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 396410432c16c..a16e5f8bd78e0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -195,7 +195,7 @@ public void execute( // we have no sub plans, so we can just execute the given plan if (subplans == null || subplans.size() == 0) { - executePlan(sessionId, rootTask, physicalPlan, configuration, foldContext, execInfo, listener, null); + executePlan(sessionId, rootTask, physicalPlan, configuration, foldContext, execInfo, null, listener, null); return; } @@ -220,7 +220,7 @@ public void execute( var finalListener = ActionListener.runBefore(listener, () -> exchangeService.removeExchangeSourceHandler(sessionId)); var computeContext = new ComputeContext( mainSessionId, - "single", + "main.final", LOCAL_CLUSTER, List.of(), configuration, @@ -244,22 +244,33 @@ public void execute( ) { runCompute(rootTask, computeContext, finalMainPlan, localListener.acquireCompute()); - for (PhysicalPlan subplan : subplans) { + for (int i = 0; i < subplans.size(); i++) { + var subplan = subplans.get(i); var childSessionId = newChildSession(sessionId); ExchangeSinkHandler exchangeSink = exchangeService.createSinkHandler(childSessionId, queryPragmas.exchangeBufferSize()); // funnel sub plan pages into the main plan exchange source mainExchangeSource.addRemoteSink(exchangeSink::fetchPageAsync, true, () -> {}, 1, ActionListener.noop()); var subPlanListener = localListener.acquireCompute(); - executePlan(childSessionId, rootTask, subplan, configuration, foldContext, execInfo, ActionListener.wrap(result -> { - exchangeSink.addCompletionListener( - ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); }) - ); - subPlanListener.onResponse(result.completionInfo()); - }, e -> { - exchangeService.finishSinkHandler(childSessionId, e); - subPlanListener.onFailure(e); - }), () -> exchangeSink.createExchangeSink(() -> {})); + executePlan( + childSessionId, + rootTask, + subplan, + configuration, + foldContext, + execInfo, + "subplan-" + i, + ActionListener.wrap(result -> { + exchangeSink.addCompletionListener( + ActionListener.running(() -> { exchangeService.finishSinkHandler(childSessionId, null); }) + ); + subPlanListener.onResponse(result.completionInfo()); + }, e -> { + exchangeService.finishSinkHandler(childSessionId, e); + subPlanListener.onFailure(e); + }), + () -> exchangeSink.createExchangeSink(() -> {}) + ); } } } @@ -272,6 +283,7 @@ public void executePlan( Configuration configuration, FoldContext foldContext, EsqlExecutionInfo execInfo, + String profileQualifier, ActionListener listener, Supplier exchangeSinkSupplier ) { @@ -309,7 +321,7 @@ public void executePlan( } var computeContext = new ComputeContext( newChildSession(sessionId), - "single", + profileDescription(profileQualifier, "single"), LOCAL_CLUSTER, List.of(), configuration, @@ -395,7 +407,7 @@ public void executePlan( rootTask, new ComputeContext( sessionId, - "final", + profileDescription(profileQualifier, "final"), LOCAL_CLUSTER, List.of(), configuration, @@ -611,6 +623,10 @@ String newChildSession(String session) { return session + "/" + childSessionIdGenerator.incrementAndGet(); } + String profileDescription(String qualifier, String label) { + return qualifier == null ? label : qualifier + "." + label; + } + Runnable cancelQueryOnFailure(CancellableTask task) { return new RunOnce(() -> { LOGGER.debug("cancelling ESQL task {} on failure", task);