Skip to content

Commit 32eae17

Browse files
authored
Add mod revision check to write requests(measure/stream) (#46)
* Add mod revision check to write requests(measure/stream) - Support cached mod revision when create/update schema - Support auto-refresh mod revision when writing data
1 parent cd9c31f commit 32eae17

29 files changed

+568
-94
lines changed

CHANGES.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@ Changes by Version
22
==================
33
Release Notes.
44

5+
0.5.0
6+
------------------
7+
8+
### Features
9+
10+
* Add mod revision check to write requests
11+
512
0.4.0
613
------------------
714

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ Measure m = Measure.create("sw_metric", "service_cpm_minute", Duration.ofHours(1
8181
.setEntityRelativeTags("entity_id")
8282
// define a tag family "default"
8383
.addTagFamily(TagFamilySpec.create("default")
84-
.addTagSpec(TagFamilySpec.TagSpec.newIDTag("id"))
84+
.addTagSpec(TagFamilySpec.TagSpec.newStringTag("id"))
8585
.addTagSpec(TagFamilySpec.TagSpec.newStringTag("entity_id"))
8686
.build())
8787
// define field specs
@@ -280,15 +280,15 @@ The writing procedure for `Measure` is similar to the above described process an
280280

281281
```java
282282
// build a MeasureBulkWriteProcessor from client
283-
MeasureBulkWriteProcessor bulkWriteProcessor = client.buildMeasureWriteProcessor(maxBulkSize, flushInterval, concurrency);
283+
MeasureBulkWriteProcessor measureBulkWriteProcessor = client.buildMeasureWriteProcessor(maxBulkSize, flushInterval, concurrency);
284284
```
285285

286286
A `BulkWriteProcessor` is created by calling `buildMeasureWriteProcessor`. Then build the `MeasureWrite` object and send with bulk processor,
287287

288288
```java
289289
Instant now = Instant.now();
290290
MeasureWrite measureWrite = client.createMeasureWrite("sw_metric", "service_cpm_minute", now.toEpochMilli());
291-
measureWrite.tag("id", TagAndValue.idTagValue("1"))
291+
measureWrite.tag("id", TagAndValue.stringTagValue("1"))
292292
.tag("entity_id", TagAndValue.stringTagValue("entity_1"))
293293
.field("total", TagAndValue.longFieldValue(100))
294294
.field("value", TagAndValue.longFieldValue(1));

src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ P build() {
7474
}
7575

7676
BanyandbCommon.Metadata metadata = BanyandbCommon.Metadata.newBuilder()
77-
.setGroup(entityMetadata.getGroup()).setName(entityMetadata.getName()).build();
77+
.setGroup(entityMetadata.getGroup()).setName(entityMetadata.getName()).setModRevision(entityMetadata.getModRevision()).build();
7878
Timestamp ts = Timestamp.newBuilder()
7979
.setSeconds(timestamp / 1000)
8080
.setNanos((int) (timestamp % 1000 * 1_000_000)).build();

src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java

Lines changed: 84 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import lombok.AccessLevel;
2929
import lombok.Getter;
3030
import lombok.extern.slf4j.Slf4j;
31+
32+
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
3133
import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
3234
import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc;
3335
import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
@@ -36,6 +38,8 @@
3638
import org.apache.skywalking.banyandb.v1.client.grpc.channel.ChannelManager;
3739
import org.apache.skywalking.banyandb.v1.client.grpc.channel.DefaultChannelFactory;
3840
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
41+
import org.apache.skywalking.banyandb.v1.client.grpc.exception.InternalException;
42+
import org.apache.skywalking.banyandb.v1.client.grpc.exception.InvalidArgumentException;
3943
import org.apache.skywalking.banyandb.v1.client.metadata.Group;
4044
import org.apache.skywalking.banyandb.v1.client.metadata.GroupMetadataRegistry;
4145
import org.apache.skywalking.banyandb.v1.client.metadata.IndexRule;
@@ -58,6 +62,7 @@
5862
import java.util.ArrayList;
5963
import java.util.Collections;
6064
import java.util.List;
65+
import java.util.concurrent.CompletableFuture;
6166
import java.util.concurrent.TimeUnit;
6267
import java.util.concurrent.locks.ReentrantLock;
6368
import java.util.stream.Collectors;
@@ -198,33 +203,73 @@ void connect(Channel channel) {
198203
* Perform a single write with given entity.
199204
*
200205
* @param streamWrite the entity to be written
206+
* @return a future of write result
201207
*/
202-
public void write(StreamWrite streamWrite) {
208+
public CompletableFuture<Void> write(StreamWrite streamWrite) {
203209
checkState(this.streamServiceStub != null, "stream service is null");
204210

211+
CompletableFuture<Void> future = new CompletableFuture<>();
205212
final StreamObserver<BanyandbStream.WriteRequest> writeRequestStreamObserver
206213
= this.streamServiceStub
207214
.withDeadlineAfter(this.getOptions().getDeadline(), TimeUnit.SECONDS)
208215
.write(
209216
new StreamObserver<BanyandbStream.WriteResponse>() {
217+
private BanyanDBException responseException;
218+
210219
@Override
211220
public void onNext(BanyandbStream.WriteResponse writeResponse) {
221+
switch (writeResponse.getStatus()) {
222+
case STATUS_INVALID_TIMESTAMP:
223+
responseException = new InvalidArgumentException(
224+
"Invalid timestamp: " + streamWrite.getTimestamp(), null, Status.Code.INVALID_ARGUMENT, false);
225+
break;
226+
case STATUS_NOT_FOUND:
227+
responseException = new InvalidArgumentException(
228+
"Invalid metadata: " + streamWrite.entityMetadata, null, Status.Code.INVALID_ARGUMENT, false);
229+
break;
230+
case STATUS_EXPIRED_SCHEMA:
231+
BanyandbCommon.Metadata metadata = writeResponse.getMetadata();
232+
log.warn("The schema {}.{} is expired, trying update the schema...",
233+
metadata.getGroup(), metadata.getName());
234+
try {
235+
BanyanDBClient.this.findStream(metadata.getGroup(), metadata.getName());
236+
} catch (BanyanDBException e) {
237+
String warnMessage = String.format("Failed to refresh the stream schema %s.%s",
238+
metadata.getGroup(), metadata.getName());
239+
log.warn(warnMessage, e);
240+
}
241+
responseException = new InvalidArgumentException(
242+
"Expired revision: " + metadata.getModRevision(), null, Status.Code.INVALID_ARGUMENT, true);
243+
break;
244+
case STATUS_INTERNAL_ERROR:
245+
responseException = new InternalException(
246+
"Internal error occurs in server", null, Status.Code.INTERNAL, true);
247+
break;
248+
default:
249+
}
212250
}
213251

214252
@Override
215253
public void onError(Throwable throwable) {
216254
log.error("Error occurs in flushing streams.", throwable);
255+
future.completeExceptionally(throwable);
217256
}
218257

219258
@Override
220259
public void onCompleted() {
260+
if (responseException == null) {
261+
future.complete(null);
262+
} else {
263+
future.completeExceptionally(responseException);
264+
}
221265
}
222266
});
223267
try {
224268
writeRequestStreamObserver.onNext(streamWrite.build());
225269
} finally {
226270
writeRequestStreamObserver.onCompleted();
227271
}
272+
return future;
228273
}
229274

230275
/**
@@ -239,7 +284,7 @@ public void onCompleted() {
239284
public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, int flushInterval, int concurrency) {
240285
checkState(this.streamServiceStub != null, "stream service is null");
241286

242-
return new StreamBulkWriteProcessor(this.streamServiceStub, maxBulkSize, flushInterval, concurrency);
287+
return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency);
243288
}
244289

245290
/**
@@ -254,7 +299,7 @@ public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, int f
254299
public MeasureBulkWriteProcessor buildMeasureWriteProcessor(int maxBulkSize, int flushInterval, int concurrency) {
255300
checkState(this.measureServiceStub != null, "measure service is null");
256301

257-
return new MeasureBulkWriteProcessor(this.measureServiceStub, maxBulkSize, flushInterval, concurrency);
302+
return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency);
258303
}
259304

260305
/**
@@ -361,23 +406,57 @@ public Group define(Group group) throws BanyanDBException {
361406
*/
362407
public void define(Stream stream) throws BanyanDBException {
363408
StreamMetadataRegistry streamRegistry = new StreamMetadataRegistry(checkNotNull(this.channel));
364-
streamRegistry.create(stream);
409+
long modRevision = streamRegistry.create(stream);
365410
defineIndexRules(stream, stream.indexRules());
411+
412+
stream = stream.withModRevision(modRevision);
366413
this.metadataCache.register(stream);
367414
}
368415

416+
/**
417+
* Delete a stream
418+
*
419+
* @param stream the stream to be deleted
420+
* @return true if the stream is deleted successfully
421+
*/
422+
public boolean delete(Stream stream) throws BanyanDBException {
423+
StreamMetadataRegistry streamRegistry = new StreamMetadataRegistry(checkNotNull(this.channel));
424+
if (streamRegistry.delete(stream.group(), stream.name())) {
425+
this.metadataCache.unregister(stream);
426+
return true;
427+
}
428+
return false;
429+
}
430+
369431
/**
370432
* Define a new measure
371433
*
372434
* @param measure the measure to be created
373435
*/
374436
public void define(Measure measure) throws BanyanDBException {
375437
MeasureMetadataRegistry measureRegistry = new MeasureMetadataRegistry(checkNotNull(this.channel));
376-
measureRegistry.create(measure);
438+
long modRevision = measureRegistry.create(measure);
377439
defineIndexRules(measure, measure.indexRules());
440+
441+
measure = measure.withModRevision(modRevision);
378442
this.metadataCache.register(measure);
379443
}
380444

445+
/**
446+
* Delete a measure
447+
*
448+
* @param measure the measure to be deleted
449+
* @return true if the measure is deleted successfully
450+
*/
451+
public boolean delete(Measure measure) throws BanyanDBException {
452+
MeasureMetadataRegistry measureRegistry = new MeasureMetadataRegistry(checkNotNull(this.channel));
453+
if (measureRegistry.delete(measure.group(), measure.name())) {
454+
this.metadataCache.unregister(measure);
455+
return true;
456+
}
457+
return false;
458+
}
459+
381460
/**
382461
* Define a new TopNAggregation
383462
*

src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,16 @@
2020

2121
import io.grpc.stub.StreamObserver;
2222
import lombok.extern.slf4j.Slf4j;
23+
24+
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
2325
import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
2426
import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc;
27+
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
2528

2629
import javax.annotation.concurrent.ThreadSafe;
30+
31+
import java.util.HashSet;
32+
import java.util.Set;
2733
import java.util.concurrent.CompletableFuture;
2834

2935
/**
@@ -33,30 +39,50 @@
3339
@ThreadSafe
3440
public class MeasureBulkWriteProcessor extends AbstractBulkWriteProcessor<BanyandbMeasure.WriteRequest,
3541
MeasureServiceGrpc.MeasureServiceStub> {
42+
private final BanyanDBClient client;
43+
3644
/**
3745
* Create the processor.
3846
*
39-
* @param measureServiceStub stub for gRPC call.
40-
* @param maxBulkSize the max bulk size for the flush operation
41-
* @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
42-
* automatically. Unit is second.
43-
* @param concurrency the number of concurrency would run for the flush max.
47+
* @param client the client
48+
* @param maxBulkSize the max bulk size for the flush operation
49+
* @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
50+
* automatically. Unit is second.
51+
* @param concurrency the number of concurrency would run for the flush max.
4452
*/
4553
protected MeasureBulkWriteProcessor(
46-
final MeasureServiceGrpc.MeasureServiceStub measureServiceStub,
54+
final BanyanDBClient client,
4755
final int maxBulkSize,
4856
final int flushInterval,
4957
final int concurrency) {
50-
super(measureServiceStub, "MeasureBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
58+
super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
59+
this.client = client;
5160
}
5261

5362
@Override
5463
protected StreamObserver<BanyandbMeasure.WriteRequest> buildStreamObserver(MeasureServiceGrpc.MeasureServiceStub stub,
5564
CompletableFuture<Void> batch) {
5665
return stub.write(new StreamObserver<BanyandbMeasure.WriteResponse>() {
66+
private final Set<String> schemaExpired = new HashSet<>();
67+
5768
@Override
5869
public void onNext(BanyandbMeasure.WriteResponse writeResponse) {
59-
70+
switch (writeResponse.getStatus()) {
71+
case STATUS_EXPIRED_SCHEMA:
72+
BanyandbCommon.Metadata metadata = writeResponse.getMetadata();
73+
String schemaKey = metadata.getGroup() + "." + metadata.getName();
74+
if (!schemaExpired.contains(schemaKey)) {
75+
log.warn("The schema {} is expired, trying update the schema...", schemaKey);
76+
try {
77+
client.findMeasure(metadata.getGroup(), metadata.getName());
78+
schemaExpired.add(schemaKey);
79+
} catch (BanyanDBException e) {
80+
throw new RuntimeException(e);
81+
}
82+
}
83+
break;
84+
default:
85+
}
6086
}
6187

6288
@Override

src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureWrite.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ protected BanyandbMeasure.WriteRequest build(BanyandbCommon.Metadata metadata, T
9393
}
9494

9595
builder.setDataPoint(datapointValueBuilder);
96+
builder.setMessageId(System.nanoTime());
9697
return builder.build();
9798
}
9899
}

src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,16 @@
2121
import io.grpc.stub.StreamObserver;
2222

2323
import lombok.extern.slf4j.Slf4j;
24+
25+
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
2426
import org.apache.skywalking.banyandb.stream.v1.StreamServiceGrpc;
2527
import org.apache.skywalking.banyandb.stream.v1.BanyandbStream;
28+
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
2629

2730
import javax.annotation.concurrent.ThreadSafe;
31+
32+
import java.util.HashSet;
33+
import java.util.Set;
2834
import java.util.concurrent.CompletableFuture;
2935

3036
/**
@@ -34,29 +40,50 @@
3440
@ThreadSafe
3541
public class StreamBulkWriteProcessor extends AbstractBulkWriteProcessor<BanyandbStream.WriteRequest,
3642
StreamServiceGrpc.StreamServiceStub> {
43+
private final BanyanDBClient client;
44+
3745
/**
3846
* Create the processor.
3947
*
40-
* @param serviceStub stub for gRPC call.
48+
* @param client the client
4149
* @param maxBulkSize the max bulk size for the flush operation
4250
* @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
4351
* automatically. Unit is second.
4452
* @param concurrency the number of concurrency would run for the flush max.
4553
*/
4654
protected StreamBulkWriteProcessor(
47-
final StreamServiceGrpc.StreamServiceStub serviceStub,
55+
final BanyanDBClient client,
4856
final int maxBulkSize,
4957
final int flushInterval,
5058
final int concurrency) {
51-
super(serviceStub, "StreamBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
59+
super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", maxBulkSize, flushInterval, concurrency);
60+
this.client = client;
5261
}
5362

5463
@Override
5564
protected StreamObserver<BanyandbStream.WriteRequest> buildStreamObserver(StreamServiceGrpc.StreamServiceStub stub, CompletableFuture<Void> batch) {
5665
return stub.write(
5766
new StreamObserver<BanyandbStream.WriteResponse>() {
67+
private final Set<String> schemaExpired = new HashSet<>();
68+
5869
@Override
5970
public void onNext(BanyandbStream.WriteResponse writeResponse) {
71+
switch (writeResponse.getStatus()) {
72+
case STATUS_EXPIRED_SCHEMA:
73+
BanyandbCommon.Metadata metadata = writeResponse.getMetadata();
74+
String schemaKey = metadata.getGroup() + "." + metadata.getName();
75+
if (!schemaExpired.contains(schemaKey)) {
76+
log.warn("The schema {} is expired, trying update the schema...", schemaKey);
77+
try {
78+
client.findStream(metadata.getGroup(), metadata.getName());
79+
schemaExpired.add(schemaKey);
80+
} catch (BanyanDBException e) {
81+
throw new RuntimeException(e);
82+
}
83+
}
84+
break;
85+
default:
86+
}
6087
}
6188

6289
@Override

src/main/java/org/apache/skywalking/banyandb/v1/client/StreamWrite.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ protected BanyandbStream.WriteRequest build(BanyandbCommon.Metadata metadata, Ti
9494
elemValBuilder.addTagFamilies(BanyandbModel.TagFamilyForWrite.newBuilder().addAllTags(tags).build());
9595
}
9696
builder.setElement(elemValBuilder);
97+
builder.setMessageId(System.nanoTime());
9798
return builder.build();
9899
}
99100
}

0 commit comments

Comments
 (0)