Skip to content

Commit 9c5870d

Browse files
authored
Refactor client (#68)
1 parent 24ca171 commit 9c5870d

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1831
-3826
lines changed

CHANGES.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,19 @@ Changes by Version
22
==================
33
Release Notes.
44

5+
0.7.0-rc3
6+
------------------
7+
8+
### Features
9+
10+
* Refactor metadata object to original protocol.
11+
* Complemented the Schema management API.
12+
* Enhance the MetadataCache.
13+
* Add more IT tests.
14+
15+
### Bugs
16+
17+
518
0.7.0-rc0
619
------------------
720

README.md

Lines changed: 147 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -50,53 +50,162 @@ options are listed below,
5050

5151
### Stream and index rules
5252

53-
Then we may define a stream with customized configurations. The following example uses `SegmentRecord` in SkyWalking OAP
54-
as an illustration,
55-
56-
```java
57-
// build a stream default(group)/sw(name) with 2 shards and ttl equals to 30 days
58-
Stream s = Stream.create("default", "sw")
59-
// set entities
60-
.setEntityRelativeTags("service_id", "service_instance_id", "is_error")
61-
// add a tag family "data"
62-
.addTagFamily(TagFamilySpec.create("data")
63-
.addTagSpec(TagFamilySpec.TagSpec.newBinaryTag("data_binary"))
64-
.build())
65-
// add a tag family "searchable"
66-
.addTagFamily(TagFamilySpec.create("searchable")
67-
// create a string tag "trace_id"
68-
.addTagSpec(TagFamilySpec.TagSpec.newStringTag("trace_id"))
69-
.addTagSpec(TagFamilySpec.TagSpec.newIntTag("is_error"))
70-
// service_id is not stored, but can be searched through the index
71-
.addTagSpec(TagFamilySpec.TagSpec.newStringTag("service_id").indexedOnly())
72-
.build())
73-
.build();
53+
#### Define a Group
54+
```java
55+
// build a group sw_record for Stream with 2 shards and ttl equals to 3 days
56+
Group g = Group.newBuilder().setMetadata(Metadata.newBuilder().setName("sw_record"))
57+
.setCatalog(Catalog.CATALOG_STREAM)
58+
.setResourceOpts(ResourceOpts.newBuilder()
59+
.setShardNum(2)
60+
.setSegmentInterval(
61+
IntervalRule.newBuilder()
62+
.setUnit(
63+
IntervalRule.Unit.UNIT_DAY)
64+
.setNum(
65+
1))
66+
.setTtl(
67+
IntervalRule.newBuilder()
68+
.setUnit(
69+
IntervalRule.Unit.UNIT_DAY)
70+
.setNum(
71+
3)))
72+
.build();
73+
client.define(g);
74+
```
75+
76+
Then we may define a stream with customized configurations.
77+
78+
#### Define a Stream
79+
```java
80+
// build a stream trace with above group
81+
Stream s = Stream.newBuilder()
82+
.setMetadata(Metadata.newBuilder()
83+
.setGroup("sw_record")
84+
.setName("trace"))
85+
.setEntity(Entity.newBuilder().addAllTagNames(
86+
Arrays.asList("service_id", "service_instance_id", "is_error")))
87+
.addTagFamilies(TagFamilySpec.newBuilder()
88+
.setName("data")
89+
.addTags(TagSpec.newBuilder()
90+
.setName("data_binary")
91+
.setType(TagType.TAG_TYPE_DATA_BINARY)))
92+
.addTagFamilies(TagFamilySpec.newBuilder()
93+
.setName("searchable")
94+
.addTags(TagSpec.newBuilder()
95+
.setName("trace_id")
96+
.setType(TagType.TAG_TYPE_STRING))
97+
.addTags(TagSpec.newBuilder()
98+
.setName("is_error")
99+
.setType(TagType.TAG_TYPE_INT))
100+
.addTags(TagSpec.newBuilder()
101+
.setName("service_id")
102+
.setType(TagType.TAG_TYPE_STRING)
103+
.setIndexedOnly(true)))
104+
.build();
74105
client.define(s);
75106
```
76107

108+
#### Define a IndexRules
109+
```java
110+
IndexRule.Builder ir = IndexRule.newBuilder()
111+
.setMetadata(Metadata.newBuilder()
112+
.setGroup("sw_record")
113+
.setName("trace_id"))
114+
.addTags("trace_id")
115+
.setType(IndexRule.Type.TYPE_INVERTED)
116+
.setAnalyzer(IndexRule.Analyzer.ANALYZER_UNSPECIFIED);
117+
client.define(ir.build());
118+
```
119+
120+
#### Define a IndexRuleBinding
121+
```java
122+
IndexRuleBinding.Builder irb = IndexRuleBinding.newBuilder()
123+
.setMetadata(BanyandbCommon.Metadata.newBuilder()
124+
.setGroup("sw_record")
125+
.setName("trace_binding"))
126+
.setSubject(BanyandbDatabase.Subject.newBuilder()
127+
.setCatalog(
128+
BanyandbCommon.Catalog.CATALOG_STREAM)
129+
.setName("trace"))
130+
.addAllRules(
131+
Arrays.asList("trace_id"))
132+
.setBeginAt(TimeUtils.buildTimestamp(ZonedDateTime.of(2024, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC)))
133+
.setExpireAt(TimeUtils.buildTimestamp(DEFAULT_EXPIRE_AT));
134+
client.define(irb.build());
135+
```
136+
77137
For the last line in the code block, a simple API (i.e. `BanyanDBClient.define(Stream)`) is used to define the schema of `Stream`.
78138
The same works for `Measure` which will be demonstrated later.
79139

80140
### Measure and index rules
81141

82142
`Measure` can also be defined directly with `BanyanDBClient`,
83143

144+
#### Define a Group
145+
```java
146+
// build a group sw_metrics for Measure with 2 shards and ttl equals to 7 days
147+
Group g = Group.newBuilder().setMetadata(Metadata.newBuilder().setName("sw_metric"))
148+
.setCatalog(Catalog.CATALOG_MEASURE)
149+
.setResourceOpts(ResourceOpts.newBuilder()
150+
.setShardNum(2)
151+
.setSegmentInterval(
152+
IntervalRule.newBuilder()
153+
.setUnit(
154+
IntervalRule.Unit.UNIT_DAY)
155+
.setNum(
156+
1))
157+
.setTtl(
158+
IntervalRule.newBuilder()
159+
.setUnit(
160+
IntervalRule.Unit.UNIT_DAY)
161+
.setNum(
162+
7)))
163+
.build();
164+
client.define(g);
165+
```
166+
167+
#### Define a Measure
84168
```java
85169
// create a new measure schema with an additional interval
86170
// the interval is used to specify how frequently to send a data point
87-
Measure m = Measure.create("sw_metric", "service_cpm_minute", Duration.ofHours(1))
88-
// set entity
89-
.setEntityRelativeTags("entity_id")
90-
// define a tag family "default"
91-
.addTagFamily(TagFamilySpec.create("default")
92-
.addTagSpec(TagFamilySpec.TagSpec.newStringTag("id"))
93-
.addTagSpec(TagFamilySpec.TagSpec.newStringTag("entity_id"))
94-
.build())
95-
// define field specs
96-
// compressMethod and encodingMethod can be specified
97-
.addField(Measure.FieldSpec.newIntField("total").compressWithZSTD().encodeWithGorilla().build())
98-
.addField(Measure.FieldSpec.newIntField("value").compressWithZSTD().encodeWithGorilla().build())
99-
.build();
171+
Measure m = Measure.newBuilder()
172+
.setMetadata(Metadata.newBuilder()
173+
.setGroup("sw_metric")
174+
.setName("service_cpm_minute"))
175+
.setInterval(Duration.ofMinutes(1).format())
176+
.setEntity(Entity.newBuilder().addTagNames("entity_id"))
177+
.addTagFamilies(
178+
TagFamilySpec.newBuilder()
179+
.setName("default")
180+
.addTags(
181+
TagSpec.newBuilder()
182+
.setName("entity_id")
183+
.setType(
184+
TagType.TAG_TYPE_STRING))
185+
.addTags(
186+
TagSpec.newBuilder()
187+
.setName("scope")
188+
.setType(
189+
TagType.TAG_TYPE_STRING)))
190+
.addFields(
191+
FieldSpec.newBuilder()
192+
.setName("total")
193+
.setFieldType(
194+
FieldType.FIELD_TYPE_INT)
195+
.setCompressionMethod(
196+
CompressionMethod.COMPRESSION_METHOD_ZSTD)
197+
.setEncodingMethod(
198+
EncodingMethod.ENCODING_METHOD_GORILLA))
199+
.addFields(
200+
FieldSpec.newBuilder()
201+
.setName("value")
202+
.setFieldType(
203+
FieldType.FIELD_TYPE_INT)
204+
.setCompressionMethod(
205+
CompressionMethod.COMPRESSION_METHOD_ZSTD)
206+
.setEncodingMethod(
207+
EncodingMethod.ENCODING_METHOD_GORILLA))
208+
.build();
100209
// define a measure, as we've mentioned above
101210
client.define(m);
102211
```
@@ -118,14 +227,14 @@ For example,
118227
Instant end = Instant.now();
119228
Instant begin = end.minus(15, ChronoUnit.MINUTES);
120229
// with stream schema, group=default, name=sw
121-
StreamQuery query = new StreamQuery(Lists.newArrayList("default"), "sw",
230+
StreamQuery query = new StreamQuery(Lists.newArrayList("sw_record"), "trace",
122231
new TimestampRange(begin.toEpochMilli(), end.toEpochMilli()),
123232
// projection tags which are indexed
124233
ImmutableSet.of("state", "start_time", "duration", "trace_id"));
125234
// search for all states
126-
query.and(PairQueryCondition.LongQueryCondition.eq("searchable", "state" , 0L));
235+
query.and(PairQueryCondition.StringQueryCondition.eq("searchable", "trace_id" , "1a60e0846817447eac4cd498eefd3743.1.17261060724190003"));
127236
// set order by condition
128-
query.setOrderBy(new StreamQuery.OrderBy("duration", StreamQuery.OrderBy.Type.DESC));
237+
query.setOrderBy(new AbstractQuery.OrderBy(AbstractQuery.Sort.DESC));
129238
// set projection for un-indexed tags
130239
query.setDataProjections(ImmutableSet.of("data_binary"));
131240
// send the query request
@@ -424,4 +533,4 @@ Please follow the [REPORTING GUIDELINES](https://www.apache.org/foundation/polic
424533
* [bilibili B站 视频](https://space.bilibili.com/390683219)
425534

426535
# License
427-
[Apache 2.0 License.](LICENSE)
536+
[Apache 2.0 License.](LICENSE)

0 commit comments

Comments
 (0)