Skip to content

Commit 7dccf96

Browse files
authored
BanyanDB: Support @ShardingKey for Measure tags and set to TopNAggregation group tag by default (#13180)
1 parent bcc2a27 commit 7dccf96

File tree

7 files changed

+123
-16
lines changed

7 files changed

+123
-16
lines changed

docs/en/changes/changes.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
* PromQL Service: traffic query support `limit` and regex match.
1313
* Fix an edge case of HashCodeSelector(Integer#MIN_VALUE causes ArrayIndexOutOfBoundsException).
1414
* Support Flink monitoring.
15+
* BanyanDB: Support `@ShardingKey` for Measure tags and set to TopNAggregation group tag by default.
1516

1617
#### UI
1718

oap-server/oal-rt/src/main/java/org/apache/skywalking/oal/rt/util/OALClassGenerator.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -203,9 +203,9 @@ private Class generateMetricsClass(AnalysisResult metricsStmt) throws OALCompile
203203
annotationsAttribute.addAnnotation(columnAnnotation);
204204
if (field.isID()) {
205205
// Add SeriesID = 0 annotation to ID field.
206-
Annotation banyanShardingKeyAnnotation = new Annotation(BanyanDB.SeriesID.class.getName(), constPool);
207-
banyanShardingKeyAnnotation.addMemberValue("index", new IntegerMemberValue(constPool, 0));
208-
annotationsAttribute.addAnnotation(banyanShardingKeyAnnotation);
206+
Annotation banyanSeriesIDAnnotation = new Annotation(BanyanDB.SeriesID.class.getName(), constPool);
207+
banyanSeriesIDAnnotation.addMemberValue("index", new IntegerMemberValue(constPool, 0));
208+
annotationsAttribute.addAnnotation(banyanSeriesIDAnnotation);
209209

210210
// Entity id field should enable doc values.
211211
final var enableDocValuesAnnotation = new Annotation(ElasticSearch.EnableDocValues.class.getName(), constPool);
@@ -215,6 +215,10 @@ private Class generateMetricsClass(AnalysisResult metricsStmt) throws OALCompile
215215
if (field.isGroupByCondInTopN()) {
216216
Annotation banyanTopNAggregationAnnotation = new Annotation(BanyanDB.TopNAggregation.class.getName(), constPool);
217217
annotationsAttribute.addAnnotation(banyanTopNAggregationAnnotation);
218+
// If TopN, add ShardingKey to group field.
219+
Annotation banyanShardingKeyAnnotation = new Annotation(BanyanDB.ShardingKey.class.getName(), constPool);
220+
banyanShardingKeyAnnotation.addMemberValue("index", new IntegerMemberValue(constPool, 0));
221+
annotationsAttribute.addAnnotation(banyanShardingKeyAnnotation);
218222
}
219223

220224
newField.getFieldInfo().addAttribute(annotationsAttribute);

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/annotation/BanyanDB.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,30 @@
7777
int index() default -1;
7878
}
7979

80+
/**
81+
* ShardingKey is used to group time series data per metric in one place. Optional. Only support Measure Tag.
82+
* If ShardingKey is not set, the default ShardingKey is based on the combination of 'name' and 'entity' according to the {@link SeriesID}.
83+
* <p>
84+
* The typical scenario to specify the ShardingKey to the Group tag when the metric generate a TopNAggregation:
85+
* If not set, the default data distribution based on the combination of 'name' and 'entity', can lead to performance issues when calculating the 'TopNAggregation'.
86+
* This is because each shard only has a subset of the top-n list, and the query process has to be responsible for aggregating those lists to obtain the final result.
87+
* This introduces overhead in terms of querying performance and disk usage.
88+
*
89+
* @since 10.3.0
90+
*/
91+
@Target({ElementType.FIELD})
92+
@Retention(RetentionPolicy.RUNTIME)
93+
@interface ShardingKey {
94+
/**
95+
* Relative sharding tag
96+
* <p>
97+
* The index number determines the order of the column placed in the ShardingKey.
98+
*
99+
* @return non-negative if this column be used for sharding. -1 means not as a sharding key
100+
*/
101+
int index() default -1;
102+
}
103+
80104
/**
81105
* Force disabling indexing declare through {@link Column}.
82106
* In BanyanDB, some additional conditions could be done in server memory, no indexing required in this case.

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/BanyanDBExtension.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ public class BanyanDBExtension {
3939
@Getter
4040
private final int seriesIDIdx;
4141

42+
@Getter
43+
private final int shardingKeyIdx;
44+
4245
/**
4346
* {@link BanyanDB.NoIndexing} exists to override {@link ModelColumn#shouldIndex()}, or be the same as {@link
4447
* ModelColumn#shouldIndex()}.
@@ -82,6 +85,13 @@ public boolean isSeriesID() {
8285
return this.seriesIDIdx > -1;
8386
}
8487

88+
/**
89+
* @return true if this column is a part of sharding key
90+
*/
91+
public boolean isShardingKey() {
92+
return this.shardingKeyIdx > -1;
93+
}
94+
8595
/**
8696
* @return true if this column should be indexing in the BanyanDB
8797
*/

oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/storage/model/StorageModels.java

Lines changed: 57 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,15 @@ public Model add(Class<?> aClass, int scopeId, Storage storage) throws StorageEx
5858
DefaultScopeDefine.nameOf(scopeId);
5959

6060
List<ModelColumn> modelColumns = new ArrayList<>();
61-
SeriesIDChecker checker = new SeriesIDChecker();
61+
SeriesIDChecker seriesIDChecker = new SeriesIDChecker();
62+
ShardingKeyChecker shardingKeyChecker = new ShardingKeyChecker();
6263
SQLDatabaseModelExtension sqlDBModelExtension = new SQLDatabaseModelExtension();
6364
BanyanDBModelExtension banyanDBModelExtension = new BanyanDBModelExtension();
6465
ElasticSearchModelExtension elasticSearchModelExtension = new ElasticSearchModelExtension();
65-
retrieval(aClass, storage.getModelName(), modelColumns, scopeId, checker, sqlDBModelExtension, banyanDBModelExtension);
66+
retrieval(
67+
aClass, storage.getModelName(), modelColumns, scopeId, seriesIDChecker, shardingKeyChecker, sqlDBModelExtension,
68+
banyanDBModelExtension
69+
);
6670
// Add extra column for additional entities
6771
if (aClass.isAnnotationPresent(SQLDatabase.ExtraColumn4AdditionalEntity.class)
6872
|| aClass.isAnnotationPresent(SQLDatabase.MultipleExtraColumn4AdditionalEntity.class)) {
@@ -106,7 +110,8 @@ public Model add(Class<?> aClass, int scopeId, Storage storage) throws StorageEx
106110
// Set routing rules for ElasticSearch
107111
elasticSearchModelExtension.setRouting(storage.getModelName(), modelColumns);
108112

109-
checker.check(storage.getModelName());
113+
seriesIDChecker.check(storage.getModelName());
114+
shardingKeyChecker.check(storage.getModelName());
110115

111116
Model model = new Model(
112117
storage.getModelName(),
@@ -153,7 +158,8 @@ private void retrieval(final Class<?> clazz,
153158
final String modelName,
154159
final List<ModelColumn> modelColumns,
155160
final int scopeId,
156-
SeriesIDChecker checker,
161+
SeriesIDChecker seriesIDChecker,
162+
ShardingKeyChecker shardingKeyChecker,
157163
final SQLDatabaseModelExtension sqlDBModelExtension,
158164
final BanyanDBModelExtension banyanDBModelExtension) {
159165
if (log.isDebugEnabled()) {
@@ -212,6 +218,8 @@ private void retrieval(final Class<?> clazz,
212218
// BanyanDB extension
213219
final BanyanDB.SeriesID banyanDBSeriesID = field.getAnnotation(
214220
BanyanDB.SeriesID.class);
221+
final BanyanDB.ShardingKey banyanDBShardingKey = field.getAnnotation(
222+
BanyanDB.ShardingKey.class);
215223
final BanyanDB.NoIndexing banyanDBNoIndex = field.getAnnotation(
216224
BanyanDB.NoIndexing.class);
217225
final BanyanDB.IndexRule banyanDBIndexRule = field.getAnnotation(
@@ -227,6 +235,7 @@ private void retrieval(final Class<?> clazz,
227235
final boolean shouldIndex = (banyanDBNoIndex == null) && !column.storageOnly();
228236
BanyanDBExtension banyanDBExtension = new BanyanDBExtension(
229237
banyanDBSeriesID == null ? -1 : banyanDBSeriesID.index(),
238+
banyanDBShardingKey == null ? -1 : banyanDBShardingKey.index(),
230239
shouldIndex,
231240
banyanDBIndexRule == null ? BanyanDB.IndexRule.IndexType.INVERTED : banyanDBIndexRule.indexType(),
232241
banyanDBMeasureField != null,
@@ -255,7 +264,10 @@ private void retrieval(final Class<?> clazz,
255264
banyanDBExtension
256265
);
257266
if (banyanDBExtension.isSeriesID()) {
258-
checker.accept(modelName, modelColumn);
267+
seriesIDChecker.accept(modelName, modelColumn);
268+
}
269+
if (banyanDBExtension.isShardingKey()) {
270+
shardingKeyChecker.accept(modelName, modelColumn);
259271
}
260272

261273
if (field.isAnnotationPresent(SQLDatabase.AdditionalEntity.class)) {
@@ -282,7 +294,10 @@ private void retrieval(final Class<?> clazz,
282294
}
283295

284296
if (Objects.nonNull(clazz.getSuperclass())) {
285-
retrieval(clazz.getSuperclass(), modelName, modelColumns, scopeId, checker, sqlDBModelExtension, banyanDBModelExtension);
297+
retrieval(
298+
clazz.getSuperclass(), modelName, modelColumns, scopeId, seriesIDChecker, shardingKeyChecker,
299+
sqlDBModelExtension, banyanDBModelExtension
300+
);
286301
}
287302
}
288303

@@ -359,4 +374,40 @@ private void check(String modelName) throws IllegalStateException {
359374
}
360375
}
361376
}
377+
378+
private static class ShardingKeyChecker {
379+
private final ArrayList<ModelColumn> keys = new ArrayList<>();
380+
381+
/**
382+
* @throws IllegalStateException if sharding key indices are conflicting.
383+
*/
384+
private void accept(String modelName, ModelColumn modelColumn) throws IllegalStateException {
385+
final int idx = modelColumn.getBanyanDBExtension().getShardingKeyIdx();
386+
while (idx + 1 > keys.size()) {
387+
keys.add(null);
388+
}
389+
ModelColumn exist = keys.get(idx);
390+
if (exist != null) {
391+
throw new IllegalStateException(
392+
modelName + "'s "
393+
+ "Column [" + exist.getColumnName() + "] and column [" + modelColumn.getColumnName()
394+
+ " are conflicting with sharding key index=" + modelColumn.getBanyanDBExtension()
395+
.getShardingKeyIdx());
396+
}
397+
keys.set(idx, modelColumn);
398+
}
399+
400+
/**
401+
* @param modelName model name of the entity
402+
* @throws IllegalStateException if sharding key indices are not continuous
403+
*/
404+
private void check(String modelName) throws IllegalStateException {
405+
for (int i = 0; i < keys.size(); i++) {
406+
final ModelColumn modelColumn = keys.get(i);
407+
if (modelColumn == null) {
408+
throw new IllegalStateException("sharding key index=" + i + " is missing in " + modelName);
409+
}
410+
}
411+
}
412+
}
362413
}

oap-server/server-core/src/test/java/org/apache/skywalking/oap/server/core/storage/model/ModelColumnTest.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public void testColumnDefine() {
5050
new ElasticSearchExtension(
5151
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, null, false, false, true),
5252
new BanyanDBExtension(
53-
-1, true, BanyanDB.IndexRule.IndexType.INVERTED, false,
53+
-1, -1, true, BanyanDB.IndexRule.IndexType.INVERTED, false,
5454
BanyanDB.MatchQuery.AnalyzerType.SIMPLE, true
5555
)
5656
);
@@ -62,7 +62,7 @@ public void testColumnDefine() {
6262
new SQLDatabaseExtension(),
6363
new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, null, false, false, true),
6464
new BanyanDBExtension(
65-
-1, true, BanyanDB.IndexRule.IndexType.INVERTED, false,
65+
-1, -1, true, BanyanDB.IndexRule.IndexType.INVERTED, false,
6666
BanyanDB.MatchQuery.AnalyzerType.SIMPLE, true
6767
)
6868
);
@@ -74,7 +74,7 @@ public void testColumnDefine() {
7474
false, false, true, 200,
7575
new SQLDatabaseExtension(),
7676
new ElasticSearchExtension(ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, null, false, false, true),
77-
new BanyanDBExtension(-1, true, BanyanDB.IndexRule.IndexType.INVERTED, false, BanyanDB.MatchQuery.AnalyzerType.SIMPLE, true)
77+
new BanyanDBExtension(-1, -1, true, BanyanDB.IndexRule.IndexType.INVERTED, false, BanyanDB.MatchQuery.AnalyzerType.SIMPLE, true)
7878
);
7979
Assertions.assertFalse(column.isStorageOnly());
8080
Assertions.assertEquals("abc", column.getColumnName().getName());
@@ -89,7 +89,7 @@ public void testConflictDefinition() {
8989
new ElasticSearchExtension(
9090
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false, true),
9191
new BanyanDBExtension(
92-
-1, true, BanyanDB.IndexRule.IndexType.INVERTED, false,
92+
-1, -1, true, BanyanDB.IndexRule.IndexType.INVERTED, false,
9393
BanyanDB.MatchQuery.AnalyzerType.SIMPLE, true
9494
)
9595
);
@@ -104,7 +104,7 @@ public void testConflictDefinitionIndexOnly() {
104104
new SQLDatabaseExtension(),
105105
new ElasticSearchExtension(
106106
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false, true),
107-
new BanyanDBExtension(-1, true, BanyanDB.IndexRule.IndexType.INVERTED, false,
107+
new BanyanDBExtension(-1, -1, true, BanyanDB.IndexRule.IndexType.INVERTED, false,
108108
BanyanDB.MatchQuery.AnalyzerType.SIMPLE, true
109109
)
110110
);
@@ -120,7 +120,7 @@ public void testConflictDefinitionStorageOnly() {
120120
new ElasticSearchExtension(
121121
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false, true),
122122
new BanyanDBExtension(
123-
-1, false, BanyanDB.IndexRule.IndexType.INVERTED, false,
123+
-1, -1, false, BanyanDB.IndexRule.IndexType.INVERTED, false,
124124
BanyanDB.MatchQuery.AnalyzerType.SIMPLE, true
125125
)
126126
);
@@ -136,7 +136,7 @@ public void testConflictDefinitionEnableSort() {
136136
new ElasticSearchExtension(
137137
ElasticSearch.MatchQuery.AnalyzerType.OAP_ANALYZER, "abc", false, false, true),
138138
new BanyanDBExtension(
139-
-1, false, BanyanDB.IndexRule.IndexType.INVERTED, true,
139+
-1, -1, false, BanyanDB.IndexRule.IndexType.INVERTED, true,
140140
null, true
141141
)
142142
);

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ public MeasureModel registerMeasureModel(Model model, BanyanDBStorageConfig conf
131131
.collect(Collectors.toMap(modelColumn -> modelColumn.getColumnName().getStorageName(), Function.identity()));
132132
// parse and set seriesIDs
133133
List<String> seriesIDColumns = parseEntityNames(modelColumnMap);
134+
List<String> shardingKeyColumns = parseShardingKeyNames(modelColumnMap);
134135
if (seriesIDColumns.isEmpty()) {
135136
throw new StorageException("model " + model.getName() + " doesn't contain series id");
136137
}
@@ -160,6 +161,9 @@ public MeasureModel registerMeasureModel(Model model, BanyanDBStorageConfig conf
160161
.setName(schemaMetadata.name()));
161162
builder.setInterval(downSamplingDuration(model.getDownsampling()).format());
162163
builder.setEntity(BanyandbDatabase.Entity.newBuilder().addAllTagNames(seriesIDColumns));
164+
if (CollectionUtils.isNotEmpty(shardingKeyColumns)) {
165+
builder.setShardingKey(BanyandbDatabase.ShardingKey.newBuilder().addAllTagNames(shardingKeyColumns));
166+
}
163167
builder.addAllTagFamilies(tagFamilySpecs);
164168
if (model.getBanyanDBModelExtension().isIndexMode()) {
165169
builder.setIndexMode(true);
@@ -351,6 +355,19 @@ List<String> parseEntityNames(Map<String, ModelColumn> modelColumnMap) {
351355
.collect(Collectors.toList());
352356
}
353357

358+
List<String> parseShardingKeyNames(Map<String, ModelColumn> modelColumnMap) {
359+
List<ModelColumn> shardingKeyColumns = new ArrayList<>();
360+
for (final ModelColumn col : modelColumnMap.values()) {
361+
if (col.getBanyanDBExtension().isShardingKey()) {
362+
shardingKeyColumns.add(col);
363+
}
364+
}
365+
return shardingKeyColumns.stream()
366+
.sorted(Comparator.comparingInt(col -> col.getBanyanDBExtension().getShardingKeyIdx()))
367+
.map(col -> col.getColumnName().getName())
368+
.collect(Collectors.toList());
369+
}
370+
354371
/**
355372
* Parse tags' metadata for {@link Stream}
356373
* Every field of a class is registered as a {@link org.apache.skywalking.banyandb.model.v1.BanyandbModel.Tag}

0 commit comments

Comments
 (0)