Skip to content

Commit 960d5f4

Browse files
authored
Enhance TopN config to support excludes filter (#13361)
* Enhance TopN config to support excludes filter * banyandb version
1 parent 4b508b9 commit 960d5f4

File tree

13 files changed

+219
-48
lines changed

13 files changed

+219
-48
lines changed

docs/en/setup/backend/storages/banyandb.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,6 +255,7 @@ You can define the TopN rules for different metrics. The configuration is define
255255
# - lruSizeHourDay: Optional, default `2`. Defines how many time_buckets are held in the memory for hour and day-level metrics.
256256

257257
# - sort: Optional, default `all`. The sorting order for the metrics, asc, des or all(include both asc and des).
258+
# - excludes: Optional, default `[]`. The tag values to be excluded from the candidates. If specified, the candidates will not include the entries with the specified tag values.
258259

259260
TopN-Rules:
260261
# endpoint metrics
@@ -314,6 +315,16 @@ TopN-Rules:
314315
groupByTagNames:
315316
- service_id
316317
sort: des
318+
# The following rule can be used to filter out the mesh endpoints.
319+
# You MUST add `attr0!= MESH` to the MQE topN query to hit this rule.
320+
# - name: endpoint_cpm-service
321+
# metricName: endpoint_cpm
322+
# groupByTagNames:
323+
# - service_id
324+
# sort: des
325+
# excludes:
326+
# - tag: attr0
327+
# value: MESH
317328
```
318329

319330
### Installation Modes

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/query/input/AttrCondition.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,13 @@ public class AttrCondition {
2727
private final String key;
2828
private final String value;
2929
private final boolean isEquals;
30+
31+
@Override
32+
public String toString() {
33+
if (isEquals) {
34+
return key + "==" + value;
35+
} else {
36+
return key + "!=" + value;
37+
}
38+
}
3039
}

oap-server/server-starter/src/main/resources/bydb-topn.yml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
# - lruSizeHourDay: Optional, default `2`. Defines how many time_buckets are held in the memory for hour and day-level metrics.
3131

3232
# - sort: Optional, default `all`. The sorting order for the metrics, asc, des or all(include both asc and des).
33+
# - excludes: Optional, default `[]`. The tag values to be excluded from the candidates. If specified, the candidates will not include the entries with the specified tag values.
3334

3435
TopN-Rules:
3536
# endpoint metrics
@@ -79,4 +80,14 @@ TopN-Rules:
7980
metricName: browser_app_page_error_rate
8081
groupByTagNames:
8182
- service_id
82-
sort: des
83+
sort: des
84+
# The following rule can be used to filter out the mesh endpoints.
85+
# You MUST add `attr0!= MESH` to the MQE topN query to hit this rule.
86+
# - name: endpoint_cpm-service
87+
# metricName: endpoint_cpm
88+
# groupByTagNames:
89+
# - service_id
90+
# sort: des
91+
# excludes:
92+
# - tag: attr0
93+
# value: MESH

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,21 +70,18 @@ public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueCol
7070
// The query tags are the additional conditions and attributes defined in the TopN condition.
7171
// The query tags is the key to find the TopN aggregation in the schema.
7272
// If the TopN aggregation is defined in the schema, it will be used to perform the query.
73-
// The server-side TopN only support when attribute condition `isEquals == true`.
73+
// The server-side TopN only support the rules config in bydb-topn.yaml.
7474
ImmutableSet.Builder<String> queryTags = ImmutableSet.builder();
75-
boolean equalsQuery = true;
7675
if (condition.getAttributes() != null) {
7776
for (AttrCondition attr : condition.getAttributes()) {
7877
if (!attr.isEquals()) {
79-
equalsQuery = false;
80-
break;
78+
// Not equal attr condition should add full String key!=value
79+
queryTags.add(attr.toString());
80+
} else {
81+
queryTags.add(attr.getKey());
8182
}
82-
queryTags.add(attr.getKey());
8383
}
8484
}
85-
if (!equalsQuery) {
86-
return directMetricsTopN(isColdStage, condition, schema, valueColumnName, spec, getTimestampRange(duration), additionalConditions);
87-
}
8885
if (additionalConditions != null) {
8986
additionalConditions.forEach(additionalCondition -> queryTags.add(additionalCondition.getKey()));
9087
}

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBConfigLoader.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.util.Map;
2626
import java.util.Properties;
2727
import lombok.extern.slf4j.Slf4j;
28+
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
2829
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
2930
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
3031
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
@@ -236,6 +237,19 @@ private void loadTopNConfig() throws ModuleStartException {
236237
if (sort != null) {
237238
topN.setSort(TopN.Sort.valueOf(sort.toString()));
238239
}
240+
var excludes = rule.get("excludes");
241+
if (excludes != null) {
242+
for (Map<String, String> tag : (List<Map<String, String>>) excludes) {
243+
var tagName = tag.get("tag");
244+
var tagValue = tag.get("value");
245+
if (tagName == null || tagValue == null) {
246+
throw new ModuleStartException(
247+
"TopN rule name: " + name + ", [tag] or [value] is missing in [excludes] item in file [bydb-topn.yml].");
248+
}
249+
topN.getExcludes().add(new KeyValue(tag.get("tag"), tag.get("value")));
250+
}
251+
}
252+
239253
Map<String, TopN> map = config.getTopNConfigs().computeIfAbsent(metricName.toString(), k -> new HashMap<>());
240254
if (map.put(name.toString(), topN) != null) {
241255
throw new ModuleStartException("Duplicate TopN rule name: " + name + " in file [bydb-topn.yml].");

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ private void checkMeasure(Measure measure, BanyanDBClient client) throws BanyanD
475475
.build()
476476
.equals(measure.toBuilder().clearMetadata().build());
477477
if (!equals) {
478+
// banyanDB server can not delete or update Tags.
478479
client.update(measure);
479480
log.info("update Measure: {} from: {} to: {}", hisMeasure.getMetadata().getName(), hisMeasure, measure);
480481
}

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageConfig.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,15 @@
2222
import com.google.common.collect.Iterables;
2323
import java.util.ArrayList;
2424
import java.util.HashMap;
25+
import java.util.HashSet;
2526
import java.util.List;
2627
import java.util.Map;
28+
import java.util.Set;
2729
import lombok.Data;
2830
import lombok.Getter;
2931
import lombok.Setter;
3032
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
33+
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
3134
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
3235

3336
@Getter
@@ -259,6 +262,8 @@ public static class TopN {
259262
*/
260263
private Sort sort = Sort.all;
261264

265+
private Set<KeyValue> excludes = new HashSet<>();
266+
262267
public enum Sort {
263268
all(BanyandbModel.Sort.SORT_UNSPECIFIED),
264269
des(BanyandbModel.Sort.SORT_DESC),

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java

Lines changed: 42 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
2727
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
2828
import org.apache.skywalking.oap.server.core.CoreModule;
29+
import org.apache.skywalking.oap.server.core.RunningMode;
2930
import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
3031
import org.apache.skywalking.oap.server.core.storage.IHistoryDeleteDAO;
3132
import org.apache.skywalking.oap.server.core.storage.StorageBuilderFactory;
@@ -224,49 +225,56 @@ public void start() throws ServiceNotProvidedException, ModuleStartException {
224225

225226
@Override
226227
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {
227-
// Cleanup TopN rules in BanyanDB server that are not configured in the current config.
228+
if (!RunningMode.isNoInitMode()) {
229+
try {
230+
List<BanyandbCommon.Group> groups = this.client.client.findGroups();
231+
cleanupUnusedTopNRules(groups);
232+
//todo: can not delete indexRules now, because banyanDB server can not delete or update Tags.
233+
} catch (BanyanDBException e) {
234+
throw new ModuleStartException(e.getMessage(), e);
235+
}
236+
}
237+
}
238+
239+
@Override
240+
public String[] requiredModules() {
241+
return new String[] {CoreModule.NAME};
242+
}
243+
244+
// Cleanup TopN rules in BanyanDB server that are not configured in the current config.
245+
private void cleanupUnusedTopNRules(List<BanyandbCommon.Group> groups) throws BanyanDBException {
228246
Set<String> topNNames = new HashSet<>();
229247
this.config.getTopNConfigs().values().forEach(topNConfig -> {
230248
topNNames.addAll(topNConfig.keySet());
231249
});
232-
233-
try {
234-
List<BanyandbCommon.Group> groups = this.client.client.findGroups();
235-
for (BanyandbCommon.Group group : groups) {
236-
if (BanyandbCommon.Catalog.CATALOG_MEASURE.equals(group.getCatalog())) {
237-
String groupName = group.getMetadata().getName();
238-
List<BanyandbDatabase.TopNAggregation> topNAggregations = this.client.client.findTopNAggregations(groupName);
239-
if (CollectionUtils.isNotEmpty(topNAggregations)) {
240-
for (BanyandbDatabase.TopNAggregation topNAggregation : topNAggregations) {
241-
String topNName = topNAggregation.getMetadata().getName();
242-
if (!topNNames.contains(topNName)) {
243-
if (this.config.getGlobal().isCleanupUnusedTopNRules()) {
244-
this.client.client.deleteTopNAggregation(groupName, topNName);
245-
log.info(
246-
"Deleted unused topN rule from BanyanDB server: {}, group: {}. Please check bydb-topn.yml. " +
247-
"If you don't want to cleanup unused rules from server, please set cleanupUnusedTopNRules=false in bydb.yml",
248-
topNName, groupName
249-
);
250-
} else {
251-
// Log the unused TopN aggregation.
252-
log.warn(
253-
"Unused topN rule in BanyanDB server: {}, group: {}. Please check bydb-topn.yml. " +
254-
"If you want to cleanup unused rules from server, please set cleanupUnusedTopNRules=true in bydb.yml",
255-
topNName, groupName
256-
);
257-
}
250+
for (BanyandbCommon.Group group : groups) {
251+
if (BanyandbCommon.Catalog.CATALOG_MEASURE.equals(group.getCatalog())) {
252+
String groupName = group.getMetadata().getName();
253+
List<BanyandbDatabase.TopNAggregation> topNAggregations = this.client.client.findTopNAggregations(
254+
groupName);
255+
if (CollectionUtils.isNotEmpty(topNAggregations)) {
256+
for (BanyandbDatabase.TopNAggregation topNAggregation : topNAggregations) {
257+
String topNName = topNAggregation.getMetadata().getName();
258+
if (!topNNames.contains(topNName)) {
259+
if (this.config.getGlobal().isCleanupUnusedTopNRules()) {
260+
this.client.client.deleteTopNAggregation(groupName, topNName);
261+
log.info(
262+
"Deleted unused topN rule from BanyanDB server: {}, group: {}. Please check bydb-topn.yml. " +
263+
"If you don't want to cleanup unused rules from server, please set cleanupUnusedTopNRules=false in bydb.yml",
264+
topNName, groupName
265+
);
266+
} else {
267+
// Log the unused TopN aggregation.
268+
log.warn(
269+
"Unused topN rule in BanyanDB server: {}, group: {}. Please check bydb-topn.yml. " +
270+
"If you want to cleanup unused rules from server, please set cleanupUnusedTopNRules=true in bydb.yml",
271+
topNName, groupName
272+
);
258273
}
259274
}
260275
}
261276
}
262277
}
263-
} catch (BanyanDBException e) {
264-
throw new ModuleStartException(e.getMessage(), e);
265278
}
266279
}
267-
268-
@Override
269-
public String[] requiredModules() {
270-
return new String[] {CoreModule.NAME};
271-
}
272280
}

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import com.google.common.collect.ImmutableSet;
2222
import com.google.gson.JsonObject;
23+
import java.util.HashSet;
24+
import java.util.function.BiFunction;
2325
import lombok.Builder;
2426
import lombok.Data;
2527
import lombok.EqualsAndHashCode;
@@ -43,13 +45,17 @@
4345
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagSpec;
4446
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType;
4547
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
48+
import org.apache.skywalking.banyandb.v1.client.AbstractCriteria;
49+
import org.apache.skywalking.banyandb.v1.client.And;
50+
import org.apache.skywalking.banyandb.v1.client.PairQueryCondition;
4651
import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
4752
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
4853
import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
4954
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
5055
import org.apache.skywalking.oap.server.core.analysis.record.Record;
5156
import org.apache.skywalking.oap.server.core.config.DownSamplingConfigService;
5257
import org.apache.skywalking.oap.server.core.query.enumeration.Step;
58+
import org.apache.skywalking.oap.server.core.query.type.KeyValue;
5359
import org.apache.skywalking.oap.server.core.storage.StorageException;
5460
import org.apache.skywalking.oap.server.core.storage.annotation.BanyanDB;
5561
import org.apache.skywalking.oap.server.core.storage.annotation.Column;
@@ -217,6 +223,7 @@ private Map<ImmutableSet<String>, TopNAggregation> parseTopNSpecs(final Model mo
217223
Map<ImmutableSet<String>, TopNAggregation> topNAggregations = new HashMap<>();
218224
topNConfig.forEach((name, topN) -> {
219225
ImmutableSet<String> key = ImmutableSet.of();
226+
Set<String> queryConditions = new HashSet<>();
220227
TopNAggregation.Builder topNAggregation = TopNAggregation.newBuilder()
221228
.setMetadata(
222229
Metadata.newBuilder().setGroup(group).setName(name))
@@ -225,7 +232,7 @@ private Map<ImmutableSet<String>, TopNAggregation> parseTopNSpecs(final Model mo
225232
.setFieldName(valueColumnOpt.get().getValueCName())
226233
.setCountersNumber(topN.getCountersNumber());
227234
if (topN.getGroupByTagNames() != null) {
228-
key = ImmutableSet.copyOf(topN.getGroupByTagNames());
235+
queryConditions.addAll(topN.getGroupByTagNames());
229236
//check tags
230237
topN.getGroupByTagNames().forEach(tag -> {
231238
if (!tags.contains(tag)) {
@@ -247,8 +254,28 @@ private Map<ImmutableSet<String>, TopNAggregation> parseTopNSpecs(final Model mo
247254
default:
248255
throw new UnsupportedOperationException("unsupported downsampling: " + model.getDownsampling());
249256
}
257+
258+
if (CollectionUtils.isNotEmpty(topN.getExcludes())) {
259+
AbstractCriteria criteria;
260+
List<AbstractCriteria> conditions = new ArrayList<>(topN.getExcludes().size());
261+
for (KeyValue keyValue : topN.getExcludes()) {
262+
conditions.add(PairQueryCondition.StringQueryCondition.ne(keyValue.getKey(), keyValue.getValue()));
263+
queryConditions.remove(keyValue.getKey());
264+
queryConditions.add(keyValue.getKey() + "!=" + keyValue.getValue());
265+
}
266+
if (conditions.size() == 1) {
267+
criteria = conditions.get(0);
268+
} else {
269+
criteria = conditions.subList(2, conditions.size()).stream().reduce(
270+
And.create(conditions.get(0), conditions.get(1)),
271+
(BiFunction<AbstractCriteria, AbstractCriteria, AbstractCriteria>) And::create,
272+
And::create);
273+
}
274+
topNAggregation.setCriteria(criteria.build());
275+
}
276+
key = ImmutableSet.copyOf(queryConditions);
250277
if (topNAggregations.containsKey(key)) {
251-
throw new IllegalArgumentException("In file [bydb-topn.yml], TopN rule " + topN.getName() + "'s groupByTagNames " + key + " already exist in the same metric " + model.getName());
278+
throw new IllegalArgumentException("In file [bydb-topn.yml], TopN rule " + topN.getName() + "'s groupByTagNames and excludes " + key + " already exist in the same metric " + model.getName());
252279
}
253280
topNAggregations.put(key, topNAggregation.build());
254281
});

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -207,9 +207,9 @@ private TopNQueryResponse topNQuery(boolean isColdStage,
207207
attributes.forEach(attr -> {
208208
if (attr.isEquals()) {
209209
conditions.add(PairQueryCondition.StringQueryCondition.eq(attr.getKey(), attr.getValue()));
210-
} else {
211-
conditions.add(PairQueryCondition.StringQueryCondition.ne(attr.getKey(), attr.getValue()));
212210
}
211+
//server side topn query does not support not equals condition
212+
//the not equals condition should be handled by a specific topN rule by adding exclude condition.
213213
});
214214
}
215215
q.setConditions(conditions);

0 commit comments

Comments
 (0)