Skip to content

Commit d98efb4

Browse files
authored
Support multi groups (#54)
1 parent e6f2dbe commit d98efb4

File tree

16 files changed

+108
-87
lines changed

16 files changed

+108
-87
lines changed

.github/workflows/ci.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,12 @@ jobs:
7171
distribution: 'temurin'
7272
java-version: ${{ matrix.version }}
7373
cache: 'maven'
74+
- name: Login to ghcr
75+
uses: docker/login-action@v1
76+
with:
77+
registry: ghcr.io
78+
username: ${{ github.repository_owner }}
79+
password: ${{ secrets.GITHUB_TOKEN }}
7480
- name: Build and Test
7581
run: ./mvnw -q clean verify install
7682

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ For example,
118118
Instant end = Instant.now();
119119
Instant begin = end.minus(15, ChronoUnit.MINUTES);
120120
// with stream schema, group=default, name=sw
121-
StreamQuery query = new StreamQuery("default", "sw",
121+
StreamQuery query = new StreamQuery(Lists.newArrayList("default"), "sw",
122122
new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
123123
// projection tags which are indexed
124124
ImmutableSet.of("state", "start_time", "duration", "trace_id"));
@@ -154,7 +154,7 @@ For `Measure`, it is similar to the `Stream`,
154154
Instant end = Instant.now();
155155
Instant begin = end.minus(15, ChronoUnit.MINUTES);
156156
// with stream schema, group=sw_metrics, name=service_instance_cpm_day
157-
MeasureQuery query = new MeasureQuery("sw_metrics", "service_instance_cpm_day",
157+
MeasureQuery query = new MeasureQuery(Lists.newArrayList("sw_metrics"), "service_instance_cpm_day",
158158
new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
159159
ImmutableSet.of("id", "scope", "service_id"),
160160
ImmutableSet.of("total"));

pom.xml

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -82,17 +82,15 @@
8282
<!-- core lib dependency -->
8383
<bytebuddy.version>1.10.19</bytebuddy.version>
8484
<!-- grpc version should align with the Skywalking main repo -->
85-
<grpc.version>1.46.0</grpc.version>
86-
<protobuf.version>3.19.2</protobuf.version>
87-
<protoc.version>3.19.2</protoc.version>
88-
<gson.version>2.8.6</gson.version>
89-
<os-maven-plugin.version>1.6.2</os-maven-plugin.version>
85+
<grpc.version>1.63.0</grpc.version>
86+
<protoc.version>3.25.3</protoc.version>
87+
<os-maven-plugin.version>1.7.1</os-maven-plugin.version>
9088
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
91-
<netty.tcnative.version>2.0.51.Final</netty.tcnative.version>
89+
<netty.tcnative.version>2.0.65.Final</netty.tcnative.version>
9290
<javax.annotation-api.version>1.3.2</javax.annotation-api.version>
93-
<auto-value.version>1.9</auto-value.version>
94-
<testcontainers.version>1.16.3</testcontainers.version>
95-
<awaitility.version>4.2.0</awaitility.version>
91+
<auto-value.version>1.10.4</auto-value.version>
92+
<testcontainers.version>1.19.7</testcontainers.version>
93+
<awaitility.version>4.2.1</awaitility.version>
9694
<bufbuild.protoc-gen-validate.version>0.6.13</bufbuild.protoc-gen-validate.version>
9795
<!-- necessary for Java 9+ -->
9896
<org.apache.tomcat.annotations-api.version>6.0.53</org.apache.tomcat.annotations-api.version>

src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractQuery.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import lombok.AccessLevel;
2525
import lombok.Getter;
2626
import lombok.RequiredArgsConstructor;
27-
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
2827
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
2928
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
3029
import org.apache.skywalking.banyandb.v1.client.grpc.exception.InvalidReferenceException;
@@ -39,7 +38,7 @@ public abstract class AbstractQuery<T> {
3938
/**
4039
* Group of the current entity
4140
*/
42-
protected final String group;
41+
protected final List<String> groups;
4342
/**
4443
* Owner name of the current entity
4544
*/
@@ -63,8 +62,8 @@ public abstract class AbstractQuery<T> {
6362
*/
6463
protected AbstractCriteria criteria;
6564

66-
public AbstractQuery(String group, String name, TimestampRange timestampRange, Set<String> tagProjections) {
67-
this.group = group;
65+
public AbstractQuery(List<String> groups, String name, TimestampRange timestampRange, Set<String> tagProjections) {
66+
this.groups = groups;
6867
this.name = name;
6968
this.timestampRange = timestampRange;
7069
this.conditions = new ArrayList<>(10);
@@ -107,13 +106,6 @@ public AbstractQuery<T> criteria(AbstractCriteria criteria) {
107106
*/
108107
abstract T build(MetadataCache.EntityMetadata entityMetadata) throws BanyanDBException;
109108

110-
protected BanyandbCommon.Metadata buildMetadata() {
111-
return BanyandbCommon.Metadata.newBuilder()
112-
.setGroup(group)
113-
.setName(name)
114-
.build();
115-
}
116-
117109
protected Optional<BanyandbModel.Criteria> buildCriteria() {
118110
if (criteria != null) {
119111
return Optional.of(criteria.build());

src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -348,11 +348,18 @@ public StreamWrite createStreamWrite(String group, String name, final String ele
348348
public StreamQueryResponse query(StreamQuery streamQuery) throws BanyanDBException {
349349
checkState(this.streamServiceStub != null, "stream service is null");
350350

351-
final BanyandbStream.QueryResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
352-
this.streamServiceBlockingStub
353-
.withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
354-
.query(streamQuery.build(this.metadataCache.findMetadata(streamQuery.group, streamQuery.name))));
355-
return new StreamQueryResponse(response);
351+
for (String group : streamQuery.groups) {
352+
MetadataCache.EntityMetadata em = this.metadataCache.findMetadata(group, streamQuery.name);
353+
if (em != null) {
354+
final BanyandbStream.QueryResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
355+
this.streamServiceBlockingStub
356+
.withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
357+
.query(streamQuery.build(em)));
358+
return new StreamQueryResponse(response);
359+
}
360+
361+
}
362+
throw new RuntimeException("No metadata found for the query");
356363
}
357364

358365
/**
@@ -379,13 +386,18 @@ public TopNQueryResponse query(TopNQuery topNQuery) throws BanyanDBException {
379386
*/
380387
public MeasureQueryResponse query(MeasureQuery measureQuery) throws BanyanDBException {
381388
checkState(this.streamServiceStub != null, "measure service is null");
382-
383-
final BanyandbMeasure.QueryResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
384-
this.measureServiceBlockingStub
385-
.withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
386-
.query(measureQuery.build(this.metadataCache.findMetadata(measureQuery.group, measureQuery.name))));
387-
return new MeasureQueryResponse(response);
388-
}
389+
for (String group : measureQuery.groups) {
390+
MetadataCache.EntityMetadata em = this.metadataCache.findMetadata(group, measureQuery.name);
391+
if (em != null) {
392+
final BanyandbMeasure.QueryResponse response = HandleExceptionsWith.callAndTranslateApiException(() ->
393+
this.measureServiceBlockingStub
394+
.withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
395+
.query(measureQuery.build(em)));
396+
return new MeasureQueryResponse(response);
397+
}
398+
}
399+
throw new RuntimeException("No metadata found for the query");
400+
}
389401

390402
/**
391403
* Define a new group and attach to the current client.

src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureQuery.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
2828
import org.apache.skywalking.banyandb.v1.client.metadata.MetadataCache;
2929

30+
import java.util.List;
3031
import java.util.Set;
3132

3233
/**
@@ -49,12 +50,12 @@ public class MeasureQuery extends AbstractQuery<BanyandbMeasure.QueryRequest> {
4950

5051
private OrderBy orderBy;
5152

52-
public MeasureQuery(final String group, final String name, final Set<String> tagProjections, final Set<String> fieldProjections) {
53-
this(group, name, null, tagProjections, fieldProjections);
53+
public MeasureQuery(final List<String> groups, final String name, final Set<String> tagProjections, final Set<String> fieldProjections) {
54+
this(groups, name, null, tagProjections, fieldProjections);
5455
}
5556

56-
public MeasureQuery(final String group, final String name, final TimestampRange timestampRange, final Set<String> tagProjections, final Set<String> fieldProjections) {
57-
super(group, name, timestampRange, tagProjections);
57+
public MeasureQuery(final List<String> groups, final String name, final TimestampRange timestampRange, final Set<String> tagProjections, final Set<String> fieldProjections) {
58+
super(groups, name, timestampRange, tagProjections);
5859
this.fieldProjections = fieldProjections;
5960
}
6061

@@ -149,7 +150,8 @@ BanyandbMeasure.QueryRequest build(MetadataCache.EntityMetadata entityMetadata)
149150
throw new IllegalArgumentException("entity metadata is null");
150151
}
151152
final BanyandbMeasure.QueryRequest.Builder builder = BanyandbMeasure.QueryRequest.newBuilder();
152-
builder.setMetadata(buildMetadata());
153+
builder.setName(this.name);
154+
builder.addAllGroups(this.groups);
153155
if (timestampRange != null) {
154156
builder.setTimeRange(timestampRange.build());
155157
} else {

src/main/java/org/apache/skywalking/banyandb/v1/client/StreamQuery.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.skywalking.banyandb.v1.client;
2020

21+
import java.util.List;
2122
import java.util.Set;
2223

2324
import lombok.Setter;
@@ -43,14 +44,14 @@ public class StreamQuery extends AbstractQuery<BanyandbStream.QueryRequest> {
4344
*/
4445
private OrderBy orderBy;
4546

46-
public StreamQuery(final String group, final String name, final TimestampRange timestampRange, final Set<String> projections) {
47-
super(group, name, timestampRange, projections);
47+
public StreamQuery(final List<String> groups, final String name, final TimestampRange timestampRange, final Set<String> projections) {
48+
super(groups, name, timestampRange, projections);
4849
this.offset = 0;
4950
this.limit = 20;
5051
}
5152

52-
public StreamQuery(final String group, final String name, final Set<String> projections) {
53-
this(group, name, null, projections);
53+
public StreamQuery(final List<String> groups, final String name, final Set<String> projections) {
54+
this(groups, name, null, projections);
5455
}
5556

5657
@Override
@@ -68,8 +69,9 @@ BanyandbStream.QueryRequest build(MetadataCache.EntityMetadata entityMetadata) t
6869
if (entityMetadata == null) {
6970
throw new IllegalArgumentException("entity metadata is null");
7071
}
71-
final BanyandbStream.QueryRequest.Builder builder = BanyandbStream.QueryRequest.newBuilder()
72-
.setMetadata(buildMetadata());
72+
final BanyandbStream.QueryRequest.Builder builder = BanyandbStream.QueryRequest.newBuilder();
73+
builder.setName(this.name);
74+
builder.addAllGroups(this.groups);
7375
if (timestampRange != null) {
7476
builder.setTimeRange(timestampRange.build());
7577
}

src/main/java/org/apache/skywalking/banyandb/v1/client/TopNQuery.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import com.google.common.base.Preconditions;
2222
import lombok.Setter;
23-
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
2423
import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
2524
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
2625
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
@@ -29,7 +28,7 @@
2928

3029
@Setter
3130
public class TopNQuery {
32-
private final String group;
31+
private final List<String> groups;
3332
private final String name;
3433
private final TimestampRange timestampRange;
3534
private final int number;
@@ -40,10 +39,10 @@ public class TopNQuery {
4039
*/
4140
private List<PairQueryCondition<?>> conditions;
4241

43-
public TopNQuery(String group, String name, TimestampRange timestampRange, int number, AbstractQuery.Sort sort) {
42+
public TopNQuery(List<String> groups, String name, TimestampRange timestampRange, int number, AbstractQuery.Sort sort) {
4443
Preconditions.checkArgument(sort != AbstractQuery.Sort.UNSPECIFIED);
4544
Preconditions.checkArgument(number > 0);
46-
this.group = group;
45+
this.groups = groups;
4746
this.name = name;
4847
this.timestampRange = timestampRange;
4948
this.number = number;
@@ -52,7 +51,8 @@ public TopNQuery(String group, String name, TimestampRange timestampRange, int n
5251

5352
BanyandbMeasure.TopNRequest build() throws BanyanDBException {
5453
BanyandbMeasure.TopNRequest.Builder bld = BanyandbMeasure.TopNRequest.newBuilder()
55-
.setMetadata(BanyandbCommon.Metadata.newBuilder().setGroup(group).setName(name).build())
54+
.setName(name)
55+
.addAllGroups(groups)
5656
.setTimeRange(timestampRange.build())
5757
.setTopN(number)
5858
.setFieldValueSort(AbstractQuery.Sort.DESC == sort ? BanyandbModel.Sort.SORT_DESC : BanyandbModel.Sort.SORT_ASC);

src/main/proto/banyandb/v1/banyandb-measure.proto

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,19 @@ message QueryResponse {
4646
repeated DataPoint data_points = 1;
4747
}
4848

49+
// QueryRequest is the request contract for query.
4950
// QueryRequest is the request contract for query.
5051
message QueryRequest {
51-
// metadata is required
52-
common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
52+
// groups indicate where the data points are stored.
53+
repeated string groups = 1 [(validate.rules).repeated.min_items = 1];
54+
// name is the identity of a measure.
55+
string name = 2 [(validate.rules).string.min_len = 1];
5356
// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
54-
model.v1.TimeRange time_range = 2 [(validate.rules).message.required = true];
57+
model.v1.TimeRange time_range = 3 [(validate.rules).message.required = true];
5558
// tag_families are indexed.
5659
model.v1.Criteria criteria = 4;
5760
// tag_projection can be used to select tags of the data points in the response
58-
model.v1.TagProjection tag_projection = 5 [(validate.rules).message.required = true];
61+
model.v1.TagProjection tag_projection = 5;
5962
message FieldProjection {
6063
repeated string names = 1;
6164
}
@@ -121,19 +124,21 @@ message TopNResponse {
121124

122125
// TopNRequest is the request contract for query.
123126
message TopNRequest {
124-
// metadata is required
125-
common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
127+
// groups indicate where the data points are stored.
128+
repeated string groups = 1 [(validate.rules).repeated.min_items = 1];
129+
// name is the identity of a measure.
130+
string name = 2 [(validate.rules).string.min_len = 1];
126131
// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
127-
model.v1.TimeRange time_range = 2 [(validate.rules).message.required = true];
132+
model.v1.TimeRange time_range = 3 [(validate.rules).message.required = true];
128133
// top_n set the how many items should be returned in each list.
129-
int32 top_n = 3 [(validate.rules).int32.gt = 0];
134+
int32 top_n = 4 [(validate.rules).int32.gt = 0];
130135
// agg aggregates lists grouped by field names in the time_range
131136
// TODO validate enum defined_only
132-
model.v1.AggregationFunction agg = 4;
137+
model.v1.AggregationFunction agg = 5;
133138
// criteria select counters. Only equals are acceptable.
134-
repeated model.v1.Condition conditions = 5;
139+
repeated model.v1.Condition conditions = 6;
135140
// field_value_sort indicates how to sort fields
136-
model.v1.Sort field_value_sort = 6;
141+
model.v1.Sort field_value_sort = 7;
137142
}
138143

139144
//DataPointValue is the data point for writing. It only contains values.

src/main/proto/banyandb/v1/banyandb-stream.proto

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -53,23 +53,25 @@ message QueryResponse {
5353

5454
// QueryRequest is the request contract for query.
5555
message QueryRequest {
56-
// metadata is required
57-
common.v1.Metadata metadata = 1 [(validate.rules).message.required = true];
56+
// groups indicate where the elements are stored.
57+
repeated string groups = 1 [(validate.rules).repeated.min_items = 1];
58+
// name is the identity of a stream.
59+
string name = 2 [(validate.rules).string.min_len = 1];
5860
// time_range is a range query with begin/end time of entities in the timeunit of milliseconds.
5961
// In the context of stream, it represents the range of the `startTime` for spans/segments,
6062
// while in the context of Log, it means the range of the timestamp(s) for logs.
6163
// it is always recommended to specify time range for performance reason
62-
model.v1.TimeRange time_range = 2;
64+
model.v1.TimeRange time_range = 3;
6365
// offset is used to support pagination, together with the following limit
64-
uint32 offset = 3;
66+
uint32 offset = 4;
6567
// limit is used to impose a boundary on the number of records being returned
66-
uint32 limit = 4;
68+
uint32 limit = 5;
6769
// order_by is given to specify the sort for a field. So far, only fields in the type of Integer are supported
68-
model.v1.QueryOrder order_by = 5;
70+
model.v1.QueryOrder order_by = 6;
6971
// tag_families are indexed.
70-
model.v1.Criteria criteria = 6;
72+
model.v1.Criteria criteria = 7;
7173
// projection can be used to select the key names of the element in the response
72-
model.v1.TagProjection projection = 7 [(validate.rules).message.required = true];
74+
model.v1.TagProjection projection = 8 [(validate.rules).message.required = true];
7375
}
7476

7577
message ElementValue {

0 commit comments

Comments
 (0)