Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

package org.elasticsearch.xpack.downsample;

import org.elasticsearch.Build;
import org.elasticsearch.action.admin.cluster.node.capabilities.NodesCapabilitiesRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
Expand All @@ -28,7 +27,6 @@
import org.elasticsearch.xpack.esql.action.EsqlQueryAction;
import org.elasticsearch.xpack.esql.action.EsqlQueryRequest;
import org.elasticsearch.xpack.esql.action.EsqlQueryResponse;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

import java.io.IOException;
import java.time.Instant;
Expand Down Expand Up @@ -296,15 +294,11 @@ public void testAggMetricInEsqlTSAfterDownsampling() throws Exception {
}

// tests on counter types
// TODO: remove hard-coded pragmas
assumeTrue("query pragmas require snapshot build", Build.current().isSnapshot());
var ratePragmas = new QueryPragmas(Settings.builder().put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), 1).build());

for (String innerCommand : List.of("rate")) {
String command = outerCommand + " (" + innerCommand + "(request))";
String esqlQuery = "TS " + dataStreamName + " | STATS " + command + " by cluster, bucket(@timestamp, 1 hour)";
try (
var resp = client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(esqlQuery).pragmas(ratePragmas))
var resp = client().execute(EsqlQueryAction.INSTANCE, new EsqlQueryRequest().query(esqlQuery))
.actionGet(30, TimeUnit.SECONDS)
) {
var columns = resp.columns();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,20 @@

package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Randomness;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.iterable.Iterables;
import org.elasticsearch.compute.lucene.LuceneSourceOperator;
import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;
import org.elasticsearch.compute.operator.DriverProfile;
import org.elasticsearch.compute.operator.OperatorStatus;
import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.core.type.DataType;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
import org.hamcrest.Matchers;
import org.junit.Before;

import java.util.ArrayList;
Expand Down Expand Up @@ -481,33 +486,113 @@ public void testFieldDoesNotExist() {
}
}

public void testRateProfile() {
EsqlQueryRequest request = new EsqlQueryRequest();
request.profile(true);
request.query("TS hosts | STATS sum(rate(request_count)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster");
try (var resp = run(request)) {
EsqlQueryResponse.Profile profile = resp.profile();
List<DriverProfile> dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList();
int totalTimeSeries = 0;
for (DriverProfile p : dataProfiles) {
if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesSourceOperator.Status)) {
totalTimeSeries++;
assertThat(p.operators(), hasSize(2));
assertThat(p.operators().get(1).operator(), equalTo("ExchangeSinkOperator"));
} else if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesAggregationOperator.Status)) {
assertThat(p.operators(), hasSize(3));
assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));
assertThat(p.operators().get(1).operator(), containsString("TimeSeriesAggregationOperator"));
assertThat(p.operators().get(2).operator(), equalTo("ExchangeSinkOperator"));
} else {
assertThat(p.operators(), hasSize(4));
assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));
assertThat(p.operators().get(1).operator(), containsString("TimeSeriesExtractFieldOperator"));
assertThat(p.operators().get(2).operator(), containsString("EvalOperator"));
assertThat(p.operators().get(3).operator(), equalTo("ExchangeSinkOperator"));
public void testProfile() {
String dataNode = Iterables.get(clusterService().state().getNodes().getDataNodes().keySet(), 0);
Settings indexSettings = Settings.builder()
.put("mode", "time_series")
.putList("routing_path", List.of("host", "cluster"))
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.routing.allocation.require._id", dataNode)
.build();
String index = "my-hosts";
client().admin()
.indices()
.prepareCreate(index)
.setSettings(indexSettings)
.setMapping(
"@timestamp",
"type=date",
"host",
"type=keyword,time_series_dimension=true",
"cluster",
"type=keyword,time_series_dimension=true",
"memory",
"type=long,time_series_metric=gauge",
"request_count",
"type=integer,time_series_metric=counter"
)
.get();
Randomness.shuffle(docs);
for (Doc doc : docs) {
client().prepareIndex(index)
.setSource(
"@timestamp",
doc.timestamp,
"host",
doc.host,
"cluster",
doc.cluster,
"memory",
doc.memory.getBytes(),
"cpu",
doc.cpu,
"request_count",
doc.requestCount
)
.get();
}
client().admin().indices().prepareRefresh(index).get();
QueryPragmas pragmas = new QueryPragmas(
Settings.builder()
.put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), between(3, 10))
.put(QueryPragmas.TASK_CONCURRENCY.getKey(), 1)
.build()
);
// The rate aggregation is executed with one shard at a time
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.profile(true);
request.pragmas(pragmas);
request.acceptedPragmaRisks(true);
request.query("TS my-hosts | STATS sum(rate(request_count)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster");
try (var resp = run(request)) {
EsqlQueryResponse.Profile profile = resp.profile();
List<DriverProfile> dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList();
for (DriverProfile p : dataProfiles) {
if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesSourceOperator.Status)) {
assertThat(p.operators(), hasSize(2));
TimeSeriesSourceOperator.Status status = (TimeSeriesSourceOperator.Status) p.operators().get(0).status();
assertThat(status.processedShards(), hasSize(1));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I saw a failure in my PR:

REPRODUCE WITH: ./gradlew ":x-pack:plugin:esql:internalClusterTest" --tests "org.elasticsearch.xpack.esql.action.TimeSeriesIT.testProfile" -Dtests.seed=ADBEEDF1C962A1C8 -Dtests.locale=nso -Dtests.timezone=Europe/Samara -Druntime.java=24
--
  | 2> java.lang.AssertionError:
  | Expected: a collection with size <1>
  | but: collection size was <0>
  | at __randomizedtesting.SeedInfo.seed([ADBEEDF1C962A1C8:FCE458746D2F19E1]:0)
  | at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
  | at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6)
  | at org.elasticsearch.test.ESTestCase.assertThat(ESTestCase.java:2706)
  | at org.elasticsearch.xpack.esql.action.TimeSeriesIT.testProfile(TimeSeriesIT.java:556)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks Kostas! I will look into it.

