diff --git a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java index 396011f2e834c..589a82c1860bf 100644 --- a/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java +++ b/x-pack/plugin/downsample/src/internalClusterTest/java/org/elasticsearch/xpack/downsample/DownsampleIT.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.downsample; -import org.elasticsearch.Build; import org.elasticsearch.action.admin.cluster.node.capabilities.NodesCapabilitiesRequest; import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest; import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction; @@ -28,7 +27,6 @@ import org.elasticsearch.xpack.esql.action.EsqlQueryAction; import org.elasticsearch.xpack.esql.action.EsqlQueryRequest; import org.elasticsearch.xpack.esql.action.EsqlQueryResponse; -import org.elasticsearch.xpack.esql.plugin.QueryPragmas; import java.io.IOException; import java.time.Instant; @@ -296,15 +294,11 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception { } // tests on counter types - // TODO: remove hard-coded pragmas - assumeTrue("query pragmas require snapshot build", Build.current().isSnapshot()); - var ratePragmas = new QueryPragmas(Settings.builder().put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build()); - for (String innerCommand : List.of("rate")) { String command = outerCommand + " (" + innerCommand + "(request))"; String esqlQuery = "TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)"; try ( - var resp = client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(esqlQuery).pragmas(ratePragmas)) + var resp = client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(esqlQuery)) .actionGet(30, TimeUnit.SECONDS) ) { var columns = resp.columns(); diff --git a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java index 0945fa0f1641d..182d75e7e437c 100644 --- a/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java +++ b/x-pack/plugin/esql/src/internalClusterTest/java/org/elasticsearch/xpack/esql/action/TimeSeriesIT.java @@ -7,15 +7,20 @@ package org.elasticsearch.xpack.esql.action; +import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Randomness; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.ByteSizeValue; +import org.elasticsearch.common.util.iterable.Iterables; +import org.elasticsearch.compute.lucene.LuceneSourceOperator; import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator; import org.elasticsearch.compute.operator.DriverProfile; import org.elasticsearch.compute.operator.OperatorStatus; import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator; import org.elasticsearch.xpack.esql.EsqlTestUtils; import org.elasticsearch.xpack.esql.core.type.DataType; +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; +import org.hamcrest.Matchers; import org.junit.Before; import java.util.ArrayList; @@ -481,33 +486,113 @@ public void testFieldDoesNotExist() { } } - public void testRateProfile() { - EsqlQueryRequest request = new EsqlQueryRequest(); - request.profile(true); - request.query("TS hosts | STATS sum(rate(request_count)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster"); - try (var resp = run(request)) { - EsqlQueryResponse.Profile profile = resp.profile(); - List dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList(); - int totalTimeSeries = 0; - for (DriverProfile p : dataProfiles) { - if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesSourceOperator.Status)) { - totalTimeSeries++; - assertThat(p.operators(), hasSize(2)); - assertThat(p.operators().get(1).operator(), equalTo("ExchangeSinkOperator")); - } else if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesAggregationOperator.Status)) { - assertThat(p.operators(), hasSize(3)); - assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator")); - assertThat(p.operators().get(1).operator(), containsString("TimeSeriesAggregationOperator")); - assertThat(p.operators().get(2).operator(), equalTo("ExchangeSinkOperator")); - } else { - assertThat(p.operators(), hasSize(4)); - assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator")); - assertThat(p.operators().get(1).operator(), containsString("TimeSeriesExtractFieldOperator")); - assertThat(p.operators().get(2).operator(), containsString("EvalOperator")); - assertThat(p.operators().get(3).operator(), equalTo("ExchangeSinkOperator")); + public void testProfile() { + String dataNode = Iterables.get(clusterService().state().getNodes().getDataNodes().keySet(), 0); + Settings indexSettings = Settings.builder() + .put("mode", "time_series") + .putList("routing_path", List.of("host", "cluster")) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put("index.routing.allocation.require._id", dataNode) + .build(); + String index = "my-hosts"; + client().admin() + .indices() + .prepareCreate(index) + .setSettings(indexSettings) + .setMapping( + "@timestamp", + "type=date", + "host", + "type=keyword,time_series_dimension=true", + "cluster", + "type=keyword,time_series_dimension=true", + "memory", + "type=long,time_series_metric=gauge", + "request_count", + "type=integer,time_series_metric=counter" + ) + .get(); + Randomness.shuffle(docs); + for (Doc doc : docs) { + client().prepareIndex(index) + .setSource( + "@timestamp", + doc.timestamp, + "host", + doc.host, + "cluster", + doc.cluster, + "memory", + doc.memory.getBytes(), + "cpu", + doc.cpu, + "request_count", + doc.requestCount + ) + .get(); + } + client().admin().indices().prepareRefresh(index).get(); + QueryPragmas pragmas = new QueryPragmas( + Settings.builder() + .put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), between(3, 10)) + .put(QueryPragmas.TASK_CONCURRENCY.getKey(), 1) + .build() + ); + // The rate aggregation is executed with one shard at a time + { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.profile(true); + request.pragmas(pragmas); + request.acceptedPragmaRisks(true); + request.query("TS my-hosts | STATS sum(rate(request_count)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster"); + try (var resp = run(request)) { + EsqlQueryResponse.Profile profile = resp.profile(); + List dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList(); + for (DriverProfile p : dataProfiles) { + if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesSourceOperator.Status)) { + assertThat(p.operators(), hasSize(2)); + TimeSeriesSourceOperator.Status status = (TimeSeriesSourceOperator.Status) p.operators().get(0).status(); + assertThat(status.processedShards(), hasSize(1)); + assertThat(p.operators().get(1).operator(), equalTo("ExchangeSinkOperator")); + } else if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesAggregationOperator.Status)) { + assertThat(p.operators(), hasSize(3)); + assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator")); + assertThat(p.operators().get(1).operator(), containsString("TimeSeriesAggregationOperator")); + assertThat(p.operators().get(2).operator(), equalTo("ExchangeSinkOperator")); + } else { + assertThat(p.operators(), hasSize(4)); + assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator")); + assertThat(p.operators().get(1).operator(), containsString("TimeSeriesExtractFieldOperator")); + assertThat(p.operators().get(2).operator(), containsString("EvalOperator")); + assertThat(p.operators().get(3).operator(), equalTo("ExchangeSinkOperator")); + } } + assertThat(dataProfiles, hasSize(9)); + } + } + // non-rate aggregation is executed with multiple shards at a time + { + EsqlQueryRequest request = new EsqlQueryRequest(); + request.profile(true); + request.pragmas(pragmas); + request.acceptedPragmaRisks(true); + request.query("TS my-hosts | STATS avg(avg_over_time(cpu)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster"); + try (var resp = run(request)) { + EsqlQueryResponse.Profile profile = resp.profile(); + List dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList(); + assertThat(dataProfiles, hasSize(1)); + List ops = dataProfiles.get(0).operators(); + assertThat(ops, hasSize(5)); + assertThat(ops.get(0).operator(), containsString("LuceneSourceOperator")); + assertThat(ops.get(0).status(), Matchers.instanceOf(LuceneSourceOperator.Status.class)); + LuceneSourceOperator.Status status = (LuceneSourceOperator.Status) ops.get(0).status(); + assertThat(status.processedShards(), hasSize(3)); + assertThat(ops.get(1).operator(), containsString("EvalOperator")); + assertThat(ops.get(2).operator(), containsString("ValuesSourceReaderOperator")); + assertThat(ops.get(3).operator(), containsString("TimeSeriesAggregationOperator")); + assertThat(ops.get(4).operator(), containsString("ExchangeSinkOperator")); } - assertThat(totalTimeSeries, equalTo(dataProfiles.size() / 3)); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java index 825219f9817e5..f956bdf34089f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/planner/PlannerUtils.java @@ -159,6 +159,15 @@ public static String[] planOriginalIndices(PhysicalPlan plan) { return indices.toArray(String[]::new); } + public static boolean requiresSortedTimeSeriesSource(PhysicalPlan plan) { + return plan.anyMatch(e -> { + if (e instanceof FragmentExec f) { + return f.fragment().anyMatch(l -> l instanceof EsRelation r && r.indexMode() == IndexMode.TIME_SERIES); + } + return false; + }); + } + private static void forEachRelation(PhysicalPlan plan, Consumer action) { plan.forEachDown(FragmentExec.class, f -> f.fragment().forEachDown(EsRelation.class, r -> { if (r.indexMode() != IndexMode.LOOKUP) { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java index aea1c7d6d2d67..490edec311b36 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plugin/DataNodeComputeHandler.java @@ -227,6 +227,7 @@ private class DataNodeRequestExecutor { private final ComputeListener computeListener; private final int maxConcurrentShards; private final ExchangeSink blockingSink; // block until we have completed on all shards or the coordinator has enough data + private final boolean singleShardPipeline; private final boolean failFastOnShardFailure; private final Map shardLevelFailures; @@ -238,6 +239,7 @@ private class DataNodeRequestExecutor { int maxConcurrentShards, boolean failFastOnShardFailure, Map shardLevelFailures, + boolean singleShardPipeline, ComputeListener computeListener ) { this.flags = flags; @@ -248,6 +250,7 @@ private class DataNodeRequestExecutor { this.maxConcurrentShards = maxConcurrentShards; this.failFastOnShardFailure = failFastOnShardFailure; this.shardLevelFailures = shardLevelFailures; + this.singleShardPipeline = singleShardPipeline; this.blockingSink = exchangeSink.createExchangeSink(() -> {}); } @@ -297,18 +300,37 @@ public void onFailure(Exception e) { batchListener.onResponse(DriverCompletionInfo.EMPTY); return; } - var computeContext = new ComputeContext( - sessionId, - "data", - clusterAlias, - flags, - searchContexts, - configuration, - configuration.newFoldContext(), - null, - () -> exchangeSink.createExchangeSink(pagesProduced::incrementAndGet) - ); - computeService.runCompute(parentTask, computeContext, request.plan(), batchListener); + if (singleShardPipeline) { + try (ComputeListener sub = new ComputeListener(threadPool, () -> {}, batchListener)) { + for (SearchContext searchContext : searchContexts) { + var computeContext = new ComputeContext( + sessionId, + "data", + clusterAlias, + flags, + List.of(searchContext), + configuration, + configuration.newFoldContext(), + null, + () -> exchangeSink.createExchangeSink(pagesProduced::incrementAndGet) + ); + computeService.runCompute(parentTask, computeContext, request.plan(), sub.acquireCompute()); + } + } + } else { + var computeContext = new ComputeContext( + sessionId, + "data", + clusterAlias, + flags, + searchContexts, + configuration, + configuration.newFoldContext(), + null, + () -> exchangeSink.createExchangeSink(pagesProduced::incrementAndGet) + ); + computeService.runCompute(parentTask, computeContext, request.plan(), batchListener); + } }, batchListener::onFailure)); } @@ -428,14 +450,21 @@ private void runComputeOnDataNode( exchangeService.finishSinkHandler(request.sessionId(), new TaskCancelledException(task.getReasonCancelled())); }); EsqlFlags flags = computeService.createFlags(); + int maxConcurrentShards = request.pragmas().maxConcurrentShardsPerNode(); + final boolean sortedTimeSeriesSource = PlannerUtils.requiresSortedTimeSeriesSource(request.plan()); + if (sortedTimeSeriesSource) { + // each time-series pipeline uses 3 drivers + maxConcurrentShards = Math.clamp(Math.ceilDiv(request.pragmas().taskConcurrency(), 3), 1, maxConcurrentShards); + } DataNodeRequestExecutor dataNodeRequestExecutor = new DataNodeRequestExecutor( flags, request, task, internalSink, - request.configuration().pragmas().maxConcurrentShardsPerNode(), + maxConcurrentShards, failFastOnShardFailure, shardLevelFailures, + sortedTimeSeriesSource, computeListener ); dataNodeRequestExecutor.start();