Skip to content

Commit 6fe1fd8

Browse files
authored
Add self obs metrics for L1/L2 queue and persistent cache. (apache#13405)
* 1.[Break Change] MQE function `sort_values` sorts according to the aggregation result and labels rather than the simple time series values. 2. Fix `MetricsPersistentWorker`, remove DataCarrier queue from `Hour/Day` dimensions metrics persistent process. 3. Self Observability: add `metrics_aggregation_queue_used_percentage` and `metrics_persistent_collection_cached_size` metrics for the OAP server.
1 parent 30702c7 commit 6fe1fd8

File tree

16 files changed

+508
-130
lines changed

16 files changed

+508
-130
lines changed

docs/en/api/metrics-query-expression.md

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -440,19 +440,42 @@ TIME_SERIES_VALUES.
440440

441441
## Sort Operation
442442
### SortValues Operation
443-
SortValues Operation takes an expression and sorts the values of the input expression result.
444-
443+
SortValues Operation takes an expression used to sort and pick the top N label value groups, which according to
444+
the values of a given ExpressionResult and based on the specified order, limit and aggregation type.
445+
If the input expression is not a labeled result, it will retrurn the original expression result.
445446
Expression:
446447
```text
447-
sort_values(Expression, <limit>, <order>)
448+
sort_values(Expression, <limit>, <order>, <aggregation_type>)
448449
```
449-
- `limit` is the number of the sort results, should be a positive integer, if not specified, will return all results. Optional.
450+
- `limit` is the number of the sort results, should be a positive integer.
450451
- `order` is the order of the sort results. The value of `order` can be `asc` or `des`.
452+
- `aggregation_type` is the type of the aggregation operation. The type can be `avg`, `sum`, `max`, `min`.
451453

452-
For example:
453-
If we want to sort the `service_resp_time` metric values in descending order and get the top 10 values, we can use the following expression:
454+
For example, the following metrics in time series T1 and T2:
455+
```text
456+
T1:
457+
http_requests_total{service="api"} 160
458+
http_requests_total{service="web"} 120
459+
http_requests_total{service="auth"} 80
460+
461+
T2:
462+
http_requests_total{service="api"} 100
463+
http_requests_total{service="web"} 180
464+
http_requests_total{service="auth"} 10
465+
```
466+
We can use SortValuesOp to pick the top 2 services with the most avg requests in descending order:
467+
```text
468+
sort_values(http_requests_total, 2, desc, avg)
469+
```
470+
The result will be:
454471
```text
455-
sort_values(service_resp_time, 10, des)
472+
T1:
473+
http_requests_total{service="web"} 120
474+
http_requests_total{service="api"} 160
475+
476+
T2:
477+
http_requests_total{service="web"} 180
478+
http_requests_total{service="api"} 100
456479
```
457480

458481
#### Result Type

docs/en/changes/changes.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
* Bump up BanyanDB dependency version(server and java-client) to 0.9.0.
66
* Fix CVE-2025-54057, restrict and validate url for widgets.
7+
* Fix `MetricsPersistentWorker`, remove DataCarrier queue from `Hour/Day` dimensions metrics persistent process.
8+
This is important to reduce memory cost and `Hour/Day` dimensions metrics persistent latency.
79

810
#### OAP Server
911

@@ -46,6 +48,8 @@
4648
* Add UI dashboard for Ruby runtime metrics.
4749
* Tracing Query Execution HTTP APIs: make the argument `service layer` optional.
4850
* GraphQL API: metadata, topology, log and trace support query by name.
51+
* [Break Change] MQE function `sort_values` sorts according to the aggregation result and labels rather than the simple time series values.
52+
* Self Observability: add `metrics_aggregation_queue_used_percentage` and `metrics_persistent_collection_cached_size` metrics for the OAP server.
4953

5054
#### UI
5155

@@ -63,6 +67,7 @@
6367
* Fix the snapshot charts unable to display.
6468
* Bump vue-i18n from 9.14.3 to 9.14.5.
6569
* Fix split queries for topology to avoid page crash.
70+
* Self Observability ui-template: Add new panels for monitor `metrics aggregation queue used percentage` and `metrics persistent collection cached size`.
6671

6772
#### Documentation
6873

oap-server/mqe-grammar/src/main/antlr4/org/apache/skywalking/mqe/rt/grammar/MQEParser.g4

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ expression
3838
| topNOf L_PAREN topN (COMMA topN)* COMMA INTEGER COMMA order R_PAREN #topNOfOP
3939
| relabels L_PAREN expression COMMA label COMMA replaceLabel R_PAREN #relablesOP
4040
| aggregateLabels L_PAREN expression COMMA aggregateLabelsFunc R_PAREN #aggregateLabelsOp
41-
| sort_values L_PAREN expression (COMMA INTEGER)? COMMA order R_PAREN #sortValuesOP
41+
| sort_values L_PAREN expression COMMA INTEGER COMMA order COMMA aggregation R_PAREN #sortValuesOP
4242
| sort_label_values L_PAREN expression COMMA order COMMA labelNameList R_PAREN #sortLabelValuesOP
4343
| baseline L_PAREN metric COMMA baseline_type R_PAREN #baselineOP
4444
;

oap-server/mqe-rt/src/main/java/org/apache/skywalking/mqe/rt/MQEVisitorBase.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.util.HashMap;
2424
import java.util.List;
2525
import java.util.Map;
26-
import java.util.Optional;
2726
import java.util.function.BiFunction;
2827
import lombok.SneakyThrows;
2928
import lombok.extern.slf4j.Slf4j;
@@ -437,12 +436,9 @@ public ExpressionResult visitSortValuesOP(MQEParser.SortValuesOPContext ctx) {
437436
try {
438437
ExpressionResult result = visit(ctx.expression());
439438
int order = ctx.order().getStart().getType();
440-
Optional<Integer> limit = Optional.empty();
441-
if (ctx.INTEGER() != null) {
442-
limit = Optional.of(Integer.valueOf(ctx.INTEGER().getText()));
443-
}
439+
int limit = Integer.parseInt(ctx.INTEGER().getText());
444440
try {
445-
return SortValuesOp.doSortValuesOp(result, limit, order);
441+
return SortValuesOp.doSortValuesOp(result, limit, order, MQEParser.AVG);
446442
} catch (IllegalExpressionException e) {
447443
return getErrorResult(e.getMessage());
448444
}

oap-server/mqe-rt/src/main/java/org/apache/skywalking/mqe/rt/operation/SortValuesOp.java

Lines changed: 82 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -18,40 +18,98 @@
1818

1919
package org.apache.skywalking.mqe.rt.operation;
2020

21+
import java.util.ArrayList;
2122
import java.util.Comparator;
2223
import java.util.List;
23-
import java.util.Optional;
24+
import java.util.Map;
2425
import java.util.stream.Collectors;
2526
import org.apache.skywalking.mqe.rt.exception.IllegalExpressionException;
2627
import org.apache.skywalking.mqe.rt.grammar.MQEParser;
2728
import org.apache.skywalking.oap.server.core.query.mqe.ExpressionResult;
28-
import org.apache.skywalking.oap.server.core.query.mqe.MQEValue;
29+
import org.apache.skywalking.oap.server.core.query.mqe.ExpressionResultType;
30+
import org.apache.skywalking.oap.server.core.query.mqe.MQEValues;
31+
import org.apache.skywalking.oap.server.core.query.mqe.Metadata;
2932

33+
/**
34+
* When a result has multiple label value groups, it is often required to sort the values of these groups.
35+
* SortValuesOp is used to sort and pick the top N label value groups, which according to
36+
* the values of a given ExpressionResult and based on the specified order, limit and aggregation type.
37+
*
38+
* It first performs aggregation on the results, then sorts them according to the specified order (ascending or descending),
39+
* and finally limits the number of results if a limit is provided.
40+
*
41+
* for example, the following metrics in time series T1 and T2:
42+
* T1:
43+
* http_requests_total{service="api"} 160
44+
* http_requests_total{service="web"} 120
45+
* http_requests_total{service="auth"} 80
46+
*
47+
* T2:
48+
* http_requests_total{service="api"} 100
49+
* http_requests_total{service="web"} 180
50+
* http_requests_total{service="auth"} 10
51+
*
52+
* We can use SortValuesOp to pick the top 2 services with the most avg requests in descending order:
53+
* `sort_values(http_requests_total, 2, desc, avg)`
54+
* The result will be:
55+
* T1:
56+
* http_requests_total{service="web"} 120
57+
* http_requests_total{service="api"} 160
58+
*
59+
* T2:
60+
* http_requests_total{service="web"} 180
61+
* http_requests_total{service="api"} 100
62+
*/
3063
public class SortValuesOp {
3164
public static ExpressionResult doSortValuesOp(ExpressionResult expResult,
32-
Optional<Integer> limit,
33-
int order) throws IllegalExpressionException {
34-
if (MQEParser.ASC == order || MQEParser.DES == order) {
35-
expResult.getResults().forEach(mqeValues -> {
36-
List<MQEValue> values = mqeValues.getValues()
37-
.stream()
38-
// Filter out empty values
39-
.filter(mqeValue -> !mqeValue.isEmptyValue())
40-
.sorted(MQEParser.ASC == order ? Comparator.comparingDouble(
41-
MQEValue::getDoubleValue) :
42-
Comparator.comparingDouble(MQEValue::getDoubleValue)
43-
.reversed())
44-
.collect(
45-
Collectors.toList());
46-
if (limit.isPresent() && limit.get() < values.size()) {
47-
mqeValues.setValues(values.subList(0, limit.get()));
48-
} else {
49-
mqeValues.setValues(values);
50-
}
51-
});
52-
} else {
53-
throw new IllegalExpressionException("Unsupported sort order.");
65+
int limit,
66+
int order,
67+
int aggregationType) throws IllegalExpressionException {
68+
// no label result, no need to sort
69+
if (!expResult.isLabeledResult()) {
70+
return expResult;
5471
}
72+
// store the original results in a map to avoid losing data during aggregation
73+
Map<Metadata, MQEValues> resultMap = expResult.getResults()
74+
.stream()
75+
.collect(Collectors.toMap(
76+
MQEValues::getMetric, v -> {
77+
MQEValues newValues = new MQEValues();
78+
newValues.setMetric(v.getMetric());
79+
newValues.setValues(v.getValues());
80+
return newValues;
81+
}
82+
));
83+
// do aggregation first
84+
ExpressionResult aggResult = AggregationOp.doAggregationOp(expResult, aggregationType);
85+
86+
List<MQEValues> sorted =
87+
aggResult.getResults().stream()
88+
.sorted(getComparator(order))
89+
.collect(Collectors.toList());
90+
if (limit < sorted.size()) {
91+
sorted = sorted.subList(0, limit);
92+
}
93+
List<MQEValues> results = new ArrayList<>();
94+
sorted.forEach(v -> {
95+
MQEValues mqeValues = resultMap.get(v.getMetric());
96+
if (mqeValues != null) {
97+
results.add(mqeValues);
98+
}
99+
}
100+
);
101+
102+
expResult.setResults(results);
103+
expResult.setType(ExpressionResultType.TIME_SERIES_VALUES);
55104
return expResult;
56105
}
106+
107+
private static Comparator<MQEValues> getComparator(int order) {
108+
Comparator<MQEValues> comparator = Comparator.comparingDouble(
109+
mqeValues -> mqeValues.getValues().isEmpty()
110+
? Double.NaN
111+
: mqeValues.getValues().get(0).getDoubleValue()
112+
);
113+
return order == MQEParser.ASC ? comparator : comparator.reversed();
114+
}
57115
}

oap-server/mqe-rt/src/test/java/org/apache/skywalking/mqe/rt/SortValuesOpTest.java

Lines changed: 49 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
package org.apache.skywalking.mqe.rt;
2020

21-
import java.util.Optional;
2221
import org.apache.skywalking.mqe.rt.exception.IllegalExpressionException;
2322
import org.apache.skywalking.mqe.rt.grammar.MQEParser;
2423
import org.apache.skywalking.mqe.rt.operation.SortValuesOp;
@@ -32,35 +31,64 @@ public class SortValuesOpTest {
3231
@Test
3332
public void sortValueTest() throws IllegalExpressionException {
3433
//no label
35-
ExpressionResult des = SortValuesOp.doSortValuesOp(mockData.newSeriesNoLabeledResult(), Optional.of(3),
36-
MQEParser.DES);
37-
Assertions.assertEquals(300, des.getResults().get(0).getValues().get(0).getDoubleValue());
38-
Assertions.assertEquals(100, des.getResults().get(0).getValues().get(1).getDoubleValue());
39-
ExpressionResult asc = SortValuesOp.doSortValuesOp(mockData.newSeriesNoLabeledResult(), Optional.of(3),
40-
MQEParser.ASC);
34+
ExpressionResult des = SortValuesOp.doSortValuesOp(
35+
mockData.newSeriesNoLabeledResult(), 3,
36+
MQEParser.DES, MQEParser.AVG
37+
);
38+
Assertions.assertEquals(100, des.getResults().get(0).getValues().get(0).getDoubleValue());
39+
Assertions.assertEquals(300, des.getResults().get(0).getValues().get(1).getDoubleValue());
40+
ExpressionResult asc = SortValuesOp.doSortValuesOp(
41+
mockData.newSeriesNoLabeledResult(), 3,
42+
MQEParser.ASC, MQEParser.AVG
43+
);
4144
Assertions.assertEquals(100, asc.getResults().get(0).getValues().get(0).getDoubleValue());
4245
Assertions.assertEquals(300, asc.getResults().get(0).getValues().get(1).getDoubleValue());
4346

4447
//labeled
45-
ExpressionResult desLabeled = SortValuesOp.doSortValuesOp(mockData.newSeriesLabeledResult(), Optional.of(3),
46-
MQEParser.DES);
47-
Assertions.assertEquals(300, desLabeled.getResults().get(0).getValues().get(0).getDoubleValue());
48-
Assertions.assertEquals(100, desLabeled.getResults().get(0).getValues().get(1).getDoubleValue());
49-
Assertions.assertEquals(301, desLabeled.getResults().get(1).getValues().get(0).getDoubleValue());
50-
Assertions.assertEquals(101, desLabeled.getResults().get(1).getValues().get(1).getDoubleValue());
51-
ExpressionResult ascLabeled = SortValuesOp.doSortValuesOp(mockData.newSeriesLabeledResult(), Optional.of(2),
52-
MQEParser.ASC);
48+
ExpressionResult desLabeled = SortValuesOp.doSortValuesOp(
49+
mockData.newSeriesLabeledResult(), 3,
50+
MQEParser.DES, MQEParser.AVG
51+
);
52+
Assertions.assertEquals(101, desLabeled.getResults().get(0).getValues().get(0).getDoubleValue());
53+
Assertions.assertEquals(301, desLabeled.getResults().get(0).getValues().get(1).getDoubleValue());
54+
Assertions.assertEquals("label", desLabeled.getResults().get(0).getMetric().getLabels().get(0).getKey());
55+
Assertions.assertEquals("2", desLabeled.getResults().get(0).getMetric().getLabels().get(0).getValue());
56+
Assertions.assertEquals("label2", desLabeled.getResults().get(0).getMetric().getLabels().get(1).getKey());
57+
Assertions.assertEquals("21", desLabeled.getResults().get(0).getMetric().getLabels().get(1).getValue());
58+
Assertions.assertEquals(100, desLabeled.getResults().get(1).getValues().get(0).getDoubleValue());
59+
Assertions.assertEquals(300, desLabeled.getResults().get(1).getValues().get(1).getDoubleValue());
60+
Assertions.assertEquals("label", desLabeled.getResults().get(1).getMetric().getLabels().get(0).getKey());
61+
Assertions.assertEquals("1", desLabeled.getResults().get(1).getMetric().getLabels().get(0).getValue());
62+
Assertions.assertEquals("label2", desLabeled.getResults().get(1).getMetric().getLabels().get(1).getKey());
63+
Assertions.assertEquals("21", desLabeled.getResults().get(1).getMetric().getLabels().get(1).getValue());
64+
65+
ExpressionResult ascLabeled = SortValuesOp.doSortValuesOp(
66+
mockData.newSeriesLabeledResult(), 3,
67+
MQEParser.ASC, MQEParser.AVG
68+
);
5369
Assertions.assertEquals(100, ascLabeled.getResults().get(0).getValues().get(0).getDoubleValue());
5470
Assertions.assertEquals(300, ascLabeled.getResults().get(0).getValues().get(1).getDoubleValue());
71+
Assertions.assertEquals("label", ascLabeled.getResults().get(0).getMetric().getLabels().get(0).getKey());
72+
Assertions.assertEquals("1", ascLabeled.getResults().get(0).getMetric().getLabels().get(0).getValue());
73+
Assertions.assertEquals("label2", ascLabeled.getResults().get(0).getMetric().getLabels().get(1).getKey());
74+
Assertions.assertEquals("21", ascLabeled.getResults().get(0).getMetric().getLabels().get(1).getValue());
5575
Assertions.assertEquals(101, ascLabeled.getResults().get(1).getValues().get(0).getDoubleValue());
5676
Assertions.assertEquals(301, ascLabeled.getResults().get(1).getValues().get(1).getDoubleValue());
77+
Assertions.assertEquals("label", ascLabeled.getResults().get(1).getMetric().getLabels().get(0).getKey());
78+
Assertions.assertEquals("2", ascLabeled.getResults().get(1).getMetric().getLabels().get(0).getValue());
79+
Assertions.assertEquals("label2", ascLabeled.getResults().get(1).getMetric().getLabels().get(1).getKey());
80+
Assertions.assertEquals("21", ascLabeled.getResults().get(1).getMetric().getLabels().get(1).getValue());
5781

5882
//limit
59-
ExpressionResult desLabeledLimit = SortValuesOp.doSortValuesOp(mockData.newSeriesLabeledResult(), Optional.of(1),
60-
MQEParser.DES);
61-
Assertions.assertEquals(1, desLabeledLimit.getResults().get(0).getValues().size());
62-
Assertions.assertEquals(1, desLabeledLimit.getResults().get(1).getValues().size());
63-
Assertions.assertEquals(300, desLabeledLimit.getResults().get(0).getValues().get(0).getDoubleValue());
64-
Assertions.assertEquals(301, desLabeledLimit.getResults().get(1).getValues().get(0).getDoubleValue());
83+
ExpressionResult desLabeledLimit = SortValuesOp.doSortValuesOp(
84+
mockData.newSeriesLabeledResult(), 1,
85+
MQEParser.DES, MQEParser.AVG
86+
);
87+
Assertions.assertEquals(101, desLabeledLimit.getResults().get(0).getValues().get(0).getDoubleValue());
88+
Assertions.assertEquals(301, desLabeledLimit.getResults().get(0).getValues().get(1).getDoubleValue());
89+
Assertions.assertEquals("label", desLabeledLimit.getResults().get(0).getMetric().getLabels().get(0).getKey());
90+
Assertions.assertEquals("2", desLabeledLimit.getResults().get(0).getMetric().getLabels().get(0).getValue());
91+
Assertions.assertEquals("label2", desLabeledLimit.getResults().get(0).getMetric().getLabels().get(1).getKey());
92+
Assertions.assertEquals("21", desLabeledLimit.getResults().get(0).getMetric().getLabels().get(1).getValue());
6593
}
6694
}

0 commit comments

Comments
 (0)