Skip to content

Commit eb4d38c

Browse files
authored
BanyanDB: Support update the Schema when OAP starting. (#12808)
1 parent f6a821a commit eb4d38c

File tree

9 files changed

+462
-218
lines changed

9 files changed

+462
-218
lines changed

.github/workflows/skywalking.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -436,7 +436,7 @@ jobs:
436436
- name: Log ES 7.17.10
437437
config: test/e2e-v2/cases/log/es/e2e.yaml
438438
env: ES_VERSION=7.17.10
439-
- name: Log ES 8.8.1 Shardng
439+
- name: Log ES 8.8.1 Sharding
440440
config: test/e2e-v2/cases/log/es/es-sharding/e2e.yaml
441441
env: ES_VERSION=8.8.1
442442
- name: Log BanyanDB

docs/en/changes/changes.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@
3030
* Bump up netty to 4.1.115, grpc to 1.68.1, boringssl to 2.0.69.
3131
* BanyanDB: Support update the Group settings when OAP starting.
3232
* BanyanDB: Introduce index mode and refactor banyandb group settings.
33+
* BanyanDB: Support update the Schema when OAP starting.
34+
* BanyanDB: Speed up OAP booting while initializing BanyanDB.
3335

3436
#### UI
3537

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBAggregationQueryDAO.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public List<SelectedRecord> sortMetrics(TopNCondition condition, String valueCol
7070
// 2) additional conditions are all group by tags
7171
if (CollectionUtils.isEmpty(additionalConditions) ||
7272
additionalConditions.stream().map(KeyValue::getKey).collect(Collectors.toSet())
73-
.equals(ImmutableSet.copyOf(schema.getTopNSpec().getGroupByTagNames()))) {
73+
.equals(ImmutableSet.copyOf(schema.getTopNSpec().getGroupByTagNamesList()))) {
7474
return serverSideTopN(condition, schema, spec, timestampRange, additionalConditions);
7575
}
7676
}

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBIndexInstaller.java

Lines changed: 423 additions & 11 deletions
Large diffs are not rendered by default.

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/MetadataRegistry.java

Lines changed: 17 additions & 192 deletions
Original file line numberDiff line numberDiff line change
@@ -18,25 +18,17 @@
1818

1919
package org.apache.skywalking.oap.server.storage.plugin.banyandb;
2020

21-
import com.fasterxml.jackson.databind.ObjectMapper;
2221
import com.google.gson.JsonObject;
23-
import io.grpc.Status;
2422
import lombok.Builder;
2523
import lombok.Data;
2624
import lombok.EqualsAndHashCode;
2725
import lombok.Getter;
28-
import lombok.NoArgsConstructor;
2926
import lombok.RequiredArgsConstructor;
30-
import lombok.Setter;
3127
import lombok.Singular;
3228
import lombok.ToString;
3329
import lombok.extern.slf4j.Slf4j;
3430
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
35-
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Catalog;
36-
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Group;
37-
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.IntervalRule;
3831
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.Metadata;
39-
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon.ResourceOpts;
4032
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase;
4133
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.CompressionMethod;
4234
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.EncodingMethod;
@@ -50,11 +42,7 @@
5042
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TagType;
5143
import org.apache.skywalking.banyandb.database.v1.BanyandbDatabase.TopNAggregation;
5244
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
53-
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
54-
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
5545
import org.apache.skywalking.banyandb.v1.client.metadata.Duration;
56-
import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
57-
import org.apache.skywalking.banyandb.v1.client.metadata.ResourceExist;
5846
import org.apache.skywalking.oap.server.core.analysis.DownSampling;
5947
import org.apache.skywalking.oap.server.core.analysis.metrics.IntList;
6048
import org.apache.skywalking.oap.server.core.analysis.metrics.Metrics;
@@ -77,7 +65,6 @@
7765
import java.util.ArrayList;
7866
import java.util.Comparator;
7967
import java.util.HashMap;
80-
import java.util.HashSet;
8168
import java.util.List;
8269
import java.util.Map;
8370
import java.util.Objects;
@@ -90,9 +77,6 @@
9077
public enum MetadataRegistry {
9178
INSTANCE;
9279

93-
private static final ObjectMapper MAPPER = new ObjectMapper();
94-
// BanyanDB group setting aligned with the OAP settings
95-
private static final Set<String> GROUP_ALIGNED = new HashSet<>();
9680
private final Map<String, Schema> registry = new HashMap<>();
9781

9882
public StreamModel registerStreamModel(Model model, BanyanDBStorageConfig config, DownSamplingConfigService configService) {
@@ -187,13 +171,13 @@ public MeasureModel registerMeasureModel(Model model, BanyanDBStorageConfig conf
187171
schemaBuilder.field(field.getName());
188172
}
189173
// parse TopN
190-
schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.name()));
174+
schemaBuilder.topNSpec(parseTopNSpec(model, schemaMetadata.group, schemaMetadata.name()));
191175

192176
registry.put(schemaMetadata.name(), schemaBuilder.build());
193177
return new MeasureModel(builder.build(), indexRules);
194178
}
195179

