diff --git a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java index 7cc599945e7c0..d01e1c9fb7f56 100644 --- a/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java +++ b/x-pack/plugin/esql/qa/server/single-node/src/javaRestTest/java/org/elasticsearch/xpack/esql/qa/single_node/EsqlSpecIT.java @@ -12,13 +12,16 @@ import org.elasticsearch.test.TestClustersThreadFilter; import org.elasticsearch.test.cluster.ElasticsearchCluster; import org.elasticsearch.xpack.esql.CsvSpecReader.CsvTestCase; +import org.elasticsearch.xpack.esql.plugin.ComputeService; import org.elasticsearch.xpack.esql.qa.rest.EsqlSpecTestCase; import org.junit.ClassRule; @ThreadLeakFilters(filters = TestClustersThreadFilter.class) public class EsqlSpecIT extends EsqlSpecTestCase { @ClassRule - public static ElasticsearchCluster cluster = Clusters.testCluster(spec -> spec.plugin("inference-service-test")); + public static ElasticsearchCluster cluster = Clusters.testCluster( + spec -> spec.plugin("inference-service-test").setting("logger." + ComputeService.class.getName(), "DEBUG") // So we log a profile + ); @Override protected String getTestRestCluster() { diff --git a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java index 221aa17ee1979..4285fe112a7c6 100644 --- a/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java +++ b/x-pack/plugin/esql/qa/server/src/main/java/org/elasticsearch/xpack/esql/qa/rest/EsqlSpecTestCase.java @@ -23,6 +23,7 @@ import org.elasticsearch.geometry.utils.WellKnownText; import org.elasticsearch.logging.LogManager; import org.elasticsearch.logging.Logger; +import org.elasticsearch.test.MapMatcher; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.test.rest.TestFeatureService; import org.elasticsearch.xcontent.XContentType; @@ -437,16 +438,17 @@ public static void assertRequestBreakerEmpty() throws Exception { HttpEntity entity = adminClient().performRequest(new Request("GET", "/_nodes/stats")).getEntity(); Map stats = XContentHelper.convertToMap(XContentType.JSON.xContent(), entity.getContent(), false); Map nodes = (Map) stats.get("nodes"); - for (Object n : nodes.values()) { - Map node = (Map) n; - Map breakers = (Map) node.get("breakers"); - Map request = (Map) breakers.get("request"); - assertMap( - "circuit breakers not reset to 0", - request, - matchesMap().extraOk().entry("estimated_size_in_bytes", 0).entry("estimated_size", "0b") + + MapMatcher breakersEmpty = matchesMap().extraOk().entry("estimated_size_in_bytes", 0).entry("estimated_size", "0b"); + + MapMatcher nodesMatcher = matchesMap(); + for (Object name : nodes.keySet()) { + nodesMatcher = nodesMatcher.entry( + name, + matchesMap().extraOk().entry("breakers", matchesMap().extraOk().entry("request", breakersEmpty)) ); } + assertMap("circuit breakers not reset to 0", stats, matchesMap().extraOk().entry("nodes", nodesMatcher)); }); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java index 7053b02eaafd5..39e3503b5fdd9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/ComputeService.java @@ -605,23 +605,36 @@ public SourceProvider createSourceProvider() { throw new IllegalStateException("no drivers created"); } LOGGER.debug("using {} drivers", drivers.size()); - driverRunner.executeDrivers( - task, - drivers, - transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME), - ActionListener.releaseAfter(listener.map(ignored -> { - if (context.configuration().profile()) { - return DriverCompletionInfo.includingProfiles( + ActionListener driverListener = listener.map(ignored -> { + if (LOGGER.isDebugEnabled()) { + LOGGER.debug( + "finished {}", + DriverCompletionInfo.includingProfiles( drivers, context.description(), clusterService.getClusterName().value(), transportService.getLocalNode().getName(), localPlan.toString() - ); - } else { - return DriverCompletionInfo.excludingProfiles(drivers); - } - }), () -> Releasables.close(drivers)) + ) + ); + } + if (context.configuration().profile()) { + return DriverCompletionInfo.includingProfiles( + drivers, + context.description(), + clusterService.getClusterName().value(), + transportService.getLocalNode().getName(), + localPlan.toString() + ); + } else { + return DriverCompletionInfo.excludingProfiles(drivers); + } + }); + driverRunner.executeDrivers( + task, + drivers, + transportService.getThreadPool().executor(ESQL_WORKER_THREAD_POOL_NAME), + ActionListener.releaseAfter(driverListener, () -> Releasables.close(drivers)) ); } catch (Exception e) { listener.onFailure(e);