assertThat(p.operators().get(1).operator(), equalTo("ExchangeSinkOperator"));
} else if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesAggregationOperator.Status)) {
assertThat(p.operators(), hasSize(3));
assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));
assertThat(p.operators().get(1).operator(), containsString("TimeSeriesAggregationOperator"));
assertThat(p.operators().get(2).operator(), equalTo("ExchangeSinkOperator"));
} else {
assertThat(p.operators(), hasSize(4));
assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));
assertThat(p.operators().get(1).operator(), containsString("TimeSeriesExtractFieldOperator"));
assertThat(p.operators().get(2).operator(), containsString("EvalOperator"));
assertThat(p.operators().get(3).operator(), equalTo("ExchangeSinkOperator"));
}
}
assertThat(dataProfiles, hasSize(9));
}
}
// non-rate aggregation is executed with multiple shards at a time
{
EsqlQueryRequest request = new EsqlQueryRequest();
request.profile(true);
request.pragmas(pragmas);
request.acceptedPragmaRisks(true);
request.query("TS my-hosts | STATS avg(avg_over_time(cpu)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster");
try (var resp = run(request)) {
EsqlQueryResponse.Profile profile = resp.profile();
List<DriverProfile> dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList();
assertThat(dataProfiles, hasSize(1));
List<OperatorStatus> ops = dataProfiles.get(0).operators();
assertThat(ops, hasSize(5));
assertThat(ops.get(0).operator(), containsString("LuceneSourceOperator"));
assertThat(ops.get(0).status(), Matchers.instanceOf(LuceneSourceOperator.Status.class));
LuceneSourceOperator.Status status = (LuceneSourceOperator.Status) ops.get(0).status();
assertThat(status.processedShards(), hasSize(3));
assertThat(ops.get(1).operator(), containsString("EvalOperator"));
assertThat(ops.get(2).operator(), containsString("ValuesSourceReaderOperator"));
assertThat(ops.get(3).operator(), containsString("TimeSeriesAggregationOperator"));
assertThat(ops.get(4).operator(), containsString("ExchangeSinkOperator"));
}
assertThat(totalTimeSeries, equalTo(dataProfiles.size() / 3));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,15 @@ public static String[] planOriginalIndices(PhysicalPlan plan) {
return indices.toArray(String[]::new);
}

/**
 * Reports whether the given physical plan reads from a time-series index.
 * <p>
 * The plan matches when any of its nodes is a {@code FragmentExec} whose fragment contains an
 * {@code EsRelation} with index mode {@code IndexMode.TIME_SERIES}.
 *
 * @param plan the physical plan to inspect
 * @return {@code true} if such a relation is found, {@code false} otherwise
 */
public static boolean requiresSortedTimeSeriesSource(PhysicalPlan plan) {
    return plan.anyMatch(
        node -> node instanceof FragmentExec fragmentExec
            && fragmentExec.fragment()
                .anyMatch(child -> child instanceof EsRelation relation && IndexMode.TIME_SERIES == relation.indexMode())
    );
}

private static void forEachRelation(PhysicalPlan plan, Consumer<EsRelation> action) {
plan.forEachDown(FragmentExec.class, f -> f.fragment().forEachDown(EsRelation.class, r -> {
if (r.indexMode() != IndexMode.LOOKUP) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ private class DataNodeRequestExecutor {
private final ComputeListener computeListener;
private final int maxConcurrentShards;
private final ExchangeSink blockingSink; // block until we have completed on all shards or the coordinator has enough data
private final boolean singleShardPipeline;
private final boolean failFastOnShardFailure;
private final Map<ShardId, Exception> shardLevelFailures;

Expand All @@ -238,6 +239,7 @@ private class DataNodeRequestExecutor {
int maxConcurrentShards,
boolean failFastOnShardFailure,
Map<ShardId, Exception> shardLevelFailures,
boolean singleShardPipeline,
ComputeListener computeListener
) {
this.flags = flags;
Expand All @@ -248,6 +250,7 @@ private class DataNodeRequestExecutor {
this.maxConcurrentShards = maxConcurrentShards;
this.failFastOnShardFailure = failFastOnShardFailure;
this.shardLevelFailures = shardLevelFailures;
this.singleShardPipeline = singleShardPipeline;
this.blockingSink = exchangeSink.createExchangeSink(() -> {});
}

Expand Down Expand Up @@ -297,18 +300,37 @@ public void onFailure(Exception e) {
batchListener.onResponse(DriverCompletionInfo.EMPTY);
return;
}
var computeContext = new ComputeContext(
sessionId,
"data",
clusterAlias,
flags,
searchContexts,
configuration,
configuration.newFoldContext(),
null,
() -> exchangeSink.createExchangeSink(pagesProduced::incrementAndGet)
);
computeService.runCompute(parentTask, computeContext, request.plan(), batchListener);
if (singleShardPipeline) {
try (ComputeListener sub = new ComputeListener(threadPool, () -> {}, batchListener)) {
for (SearchContext searchContext : searchContexts) {
var computeContext = new ComputeContext(
sessionId,
"data",
clusterAlias,
flags,
List.of(searchContext),
configuration,
configuration.newFoldContext(),
null,
() -> exchangeSink.createExchangeSink(pagesProduced::incrementAndGet)
);
computeService.runCompute(parentTask, computeContext, request.plan(), sub.acquireCompute());
}
}
} else {
var computeContext = new ComputeContext(
sessionId,
"data",
clusterAlias,
flags,
searchContexts,
configuration,
configuration.newFoldContext(),
null,
() -> exchangeSink.createExchangeSink(pagesProduced::incrementAndGet)
);
computeService.runCompute(parentTask, computeContext, request.plan(), batchListener);
}
}, batchListener::onFailure));
}

Expand Down Expand Up @@ -428,14 +450,21 @@ private void runComputeOnDataNode(
exchangeService.finishSinkHandler(request.sessionId(), new TaskCancelledException(task.getReasonCancelled()));
});
EsqlFlags flags = computeService.createFlags();
int maxConcurrentShards = request.pragmas().maxConcurrentShardsPerNode();
final boolean sortedTimeSeriesSource = PlannerUtils.requiresSortedTimeSeriesSource(request.plan());
if (sortedTimeSeriesSource) {
// each time-series pipeline uses 3 drivers
maxConcurrentShards = Math.clamp(Math.ceilDiv(request.pragmas().taskConcurrency(), 3), 1, maxConcurrentShards);
}
DataNodeRequestExecutor dataNodeRequestExecutor = new DataNodeRequestExecutor(
flags,
request,
task,
internalSink,
request.configuration().pragmas().maxConcurrentShardsPerNode(),
maxConcurrentShards,
failFastOnShardFailure,
shardLevelFailures,
sortedTimeSeriesSource,
computeListener
);
dataNodeRequestExecutor.start();
Expand Down