196-
private TopNSpec parseTopNSpec(final Model model, final String measureName)
180+
private TopNAggregation parseTopNSpec(final Model model, final String group, final String measureName)
197181
throws StorageException {
198182
if (model.getBanyanDBModelExtension().getTopN() == null) {
199183
return null;
@@ -208,14 +192,16 @@ private TopNSpec parseTopNSpec(final Model model, final String measureName)
208192
if (CollectionUtils.isEmpty(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())) {
209193
throw new StorageException("invalid groupBy tags: " + model.getBanyanDBModelExtension().getTopN().getGroupByTagNames());
210194
}
211-
return TopNSpec.builder()
212-
.name(measureName + "_topn")
213-
.lruSize(model.getBanyanDBModelExtension().getTopN().getLruSize())
214-
.countersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber())
215-
.fieldName(valueColumnOpt.get().getValueCName())
216-
.groupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())
217-
.sort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both TopN and BottomN
218-
.build();
195+
return TopNAggregation.newBuilder()
196+
.setMetadata(
197+
Metadata.newBuilder().setGroup(group).setName(Schema.formatTopNName(measureName)))
198+
.setSourceMeasure(Metadata.newBuilder().setGroup(group).setName(measureName))
199+
.setFieldValueSort(BanyandbModel.Sort.SORT_UNSPECIFIED) // include both TopN and BottomN
200+
.setFieldName(valueColumnOpt.get().getValueCName())
201+
.addAllGroupByTagNames(model.getBanyanDBModelExtension().getTopN().getGroupByTagNames())
202+
.setCountersNumber(model.getBanyanDBModelExtension().getTopN().getCountersNumber())
203+
.setLruSize(model.getBanyanDBModelExtension().getTopN().getLruSize())
204+
.build();
219205
}
220206

