| 
7 | 7 | 
 
  | 
8 | 8 | package org.elasticsearch.xpack.esql.action;  | 
9 | 9 | 
 
  | 
 | 10 | +import org.elasticsearch.cluster.metadata.IndexMetadata;  | 
10 | 11 | import org.elasticsearch.common.Randomness;  | 
11 | 12 | import org.elasticsearch.common.settings.Settings;  | 
12 | 13 | import org.elasticsearch.common.unit.ByteSizeValue;  | 
 | 14 | +import org.elasticsearch.common.util.iterable.Iterables;  | 
 | 15 | +import org.elasticsearch.compute.lucene.LuceneSourceOperator;  | 
13 | 16 | import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator;  | 
14 | 17 | import org.elasticsearch.compute.operator.DriverProfile;  | 
15 | 18 | import org.elasticsearch.compute.operator.OperatorStatus;  | 
16 | 19 | import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;  | 
17 | 20 | import org.elasticsearch.xpack.esql.EsqlTestUtils;  | 
18 | 21 | import org.elasticsearch.xpack.esql.core.type.DataType;  | 
 | 22 | +import org.elasticsearch.xpack.esql.plugin.QueryPragmas;  | 
 | 23 | +import org.hamcrest.Matchers;  | 
19 | 24 | import org.junit.Before;  | 
20 | 25 | 
 
  | 
21 | 26 | import java.util.ArrayList;  | 
@@ -481,33 +486,113 @@ public void testFieldDoesNotExist() {  | 
481 | 486 |         }  | 
482 | 487 |     }  | 
483 | 488 | 
 
  | 
484 |  | -    public void testRateProfile() {  | 
485 |  | -        EsqlQueryRequest request = new EsqlQueryRequest();  | 
486 |  | -        request.profile(true);  | 
487 |  | -        request.query("TS hosts | STATS sum(rate(request_count)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster");  | 
488 |  | -        try (var resp = run(request)) {  | 
489 |  | -            EsqlQueryResponse.Profile profile = resp.profile();  | 
490 |  | -            List<DriverProfile> dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList();  | 
491 |  | -            int totalTimeSeries = 0;  | 
492 |  | -            for (DriverProfile p : dataProfiles) {  | 
493 |  | -                if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesSourceOperator.Status)) {  | 
494 |  | -                    totalTimeSeries++;  | 
495 |  | -                    assertThat(p.operators(), hasSize(2));  | 
496 |  | -                    assertThat(p.operators().get(1).operator(), equalTo("ExchangeSinkOperator"));  | 
497 |  | -                } else if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesAggregationOperator.Status)) {  | 
498 |  | -                    assertThat(p.operators(), hasSize(3));  | 
499 |  | -                    assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));  | 
500 |  | -                    assertThat(p.operators().get(1).operator(), containsString("TimeSeriesAggregationOperator"));  | 
501 |  | -                    assertThat(p.operators().get(2).operator(), equalTo("ExchangeSinkOperator"));  | 
502 |  | -                } else {  | 
503 |  | -                    assertThat(p.operators(), hasSize(4));  | 
504 |  | -                    assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));  | 
505 |  | -                    assertThat(p.operators().get(1).operator(), containsString("TimeSeriesExtractFieldOperator"));  | 
506 |  | -                    assertThat(p.operators().get(2).operator(), containsString("EvalOperator"));  | 
507 |  | -                    assertThat(p.operators().get(3).operator(), equalTo("ExchangeSinkOperator"));  | 
 | 489 | +    public void testProfile() {  | 
 | 490 | +        String dataNode = Iterables.get(clusterService().state().getNodes().getDataNodes().keySet(), 0);  | 
 | 491 | +        Settings indexSettings = Settings.builder()  | 
 | 492 | +            .put("mode", "time_series")  | 
 | 493 | +            .putList("routing_path", List.of("host", "cluster"))  | 
 | 494 | +            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 3)  | 
 | 495 | +            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)  | 
 | 496 | +            .put("index.routing.allocation.require._id", dataNode)  | 
 | 497 | +            .build();  | 
 | 498 | +        String index = "my-hosts";  | 
 | 499 | +        client().admin()  | 
 | 500 | +            .indices()  | 
 | 501 | +            .prepareCreate(index)  | 
 | 502 | +            .setSettings(indexSettings)  | 
 | 503 | +            .setMapping(  | 
 | 504 | +                "@timestamp",  | 
 | 505 | +                "type=date",  | 
 | 506 | +                "host",  | 
 | 507 | +                "type=keyword,time_series_dimension=true",  | 
 | 508 | +                "cluster",  | 
 | 509 | +                "type=keyword,time_series_dimension=true",  | 
 | 510 | +                "memory",  | 
 | 511 | +                "type=long,time_series_metric=gauge",  | 
 | 512 | +                "request_count",  | 
 | 513 | +                "type=integer,time_series_metric=counter"  | 
 | 514 | +            )  | 
 | 515 | +            .get();  | 
 | 516 | +        Randomness.shuffle(docs);  | 
 | 517 | +        for (Doc doc : docs) {  | 
 | 518 | +            client().prepareIndex(index)  | 
 | 519 | +                .setSource(  | 
 | 520 | +                    "@timestamp",  | 
 | 521 | +                    doc.timestamp,  | 
 | 522 | +                    "host",  | 
 | 523 | +                    doc.host,  | 
 | 524 | +                    "cluster",  | 
 | 525 | +                    doc.cluster,  | 
 | 526 | +                    "memory",  | 
 | 527 | +                    doc.memory.getBytes(),  | 
 | 528 | +                    "cpu",  | 
 | 529 | +                    doc.cpu,  | 
 | 530 | +                    "request_count",  | 
 | 531 | +                    doc.requestCount  | 
 | 532 | +                )  | 
 | 533 | +                .get();  | 
 | 534 | +        }  | 
 | 535 | +        client().admin().indices().prepareRefresh(index).get();  | 
 | 536 | +        QueryPragmas pragmas = new QueryPragmas(  | 
 | 537 | +            Settings.builder()  | 
 | 538 | +                .put(QueryPragmas.MAX_CONCURRENT_SHARDS_PER_NODE.getKey(), between(3, 10))  | 
 | 539 | +                .put(QueryPragmas.TASK_CONCURRENCY.getKey(), 1)  | 
 | 540 | +                .build()  | 
 | 541 | +        );  | 
 | 542 | +        // The rate aggregation is executed with one shard at a time  | 
 | 543 | +        {  | 
 | 544 | +            EsqlQueryRequest request = new EsqlQueryRequest();  | 
 | 545 | +            request.profile(true);  | 
 | 546 | +            request.pragmas(pragmas);  | 
 | 547 | +            request.acceptedPragmaRisks(true);  | 
 | 548 | +            request.query("TS my-hosts | STATS sum(rate(request_count)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster");  | 
 | 549 | +            try (var resp = run(request)) {  | 
 | 550 | +                EsqlQueryResponse.Profile profile = resp.profile();  | 
 | 551 | +                List<DriverProfile> dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList();  | 
 | 552 | +                for (DriverProfile p : dataProfiles) {  | 
 | 553 | +                    if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesSourceOperator.Status)) {  | 
 | 554 | +                        assertThat(p.operators(), hasSize(2));  | 
 | 555 | +                        TimeSeriesSourceOperator.Status status = (TimeSeriesSourceOperator.Status) p.operators().get(0).status();  | 
 | 556 | +                        assertThat(status.processedShards(), hasSize(1));  | 
 | 557 | +                        assertThat(p.operators().get(1).operator(), equalTo("ExchangeSinkOperator"));  | 
 | 558 | +                    } else if (p.operators().stream().anyMatch(s -> s.status() instanceof TimeSeriesAggregationOperator.Status)) {  | 
 | 559 | +                        assertThat(p.operators(), hasSize(3));  | 
 | 560 | +                        assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));  | 
 | 561 | +                        assertThat(p.operators().get(1).operator(), containsString("TimeSeriesAggregationOperator"));  | 
 | 562 | +                        assertThat(p.operators().get(2).operator(), equalTo("ExchangeSinkOperator"));  | 
 | 563 | +                    } else {  | 
 | 564 | +                        assertThat(p.operators(), hasSize(4));  | 
 | 565 | +                        assertThat(p.operators().get(0).operator(), equalTo("ExchangeSourceOperator"));  | 
 | 566 | +                        assertThat(p.operators().get(1).operator(), containsString("TimeSeriesExtractFieldOperator"));  | 
 | 567 | +                        assertThat(p.operators().get(2).operator(), containsString("EvalOperator"));  | 
 | 568 | +                        assertThat(p.operators().get(3).operator(), equalTo("ExchangeSinkOperator"));  | 
 | 569 | +                    }  | 
