|
9 | 9 |
|
10 | 10 | import org.elasticsearch.Build; |
11 | 11 | import org.elasticsearch.common.Randomness; |
12 | | -import org.elasticsearch.common.Rounding; |
13 | 12 | import org.elasticsearch.common.settings.Settings; |
14 | 13 | import org.elasticsearch.compute.lucene.TimeSeriesSourceOperator; |
15 | 14 | import org.elasticsearch.compute.operator.DriverProfile; |
16 | 15 | import org.elasticsearch.compute.operator.TimeSeriesAggregationOperator; |
17 | | -import org.elasticsearch.core.TimeValue; |
18 | 16 | import org.elasticsearch.xpack.esql.EsqlTestUtils; |
19 | 17 | import org.elasticsearch.xpack.esql.core.type.DataType; |
20 | 18 | import org.junit.Before; |
21 | 19 |
|
22 | | -import java.time.ZoneOffset; |
23 | 20 | import java.util.ArrayList; |
24 | 21 | import java.util.Comparator; |
25 | 22 | import java.util.HashMap; |
@@ -321,319 +318,6 @@ record RateKey(String cluster, String host) { |
321 | 318 | } |
322 | 319 | } |
323 | 320 |
|
324 | | - @AwaitsFix(bugUrl = "removed?") |
325 | | - public void testRateWithTimeBucket() { |
326 | | - var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown(); |
327 | | - record RateKey(String host, String cluster, long interval) {} |
328 | | - Map<RateKey, List<RequestCounter>> groups = new HashMap<>(); |
329 | | - for (Doc doc : docs) { |
330 | | - RateKey key = new RateKey(doc.host, doc.cluster, rounding.round(doc.timestamp)); |
331 | | - groups.computeIfAbsent(key, k -> new ArrayList<>()).add(new RequestCounter(doc.timestamp, doc.requestCount)); |
332 | | - } |
333 | | - Map<Long, List<Double>> bucketToRates = new HashMap<>(); |
334 | | - for (Map.Entry<RateKey, List<RequestCounter>> e : groups.entrySet()) { |
335 | | - List<Double> values = bucketToRates.computeIfAbsent(e.getKey().interval, k -> new ArrayList<>()); |
336 | | - Double rate = computeRate(e.getValue()); |
337 | | - if (rate != null) { |
338 | | - values.add(rate); |
339 | | - } |
340 | | - } |
341 | | - List<Long> sortedKeys = bucketToRates.keySet().stream().sorted().limit(5).toList(); |
342 | | - try (var resp = run("TS hosts | STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute) | SORT ts | LIMIT 5")) { |
343 | | - assertThat( |
344 | | - resp.columns(), |
345 | | - equalTo(List.of(new ColumnInfoImpl("sum(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) |
346 | | - ); |
347 | | - List<List<Object>> values = EsqlTestUtils.getValuesList(resp); |
348 | | - assertThat(values, hasSize(sortedKeys.size())); |
349 | | - for (int i = 0; i < sortedKeys.size(); i++) { |
350 | | - List<Object> row = values.get(i); |
351 | | - assertThat(row, hasSize(2)); |
352 | | - long key = sortedKeys.get(i); |
353 | | - assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key))); |
354 | | - List<Double> bucketValues = bucketToRates.get(key); |
355 | | - if (bucketValues.isEmpty()) { |
356 | | - assertNull(row.get(0)); |
357 | | - } else { |
358 | | - assertThat((double) row.get(0), closeTo(bucketValues.stream().mapToDouble(d -> d).sum(), 0.1)); |
359 | | - } |
360 | | - } |
361 | | - } |
362 | | - try (var resp = run("TS hosts | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) | SORT ts | LIMIT 5")) { |
363 | | - assertThat( |
364 | | - resp.columns(), |
365 | | - equalTo(List.of(new ColumnInfoImpl("avg(rate(request_count))", "double", null), new ColumnInfoImpl("ts", "date", null))) |
366 | | - ); |
367 | | - List<List<Object>> values = EsqlTestUtils.getValuesList(resp); |
368 | | - assertThat(values, hasSize(sortedKeys.size())); |
369 | | - for (int i = 0; i < sortedKeys.size(); i++) { |
370 | | - List<Object> row = values.get(i); |
371 | | - assertThat(row, hasSize(2)); |
372 | | - long key = sortedKeys.get(i); |
373 | | - assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key))); |
374 | | - List<Double> bucketValues = bucketToRates.get(key); |
375 | | - if (bucketValues.isEmpty()) { |
376 | | - assertNull(row.get(0)); |
377 | | - } else { |
378 | | - double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size(); |
379 | | - assertThat((double) row.get(0), closeTo(avg, 0.1)); |
380 | | - } |
381 | | - } |
382 | | - } |
383 | | - try (var resp = run(""" |
384 | | - TS hosts |
385 | | - | STATS avg(rate(request_count)), avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute) |
386 | | - | SORT ts |
387 | | - | LIMIT 5 |
388 | | - """)) { |
389 | | - assertThat( |
390 | | - resp.columns(), |
391 | | - equalTo( |
392 | | - List.of( |
393 | | - new ColumnInfoImpl("avg(rate(request_count))", "double", null), |
394 | | - new ColumnInfoImpl("avg(rate(request_count))", "double", null), |
395 | | - new ColumnInfoImpl("ts", "date", null) |
396 | | - ) |
397 | | - ) |
398 | | - ); |
399 | | - List<List<Object>> values = EsqlTestUtils.getValuesList(resp); |
400 | | - assertThat(values, hasSize(sortedKeys.size())); |
401 | | - for (int i = 0; i < sortedKeys.size(); i++) { |
402 | | - List<Object> row = values.get(i); |
403 | | - assertThat(row, hasSize(3)); |
404 | | - long key = sortedKeys.get(i); |
405 | | - assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key))); |
406 | | - List<Double> bucketValues = bucketToRates.get(key); |
407 | | - if (bucketValues.isEmpty()) { |
408 | | - assertNull(row.get(0)); |
409 | | - assertNull(row.get(1)); |
410 | | - } else { |
411 | | - double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size(); |
412 | | - assertThat((double) row.get(0), closeTo(avg, 0.1)); |
413 | | - assertThat((double) row.get(1), closeTo(avg, 0.1)); |
414 | | - } |
415 | | - } |
416 | | - } |
417 | | - } |
418 | | - |
419 | | - @AwaitsFix(bugUrl = "removed?") |
420 | | - public void testRateWithTimeBucketAndCluster() { |
421 | | - var rounding = new Rounding.Builder(TimeValue.timeValueSeconds(60)).timeZone(ZoneOffset.UTC).build().prepareForUnknown(); |
422 | | - record RateKey(String host, String cluster, long interval) {} |
423 | | - Map<RateKey, List<RequestCounter>> groups = new HashMap<>(); |
424 | | - for (Doc doc : docs) { |
425 | | - RateKey key = new RateKey(doc.host, doc.cluster, rounding.round(doc.timestamp)); |
426 | | - groups.computeIfAbsent(key, k -> new ArrayList<>()).add(new RequestCounter(doc.timestamp, doc.requestCount)); |
427 | | - } |
428 | | - record GroupKey(String cluster, long interval) {} |
429 | | - Map<GroupKey, List<Double>> rateBuckets = new HashMap<>(); |
430 | | - for (Map.Entry<RateKey, List<RequestCounter>> e : groups.entrySet()) { |
431 | | - RateKey key = e.getKey(); |
432 | | - List<Double> values = rateBuckets.computeIfAbsent(new GroupKey(key.cluster, key.interval), k -> new ArrayList<>()); |
433 | | - Double rate = computeRate(e.getValue()); |
434 | | - if (rate != null) { |
435 | | - values.add(rate); |
436 | | - } |
437 | | - } |
438 | | - Map<GroupKey, List<Double>> cpuBuckets = new HashMap<>(); |
439 | | - for (Doc doc : docs) { |
440 | | - GroupKey key = new GroupKey(doc.cluster, rounding.round(doc.timestamp)); |
441 | | - cpuBuckets.computeIfAbsent(key, k -> new ArrayList<>()).add(doc.cpu); |
442 | | - } |
443 | | - List<GroupKey> sortedKeys = rateBuckets.keySet() |
444 | | - .stream() |
445 | | - .sorted(Comparator.comparing(GroupKey::interval).thenComparing(GroupKey::cluster)) |
446 | | - .limit(5) |
447 | | - .toList(); |
448 | | - try (var resp = run(""" |
449 | | - TS hosts |
450 | | - | STATS sum(rate(request_count)) BY ts=bucket(@timestamp, 1 minute), cluster |
451 | | - | SORT ts, cluster |
452 | | - | LIMIT 5""")) { |
453 | | - assertThat( |
454 | | - resp.columns(), |
455 | | - equalTo( |
456 | | - List.of( |
457 | | - new ColumnInfoImpl("sum(rate(request_count))", "double", null), |
458 | | - new ColumnInfoImpl("ts", "date", null), |
459 | | - new ColumnInfoImpl("cluster", "keyword", null) |
460 | | - ) |
461 | | - ) |
462 | | - ); |
463 | | - List<List<Object>> values = EsqlTestUtils.getValuesList(resp); |
464 | | - assertThat(values, hasSize(sortedKeys.size())); |
465 | | - for (int i = 0; i < sortedKeys.size(); i++) { |
466 | | - List<Object> row = values.get(i); |
467 | | - assertThat(row, hasSize(3)); |
468 | | - var key = sortedKeys.get(i); |
469 | | - assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval))); |
470 | | - assertThat(row.get(2), equalTo(key.cluster)); |
471 | | - List<Double> bucketValues = rateBuckets.get(key); |
472 | | - if (bucketValues.isEmpty()) { |
473 | | - assertNull(row.get(0)); |
474 | | - } else { |
475 | | - assertThat((double) row.get(0), closeTo(bucketValues.stream().mapToDouble(d -> d).sum(), 0.1)); |
476 | | - } |
477 | | - } |
478 | | - } |
479 | | - try (var resp = run(""" |
480 | | - TS hosts |
481 | | - | STATS avg(rate(request_count)) BY ts=bucket(@timestamp, 1minute), cluster |
482 | | - | SORT ts, cluster |
483 | | - | LIMIT 5""")) { |
484 | | - assertThat( |
485 | | - resp.columns(), |
486 | | - equalTo( |
487 | | - List.of( |
488 | | - new ColumnInfoImpl("avg(rate(request_count))", "double", null), |
489 | | - new ColumnInfoImpl("ts", "date", null), |
490 | | - new ColumnInfoImpl("cluster", "keyword", null) |
491 | | - ) |
492 | | - ) |
493 | | - ); |
494 | | - List<List<Object>> values = EsqlTestUtils.getValuesList(resp); |
495 | | - assertThat(values, hasSize(sortedKeys.size())); |
496 | | - for (int i = 0; i < sortedKeys.size(); i++) { |
497 | | - List<Object> row = values.get(i); |
498 | | - assertThat(row, hasSize(3)); |
499 | | - var key = sortedKeys.get(i); |
500 | | - assertThat(row.get(1), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval))); |
501 | | - assertThat(row.get(2), equalTo(key.cluster)); |
502 | | - List<Double> bucketValues = rateBuckets.get(key); |
503 | | - if (bucketValues.isEmpty()) { |
504 | | - assertNull(row.get(0)); |
505 | | - } else { |
506 | | - double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size(); |
507 | | - assertThat((double) row.get(0), closeTo(avg, 0.1)); |
508 | | - } |
509 | | - } |
510 | | - } |
511 | | - |
512 | | - try (var resp = run(""" |
513 | | - TS hosts |
514 | | - | STATS |
515 | | - s = sum(rate(request_count)), |
516 | | - c = count(rate(request_count)), |
517 | | - max(rate(request_count)), |
518 | | - avg(rate(request_count)) |
519 | | - BY ts=bucket(@timestamp, 1minute), cluster |
520 | | - | SORT ts, cluster |
521 | | - | LIMIT 5 |
522 | | - | EVAL avg_rate= s/c |
523 | | - | KEEP avg_rate, `max(rate(request_count))`, `avg(rate(request_count))`, ts, cluster |
524 | | - """)) { |
525 | | - assertThat( |
526 | | - resp.columns(), |
527 | | - equalTo( |
528 | | - List.of( |
529 | | - new ColumnInfoImpl("avg_rate", "double", null), |
530 | | - new ColumnInfoImpl("max(rate(request_count))", "double", null), |
531 | | - new ColumnInfoImpl("avg(rate(request_count))", "double", null), |
532 | | - new ColumnInfoImpl("ts", "date", null), |
533 | | - new ColumnInfoImpl("cluster", "keyword", null) |
534 | | - ) |
535 | | - ) |
536 | | - ); |
537 | | - List<List<Object>> values = EsqlTestUtils.getValuesList(resp); |
538 | | - assertThat(values, hasSize(sortedKeys.size())); |
539 | | - for (int i = 0; i < sortedKeys.size(); i++) { |
540 | | - List<Object> row = values.get(i); |
541 | | - assertThat(row, hasSize(5)); |
542 | | - var key = sortedKeys.get(i); |
543 | | - assertThat(row.get(3), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval))); |
544 | | - assertThat(row.get(4), equalTo(key.cluster)); |
545 | | - List<Double> bucketValues = rateBuckets.get(key); |
546 | | - if (bucketValues.isEmpty()) { |
547 | | - assertNull(row.get(0)); |
548 | | - assertNull(row.get(1)); |
549 | | - } else { |
550 | | - double avg = bucketValues.stream().mapToDouble(d -> d).sum() / bucketValues.size(); |
551 | | - assertThat((double) row.get(0), closeTo(avg, 0.1)); |
552 | | - double max = bucketValues.stream().mapToDouble(d -> d).max().orElse(0.0); |
553 | | - assertThat((double) row.get(1), closeTo(max, 0.1)); |
554 | | - } |
555 | | - assertEquals(row.get(0), row.get(2)); |
556 | | - } |
557 | | - } |
558 | | - try (var resp = run(""" |
559 | | - TS hosts |
560 | | - | STATS sum(rate(request_count)), max(cpu) BY ts=bucket(@timestamp, 1 minute), cluster |
561 | | - | SORT ts, cluster |
562 | | - | LIMIT 5""")) { |
563 | | - assertThat( |
564 | | - resp.columns(), |
565 | | - equalTo( |
566 | | - List.of( |
567 | | - new ColumnInfoImpl("sum(rate(request_count))", "double", null), |
568 | | - new ColumnInfoImpl("max(cpu)", "double", null), |
569 | | - new ColumnInfoImpl("ts", "date", null), |
570 | | - new ColumnInfoImpl("cluster", "keyword", null) |
571 | | - ) |
572 | | - ) |
573 | | - ); |
574 | | - List<List<Object>> values = EsqlTestUtils.getValuesList(resp); |
575 | | - assertThat(values, hasSize(sortedKeys.size())); |
576 | | - for (int i = 0; i < sortedKeys.size(); i++) { |
577 | | - List<Object> row = values.get(i); |
578 | | - assertThat(row, hasSize(4)); |
579 | | - var key = sortedKeys.get(i); |
580 | | - assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval))); |
581 | | - assertThat(row.get(3), equalTo(key.cluster)); |
582 | | - List<Double> rateBucket = rateBuckets.get(key); |
583 | | - if (rateBucket.isEmpty()) { |
584 | | - assertNull(row.get(0)); |
585 | | - } else { |
586 | | - assertThat((double) row.get(0), closeTo(rateBucket.stream().mapToDouble(d -> d).sum(), 0.1)); |
587 | | - } |
588 | | - List<Double> cpuBucket = cpuBuckets.get(key); |
589 | | - if (cpuBuckets.isEmpty()) { |
590 | | - assertNull(row.get(1)); |
591 | | - } else { |
592 | | - assertThat((double) row.get(1), closeTo(cpuBucket.stream().mapToDouble(d -> d).max().orElse(0.0), 0.1)); |
593 | | - } |
594 | | - } |
595 | | - } |
596 | | - try (var resp = run(""" |
597 | | - TS hosts |
598 | | - | STATS sum(rate(request_count)), avg(cpu) BY ts=bucket(@timestamp, 1 minute), cluster |
599 | | - | SORT ts, cluster |
600 | | - | LIMIT 5""")) { |
601 | | - assertThat( |
602 | | - resp.columns(), |
603 | | - equalTo( |
604 | | - List.of( |
605 | | - new ColumnInfoImpl("sum(rate(request_count))", "double", null), |
606 | | - new ColumnInfoImpl("avg(cpu)", "double", null), |
607 | | - new ColumnInfoImpl("ts", "date", null), |
608 | | - new ColumnInfoImpl("cluster", "keyword", null) |
609 | | - ) |
610 | | - ) |
611 | | - ); |
612 | | - List<List<Object>> values = EsqlTestUtils.getValuesList(resp); |
613 | | - assertThat(values, hasSize(sortedKeys.size())); |
614 | | - for (int i = 0; i < sortedKeys.size(); i++) { |
615 | | - List<Object> row = values.get(i); |
616 | | - assertThat(row, hasSize(4)); |
617 | | - var key = sortedKeys.get(i); |
618 | | - assertThat(row.get(2), equalTo(DEFAULT_DATE_TIME_FORMATTER.formatMillis(key.interval))); |
619 | | - assertThat(row.get(3), equalTo(key.cluster)); |
620 | | - List<Double> rateBucket = rateBuckets.get(key); |
621 | | - if (rateBucket.isEmpty()) { |
622 | | - assertNull(row.get(0)); |
623 | | - } else { |
624 | | - assertThat((double) row.get(0), closeTo(rateBucket.stream().mapToDouble(d -> d).sum(), 0.1)); |
625 | | - } |
626 | | - List<Double> cpuBucket = cpuBuckets.get(key); |
627 | | - if (cpuBuckets.isEmpty()) { |
628 | | - assertNull(row.get(1)); |
629 | | - } else { |
630 | | - double avg = cpuBucket.stream().mapToDouble(d -> d).sum() / cpuBucket.size(); |
631 | | - assertThat((double) row.get(1), closeTo(avg, 0.1)); |
632 | | - } |
633 | | - } |
634 | | - } |
635 | | - } |
636 | | - |
637 | 321 | public void testApplyRateBeforeFinalGrouping() { |
638 | 322 | record RateKey(String cluster, String host) { |
639 | 323 |
|
|
0 commit comments