221207
public Schema findMetadata(final Model model) {
@@ -306,8 +292,8 @@ Duration downSamplingDuration(DownSampling downSampling) {
306292

307293
IndexRule indexRule(String group, String tagName, BanyanDB.MatchQuery.AnalyzerType analyzer) {
308294
IndexRule.Builder builder = IndexRule.newBuilder()
309-
.setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group))
310-
.setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName);
295+
.setMetadata(Metadata.newBuilder().setName(tagName).setGroup(group))
296+
.setType(IndexRule.Type.TYPE_INVERTED).addTags(tagName);
311297
if (analyzer != null) {
312298
switch (analyzer) {
313299
case KEYWORD:
@@ -553,36 +539,6 @@ static String formatName(String modelName, DownSampling downSampling) {
553539
return modelName + "_" + downSampling.getName();
554540
}
555541

556-
public Optional<Object> findRemoteSchema(BanyanDBClient client) throws BanyanDBException {
557-
try {
558-
switch (kind) {
559-
case STREAM:
560-
return Optional.ofNullable(client.findStream(this.group, this.name()));
561-
case MEASURE:
562-
return Optional.ofNullable(client.findMeasure(this.group, this.name()));
563-
default:
564-
throw new IllegalStateException("should not reach here");
565-
}
566-
} catch (BanyanDBException ex) {
567-
if (ex.getStatus().equals(Status.Code.NOT_FOUND)) {
568-
return Optional.empty();
569-
}
570-
571-
throw ex;
572-
}
573-
}
574-
575-
public MetadataCache.EntityMetadata updateRemoteSchema(BanyanDBClient client) throws BanyanDBException {
576-
switch (kind) {
577-
case STREAM:
578-
return client.updateStreamMetadataCacheFromSever(this.group, this.name());
579-
case MEASURE:
580-
return client.updateMeasureMetadataCacheFromSever(this.group, this.name());
581-
default:
582-
throw new IllegalStateException("should not reach here");
583-
}
584-
}
585-
586542
private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataList, boolean shouldAddID) {
587543
final String indexFamily = SchemaMetadata.this.indexFamily();
588544
final String nonIndexFamily = SchemaMetadata.this.nonIndexFamily();
@@ -603,95 +559,6 @@ private List<TagFamilySpec> extractTagFamilySpec(List<TagMetadata> tagMetadataLi
603559
return tagFamilySpecs;
604560
}
605561

606-
/**
607-
* Check if the group settings need to be updated
608-
*/
609-
private boolean checkGroupUpdate(BanyanDBClient client) throws BanyanDBException {
610-
Group g = client.findGroup(this.group);
611-
return g.getResourceOpts().getShardNum() != this.shard
612-
|| g.getResourceOpts().getSegmentInterval().getNum() != this.segmentIntervalDays
613-
|| g.getResourceOpts().getTtl().getNum() != this.ttlDays;
614-
}
615-
616-
public boolean checkResourceExistence(BanyanDBClient client) throws BanyanDBException {
617-
ResourceExist resourceExist;
618-
Group.Builder gBuilder
619-
= Group.newBuilder()
620-
.setMetadata(Metadata.newBuilder().setName(this.group))
621-
.setResourceOpts(ResourceOpts.newBuilder()
622-
.setShardNum(this.shard)
623-
.setSegmentInterval(
624-
IntervalRule.newBuilder()
625-
.setUnit(
626-
IntervalRule.Unit.UNIT_DAY)
627-
.setNum(
628-
this.segmentIntervalDays))
629-
.setTtl(
630-
IntervalRule.newBuilder()
631-
.setUnit(
632-
IntervalRule.Unit.UNIT_DAY)
633-
.setNum(
634-
this.ttlDays)));
635-
switch (kind) {
636-
case STREAM:
637-
resourceExist = client.existStream(this.group, this.name());
638-
gBuilder.setCatalog(Catalog.CATALOG_STREAM).build();
639-
if (!GROUP_ALIGNED.contains(this.group)) {
640-
// create the group if not exist
641-
if (!resourceExist.hasGroup()) {
642-
try {
643-
Group g = client.define(gBuilder.build());
644-
if (g != null) {
645-
log.info("group {} created", g.getMetadata().getName());
646-
}
647-
} catch (BanyanDBException ex) {
648-
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
649-
log.info("group {} already created by another OAP node", this.group);
650-
} else {
651-
throw ex;
652-
}
653-
}
654-
} else {
655-
// update the group if necessary
656-
if (this.checkGroupUpdate(client)) {
657-
client.update(gBuilder.build());
658-
log.info("group {} updated", this.group);
659-
}
660-
}
661-
// mark the group as aligned
662-
GROUP_ALIGNED.add(this.group);
663-
}
664-
return resourceExist.hasResource();
665-
case MEASURE:
666-
resourceExist = client.existMeasure(this.group, this.name());
667-
gBuilder.setCatalog(Catalog.CATALOG_MEASURE).build();
668-
if (!GROUP_ALIGNED.contains(this.group)) {
669-
if (!resourceExist.hasGroup()) {
670-
try {
671-
Group g = client.define(gBuilder.build());
672-
if (g != null) {
673-
log.info("group {} created", g.getMetadata().getName());
674-
}
675-
} catch (BanyanDBException ex) {
676-
if (ex.getStatus().equals(Status.Code.ALREADY_EXISTS)) {
677-
log.info("group {} already created by another OAP node", this.group);
678-
} else {
679-
throw ex;
680-
}
681-
}
682-
} else {
683-
if (this.checkGroupUpdate(client)) {
684-
client.update(gBuilder.build());
685-
log.info("group {} updated", this.group);
686-
}
687-
}
688-
}
689-
return resourceExist.hasResource();
690-
default:
691-
throw new IllegalStateException("should not reach here");
692-
}
693-
}
694-
695562
/**
696563
* @return name of the Stream/Measure in the BanyanDB
697564
*/
@@ -761,52 +628,17 @@ public static class Schema {
761628

762629
@Getter
763630
@Nullable
764-
private final TopNSpec topNSpec;
631+
private final TopNAggregation topNSpec;
765632

766633
public ColumnSpec getSpec(String columnName) {
767634
return this.specs.get(columnName);
768635
}
769636

770-
public void installTopNAggregation(BanyanDBClient client) throws BanyanDBException {
771-
if (this.getTopNSpec() == null) {
772-
if (this.metadata.kind == Kind.MEASURE) {
773-
log.debug("skip null TopN Schema for [{}]", metadata.getModelName());
774-
}
775-
return;
776-
}
777-
TopNAggregation.Builder builder
778-
= TopNAggregation.newBuilder()
779-
.setMetadata(Metadata.newBuilder()
780-
.setGroup(getMetadata().getGroup())
781-
.setName(this.getTopNSpec().getName()))
782-
783-
.setSourceMeasure(Metadata.newBuilder()
784-
.setGroup(getMetadata().getGroup())
785-
.setName(getMetadata().name()))
786-
.setFieldValueSort(this.getTopNSpec().getSort())
787-
.setFieldName(this.getTopNSpec().getFieldName())
788-
.addAllGroupByTagNames(this.getTopNSpec().getGroupByTagNames())
789-
.setCountersNumber(this.getTopNSpec().getCountersNumber())
790-
.setLruSize(this.getTopNSpec().getLruSize());
791-
client.define(builder.build());
792-
log.info("installed TopN schema for measure {}", getMetadata().name());
637+
public static String formatTopNName(String measureName) {
638+
return measureName + "_topn";
793639
}
794640
}
795641

796-
@Builder
797-
@EqualsAndHashCode
798-
@Getter
799-
@ToString
800-
public static class TopNSpec {
801-
private final String name;
802-
@Singular
803-
private final List<String> groupByTagNames;
804-
private final String fieldName;
805-
private final BanyandbModel.Sort sort;
806-
private final int lruSize;
807-
private final int countersNumber;
808-
}
809-
810642
@RequiredArgsConstructor
811643
@Getter
812644
@ToString
@@ -818,11 +650,4 @@ public static class ColumnSpec {
818650
public enum ColumnType {
819651
TAG, FIELD;
820652
}
821-
822-
@Getter
823-
@Setter
824-
@NoArgsConstructor
825-
public static class GroupSetting {
826-
private int segmentIntervalDays;
827-
}
828653
}

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEBPFProfilingScheduleQueryDAO.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -55,13 +55,14 @@ public List<EBPFProfilingSchedule> querySchedules(String taskId) throws IOExcept
5555
MetadataRegistry.Schema schema = MetadataRegistry.INSTANCE.findMetadata(EBPFProfilingScheduleRecord.INDEX_NAME, DownSampling.Minute);
5656
MeasureQueryResponse resp = query(schema,
5757
TAGS,
58-
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
59-
@Override
60-
protected void apply(MeasureQuery query) {
61-
query.and(eq(EBPFProfilingScheduleRecord.TASK_ID, taskId));
62-
query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingScheduleRecord.START_TIME, AbstractQuery.Sort.DESC));
63-
}
64-
});
58+
Collections.emptySet(), new QueryBuilder<MeasureQuery>() {
59+
@Override
60+
protected void apply(MeasureQuery query) {
61+
query.and(eq(EBPFProfilingScheduleRecord.TASK_ID, taskId));
62+
query.setOrderBy(new AbstractQuery.OrderBy(EBPFProfilingScheduleRecord.START_TIME, AbstractQuery.Sort.DESC));
63+
}
64+
}
65+
);
6566