508 | 570 |                 }  | 
 | 571 | +                assertThat(dataProfiles, hasSize(9));  | 
 | 572 | +            }  | 
 | 573 | +        }  | 
 | 574 | +        // non-rate aggregation is executed with multiple shards at a time  | 
 | 575 | +        {  | 
 | 576 | +            EsqlQueryRequest request = new EsqlQueryRequest();  | 
 | 577 | +            request.profile(true);  | 
 | 578 | +            request.pragmas(pragmas);  | 
 | 579 | +            request.acceptedPragmaRisks(true);  | 
 | 580 | +            request.query("TS my-hosts | STATS avg(avg_over_time(cpu)) BY cluster, bucket(@timestamp, 1minute) | SORT cluster");  | 
 | 581 | +            try (var resp = run(request)) {  | 
 | 582 | +                EsqlQueryResponse.Profile profile = resp.profile();  | 
 | 583 | +                List<DriverProfile> dataProfiles = profile.drivers().stream().filter(d -> d.description().equals("data")).toList();  | 
 | 584 | +                assertThat(dataProfiles, hasSize(1));  | 
 | 585 | +                List<OperatorStatus> ops = dataProfiles.get(0).operators();  | 
 | 586 | +                assertThat(ops, hasSize(5));  | 
 | 587 | +                assertThat(ops.get(0).operator(), containsString("LuceneSourceOperator"));  | 
 | 588 | +                assertThat(ops.get(0).status(), Matchers.instanceOf(LuceneSourceOperator.Status.class));  | 
 | 589 | +                LuceneSourceOperator.Status status = (LuceneSourceOperator.Status) ops.get(0).status();  | 
 | 590 | +                assertThat(status.processedShards(), hasSize(3));  | 
 | 591 | +                assertThat(ops.get(1).operator(), containsString("EvalOperator"));  | 
 | 592 | +                assertThat(ops.get(2).operator(), containsString("ValuesSourceReaderOperator"));  | 
 | 593 | +                assertThat(ops.get(3).operator(), containsString("TimeSeriesAggregationOperator"));  | 
 | 594 | +                assertThat(ops.get(4).operator(), containsString("ExchangeSinkOperator"));  | 
509 | 595 |             }  | 
510 |  | -            assertThat(totalTimeSeries, equalTo(dataProfiles.size() / 3));  | 
511 | 596 |         }  | 
512 | 597 |     }  | 
513 | 598 | 
 
  | 
 | 
0 commit comments