 
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.Randomness;
+import org.elasticsearch.common.Rounding;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.unit.ByteSizeValue;
 import org.elasticsearch.common.util.iterable.Iterables;
+import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.compute.lucene.LuceneSliceQueue;
 import org.elasticsearch.compute.lucene.LuceneSourceOperator;
 import org.elasticsearch.compute.operator.DriverProfile;
 import org.elasticsearch.compute.operator.OperatorStatus;
 import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator;
+import org.elasticsearch.core.TimeValue;
+import org.elasticsearch.core.Tuple;
 import org.elasticsearch.xpack.esql.EsqlTestUtils;
 import org.elasticsearch.xpack.esql.core.type.DataType;
 import org.elasticsearch.xpack.esql.plugin.QueryPragmas;
 import org.hamcrest.Matchers;
 import org.junit.Before;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
 
 import static org.elasticsearch.index.mapper.DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER;
 import static org.hamcrest.Matchers.closeTo;
@@ -49,6 +55,8 @@ public void testEmpty() {
             .setMapping(
                 "@timestamp",
                 "type=date",
+                "project",
+                "type=keyword",
                 "host",
                 "type=keyword,time_series_dimension=true",
                 "cpu",
@@ -58,7 +66,15 @@ public void testEmpty() {
         run("TS empty_index | LIMIT 1").close();
     }
 
-    record Doc(String host, String cluster, long timestamp, int requestCount, double cpu, ByteSizeValue memory) {}
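+    // "project" is intentionally a Collection: a single doc may belong to several projects (multi-valued keyword)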
+    record Doc(
+        Collection<String> project,
+        String host,
+        String cluster,
+        long timestamp,
+        int requestCount,
+        double cpu,
+        ByteSizeValue memory
+    ) {}
 
     final List<Doc> docs = new ArrayList<>();
 
@@ -94,6 +110,8 @@ public void populateIndex() {
             .setMapping(
                 "@timestamp",
                 "type=date",
+                "project",
+                "type=keyword",
                 "host",
                 "type=keyword,time_series_dimension=true",
                 "cluster",
@@ -114,6 +132,7 @@ public void populateIndex() {
         int numDocs = between(20, 100);
         docs.clear();
         Map<String, Integer> requestCounts = new HashMap<>();
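+        // fixed pool of projects; each doc is tagged with a random subset below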
+        List<String> allProjects = List.of("project-1", "project-2", "project-3");
         for (int i = 0; i < numDocs; i++) {
             List<String> hosts = randomSubsetOf(between(1, hostToClusters.size()), hostToClusters.keySet());
             timestamp += between(1, 10) * 1000L;
@@ -127,7 +146,8 @@ public void populateIndex() {
                 });
                 int cpu = randomIntBetween(0, 100);
                 ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024));
-                docs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
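+                // between one and three projects per doc, so "project" is genuinely multi-valued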
+                List<String> projects = randomSubsetOf(between(1, 3), allProjects);
+                docs.add(new Doc(projects, host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
             }
         }
         Randomness.shuffle(docs);
@@ -136,6 +156,8 @@ public void populateIndex() {
             .setSource(
                 "@timestamp",
                 doc.timestamp,
+                "project",
+                doc.project,
                 "host",
                 doc.host,
                 "cluster",
@@ -646,7 +668,8 @@ public void testNullMetricsAreSkipped() {
             });
             int cpu = randomIntBetween(0, 100);
             ByteSizeValue memory = ByteSizeValue.ofBytes(randomIntBetween(1024, 1024 * 1024));
-            sparseDocs.add(new Doc(host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
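+            // sparse docs carry exactly one project each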
+            String project = randomFrom("project-1", "project-2", "project-3");
+            sparseDocs.add(new Doc(List.of(project), host, hostToClusters.get(host), timestamp, requestCount, cpu, memory));
         }
 
         Randomness.shuffle(sparseDocs);
@@ -703,4 +726,136 @@ public void testTSIDMetadataAttribute() {
         }
     }
 
+    public void testGroupByProject() {
+        record TimeSeries(String cluster, String host) {}
+        record Sample(int count, double sum) {}
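+        // expected values: per time series, the count/sum of cpu samples plus the union of projects seen on it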
+        Map<TimeSeries, Tuple<Sample, Set<String>>> buckets = new HashMap<>();
+        for (Doc doc : docs) {
+            TimeSeries timeSeries = new TimeSeries(doc.cluster, doc.host);
+            buckets.compute(timeSeries, (k, v) -> {
+                if (v == null) {
+                    return Tuple.tuple(new Sample(1, doc.cpu), Set.copyOf(doc.project));
+                } else {
+                    Set<String> projects = Sets.union(v.v2(), Sets.newHashSet(doc.project));
+                    return Tuple.tuple(new Sample(v.v1().count + 1, v.v1().sum + doc.cpu), projects);
+                }
+            });
+        }
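+        // a series tagged with N projects contributes its average once to each of those N groups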
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project")) {
+            Map<String, Integer> countPerProject = new HashMap<>();
+            Map<String, Double> sumOfAvgPerProject = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    countPerProject.merge(project, 1, Integer::sum);
+                    sumOfAvgPerProject.merge(project, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerProject.size()));
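+            // row layout: column 0 = avg, column 1 = project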
+            for (List<Object> r : rows) {
+                String project = (String) r.get(1);
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerProject.get(project) / countPerProject.get(project);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
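+        // same check, now keyed by (project, cluster)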
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project, cluster")) {
+            record Key(String project, String cluster) {}
+            Map<Key, Integer> countPerKey = new HashMap<>();
+            Map<Key, Double> sumOfAvgPerKey = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    Key key = new Key(project, e.getKey().cluster);
+                    countPerKey.merge(key, 1, Integer::sum);
+                    sumOfAvgPerKey.merge(key, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerKey.size()));
+            for (List<Object> r : rows) {
+                Key key = new Key((String) r.get(1), (String) r.get(2));
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerKey.get(key) / countPerKey.get(key);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
+    }
+
+    public void testGroupByProjectAndTBucket() {
+        record TimeSeries(String cluster, String host, String tbucket) {}
+        record Sample(int count, double sum) {}
+        Map<TimeSeries, Tuple<Sample, Set<String>>> buckets = new HashMap<>();
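+        // mirror tbucket(1minute): round each timestamp down to the minute, formatted the way the bucket key is compared below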
+        var rounding = new Rounding.Builder(TimeValue.timeValueMinutes(1)).build().prepareForUnknown();
+        for (Doc doc : docs) {
+            var tbucket = DEFAULT_DATE_TIME_FORMATTER.formatMillis(rounding.round(doc.timestamp));
+            TimeSeries timeSeries = new TimeSeries(doc.cluster, doc.host, tbucket);
+            buckets.compute(timeSeries, (k, v) -> {
+                if (v == null) {
+                    return Tuple.tuple(new Sample(1, doc.cpu), Set.copyOf(doc.project));
+                } else {
+                    Set<String> projects = Sets.union(v.v2(), Sets.newHashSet(doc.project));
+                    return Tuple.tuple(new Sample(v.v1().count + 1, v.v1().sum + doc.cpu), projects);
+                }
+            });
+        }
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY project, tbucket(1minute)")) {
+            record Key(String project, String tbucket) {}
+            Map<Key, Integer> countPerKey = new HashMap<>();
+            Map<Key, Double> sumOfAvgPerKey = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    Key key = new Key(project, e.getKey().tbucket);
+                    countPerKey.merge(key, 1, Integer::sum);
+                    sumOfAvgPerKey.merge(key, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerKey.size()));
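+            // row layout: column 0 = avg, column 1 = project, column 2 = tbucket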
+            for (List<Object> r : rows) {
+                Key key = new Key((String) r.get(1), (String) r.get(2));
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerKey.get(key) / countPerKey.get(key);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
+        try (var resp = run("TS host* | STATS avg(avg_over_time(cpu)) BY tbucket(1minute), project, cluster")) {
+            record Key(String project, String cluster, String tbucket) {}
+            Map<Key, Integer> countPerKey = new HashMap<>();
+            Map<Key, Double> sumOfAvgPerKey = new HashMap<>();
+            for (var e : buckets.entrySet()) {
+                Sample sample = e.getValue().v1();
+                for (String project : e.getValue().v2()) {
+                    Key key = new Key(project, e.getKey().cluster, e.getKey().tbucket);
+                    countPerKey.merge(key, 1, Integer::sum);
+                    sumOfAvgPerKey.merge(key, sample.sum / sample.count, Double::sum);
+                }
+            }
+            List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
+            assertThat(rows, hasSize(sumOfAvgPerKey.size()));
+            for (List<Object> r : rows) {
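+                // BY tbucket, project, cluster: column 0 = avg, 1 = tbucket, 2 = project, 3 = cluster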
+                Key key = new Key((String) r.get(2), (String) r.get(3), (String) r.get(1));
+                double actualAvg = (Double) r.get(0);
+                double expectedAvg = sumOfAvgPerKey.get(key) / countPerKey.get(key);
+                assertThat(actualAvg, closeTo(expectedAvg, 0.5));
+            }
+        }
+    }
+
 }