6667
return resp.getDataPoints().stream().map(this::buildEBPFProfilingSchedule).collect(Collectors.toList());
6768
}

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/measure/BanyanDBEventQueryDAO.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,9 +120,11 @@ protected void apply(MeasureQuery query) {
120120
query.limit(page.getLimit());
121121
query.offset(page.getFrom());
122122
if (queryOrder == Order.ASC) {
123-
query.setOrderBy(new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC));
123+
query.setOrderBy(
124+
new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.ASC));
124125
} else {
125-
query.setOrderBy(new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC));
126+
query.setOrderBy(
127+
new AbstractQuery.OrderBy(Event.START_TIME, AbstractQuery.Sort.DESC));
126128
}
127129
for (final EventQueryCondition condition : conditionList) {
128130
List<PairQueryCondition<?>> queryConditions = new ArrayList<>();

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/stream/AbstractBanyanDBDAO.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.skywalking.oap.server.storage.plugin.banyandb.stream;
2020

2121
import com.google.gson.Gson;
22+
import java.util.Objects;
2223
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
2324
import org.apache.skywalking.banyandb.v1.client.AbstractCriteria;
2425
import org.apache.skywalking.banyandb.v1.client.AbstractQuery;
@@ -183,9 +184,10 @@ private TopNQueryResponse topNQuery(MetadataRegistry.Schema schema,
183184
AbstractQuery.Sort sort,
184185
List<KeyValue> additionalConditions,
185186
List<AttrCondition> attributes) throws IOException {
186-
final TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()), schema.getTopNSpec().getName(),
187-
timestampRange,
188-
number, sort);
187+
final TopNQuery q = new TopNQuery(List.of(schema.getMetadata().getGroup()), Objects.requireNonNull(
188+
schema.getTopNSpec()).getMetadata().getName(),
189+
timestampRange,
190+
number, sort);
189191
q.setAggregationType(MeasureQuery.Aggregation.Type.MEAN);
190192
List<PairQueryCondition<?>> conditions = new ArrayList<>();
191193
if (CollectionUtils.isNotEmpty(additionalConditions)) {

0 commit comments

Comments
 (0)