@@ -142,7 +142,7 @@ static Map<String, List<Tuple<String, Tuple<Instant, Integer>>>> groupByTimeseri
142142 .collect (Collectors .groupingBy (Tuple ::v1 ));
143143 }
144144
145- static Double aggregatePerTimeseries (
145+ static Object aggregatePerTimeseries (
146146 Map <String , List <Tuple <String , Tuple <Instant , Integer >>>> timeseries ,
147147 Agg crossAgg ,
148148 Agg timeseriesAgg
@@ -152,27 +152,32 @@ static Double aggregatePerTimeseries(
152152 return aggregateValuesInWindow (values , timeseriesAgg );
153153 }).filter (Objects ::nonNull ).toList ();
154154
155- if (res .isEmpty ()) {
156- return null ; // No values to aggregate
155+ if (res .isEmpty () && timeseriesAgg == Agg . COUNT ) {
156+ res = List . of ( 0.0 );
157157 }
158+
158159 return switch (crossAgg ) {
159- case MAX -> res .stream ().mapToDouble (Double ::doubleValue ).max ().orElseThrow ();
160- case MIN -> res .stream ().mapToDouble (Double ::doubleValue ).min ().orElseThrow ();
161- case AVG -> res .stream ().mapToDouble (Double ::doubleValue ).average ().orElseThrow ();
162- case SUM -> res .stream ().mapToDouble (Double ::doubleValue ).sum ();
163- case COUNT -> (double ) res .size ();
160+ case MAX -> res .isEmpty ()
161+ ? null
162+ : Double .valueOf (res .stream ().mapToDouble (Double ::doubleValue ).max ().orElseThrow ()).longValue ();
163+ case MIN -> res .isEmpty ()
164+ ? null
165+ : Double .valueOf (res .stream ().mapToDouble (Double ::doubleValue ).min ().orElseThrow ()).longValue ();
166+ case AVG -> res .isEmpty () ? null : res .stream ().mapToDouble (Double ::doubleValue ).average ().orElseThrow ();
167+ case SUM -> res .isEmpty () ? null : Double .valueOf (res .stream ().mapToDouble (Double ::doubleValue ).sum ()).longValue ();
168+ case COUNT -> Integer .toUnsignedLong (res .size ());
164169 };
165170 }
166171
167172 static Double aggregateValuesInWindow (List <Integer > values , Agg agg ) {
168- if (values .isEmpty ()) {
169- throw new IllegalArgumentException ("No values to aggregate for " + agg + " operation" );
170- }
173+ // if (values.isEmpty()) {
174+ // throw new IllegalArgumentException("No values to aggregate for " + agg + " operation");
175+ // }
171176 return switch (agg ) {
172177 case MAX -> Double .valueOf (values .stream ().max (Integer ::compareTo ).orElseThrow ());
173178 case MIN -> Double .valueOf (values .stream ().min (Integer ::compareTo ).orElseThrow ());
174179 case AVG -> values .stream ().mapToDouble (Integer ::doubleValue ).average ().orElseThrow ();
175- case SUM -> values .stream ().mapToDouble (Integer ::doubleValue ).sum ();
180+ case SUM -> values .isEmpty () ? null : values . stream ().mapToDouble (Integer ::doubleValue ).sum ();
176181 case COUNT -> (double ) values .size ();
177182 };
178183 }
@@ -430,14 +435,14 @@ public void testGroupBySubset() {
430435 containsInAnyOrder (docValues .stream ().mapToLong (Integer ::longValue ).boxed ().toArray (Long []::new ))
431436 );
432437 } else {
433- assertThat (row .getFirst (), equalTo (docValues .getFirst ().longValue ()));
438+ assertThat (row .getFirst (), equalTo (docValues .isEmpty () ? null : docValues . getFirst ().longValue ()));
434439 }
435- assertThat (row .get (1 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .MAX , Agg .MAX ). longValue () ));
436- assertThat (row .get (2 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .MIN , Agg .MIN ). longValue () ));
437- assertThat (row .get (3 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .COUNT ). longValue () ));
438- assertThat (row .get (4 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .SUM ). longValue () ));
439- var avg = aggregatePerTimeseries (tsGroups , Agg .AVG , Agg .AVG );
440- assertThat ((Double ) row .get (5 ), closeTo (avg , avg * 0.01 ));
440+ assertThat (row .get (1 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .MAX , Agg .MAX )));
441+ assertThat (row .get (2 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .MIN , Agg .MIN )));
442+ assertThat (row .get (3 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .COUNT )));
443+ assertThat (row .get (4 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .SUM )));
444+ var avg = ( Double ) aggregatePerTimeseries (tsGroups , Agg .AVG , Agg .AVG );
445+ assertThat ((Double ) row .get (5 ), row . get ( 5 ) == null ? equalTo ( null ) : closeTo (avg , avg * 0.01 ));
441446 // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
442447 }
443448 }
@@ -475,14 +480,14 @@ public void testGroupByNothing() {
475480 containsInAnyOrder (docValues .stream ().mapToLong (Integer ::longValue ).boxed ().toArray (Long []::new ))
476481 );
477482 } else {
478- assertThat (row .get ( 0 ), equalTo (docValues .getFirst ().longValue ()));
483+ assertThat (row .getFirst ( ), equalTo (docValues . isEmpty () ? null : docValues .getFirst ().longValue ()));
479484 }
480- assertThat (row .get (1 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .MAX , Agg .MAX ). longValue () ));
481- assertThat (row .get (2 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .MIN , Agg .MIN ). longValue () ));
482- assertThat (row .get (3 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .COUNT ). longValue () ));
483- assertThat (row .get (4 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .SUM ). longValue () ));
484- var avg = aggregatePerTimeseries (tsGroups , Agg .AVG , Agg .AVG );
485- assertThat ((Double ) row .get (5 ), closeTo (avg , avg * 0.01 ));
485+ assertThat (row .get (1 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .MAX , Agg .MAX )));
486+ assertThat (row .get (2 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .MIN , Agg .MIN )));
487+ assertThat (row .get (3 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .COUNT )));
488+ assertThat (row .get (4 ), equalTo (aggregatePerTimeseries (tsGroups , Agg .SUM , Agg .SUM )));
489+ var avg = ( Double ) aggregatePerTimeseries (tsGroups , Agg .AVG , Agg .AVG );
490+ assertThat ((Double ) row .get (5 ), row . get ( 5 ) == null ? equalTo ( null ) : closeTo (avg , avg * 0.01 ));
486491 // assertThat(row.get(6), equalTo(aggregatePerTimeseries(tsGroups, Agg.COUNT, Agg.COUNT).longValue()));
487492 }
488493 }
0 commit comments