Skip to content

Commit e5bffa9

Browse files
authored
Add write histogram for client. (#91)
* Add write obs for client. * Update change logs.
1 parent 24c354e commit e5bffa9

File tree

7 files changed

+119
-24
lines changed

7 files changed

+119
-24
lines changed

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ Release Notes.
1313
* Add replicas configuration to the API: introduce `replicas` in LifecycleStage and ResourceOpts to support high availability.
1414
* Simplify TLS options: remove unsupported mTLS client certificate settings from Options and DefaultChannelFactory; trust CA is still supported.
1515
* Support auth with username and password.
16+
* Update gRPC to 1.75.0.
17+
* Add histogram metrics to write/insert/update operations of the measure, stream and property.
1618

1719
0.8.0
1820
------------------

pom.xml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@
8282
<!-- core lib dependency -->
8383
<bytebuddy.version>1.10.19</bytebuddy.version>
8484
<!-- grpc version should align with the Skywalking main repo -->
85-
<grpc.version>1.63.0</grpc.version>
85+
<grpc.version>1.75.0</grpc.version>
8686
<protoc.version>3.25.3</protoc.version>
8787
<os-maven-plugin.version>1.7.1</os-maven-plugin.version>
8888
<protobuf-maven-plugin.version>0.6.1</protobuf-maven-plugin.version>
@@ -95,6 +95,7 @@
9595
<!-- necessary for Java 9+ -->
9696
<org.apache.tomcat.annotations-api.version>6.0.53</org.apache.tomcat.annotations-api.version>
9797
<slf4j.version>1.7.36</slf4j.version>
98+
<prometheus.client.version>0.16.0</prometheus.client.version>
9899

99100
<!-- Plugin versions -->
100101
<docker.plugin.version>0.4.13</docker.plugin.version>
@@ -161,6 +162,11 @@
161162
<artifactId>pgv-java-stub</artifactId>
162163
<version>${bufbuild.protoc-gen-validate.version}</version>
163164
</dependency>
165+
<dependency>
166+
<groupId>io.prometheus</groupId>
167+
<artifactId>simpleclient</artifactId>
168+
<version>${prometheus.client.version}</version>
169+
</dependency>
164170

165171
<dependency>
166172
<groupId>com.google.auto.value</groupId>

src/main/java/org/apache/skywalking/banyandb/v1/client/AbstractBulkWriteProcessor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.auto.value.AutoValue;
2222
import io.grpc.stub.AbstractAsyncStub;
2323
import io.grpc.stub.StreamObserver;
24+
import io.prometheus.client.Histogram;
2425
import java.io.Closeable;
2526
import java.util.ArrayList;
2627
import java.util.List;
@@ -134,14 +135,16 @@ public void flush() {
134135

135136
final List<Holder> batch = new ArrayList<>(requests.size());
136137
requests.drainTo(batch);
137-
final CompletableFuture<Void> future = doFlush(batch);
138+
final CompletableFuture<Void> future = doObservedFlush(batch);
138139
future.whenComplete((v, t) -> semaphore.release());
139140
future.join();
140141
lastFlushTS = System.currentTimeMillis();
141142

142143
}
143144

144-
protected CompletableFuture<Void> doFlush(final List<Holder> data) {
145+
protected abstract CompletableFuture<Void> doObservedFlush(final List<Holder> data);
146+
147+
protected CompletableFuture<Void> doFlush(final List<Holder> data, Histogram.Timer timer) {
145148
// The batch is used to control the completion of the flush operation.
146149
// There is at most one error per batch,
147150
// because the database server would terminate the batch process when the first error occurs.

src/main/java/org/apache/skywalking/banyandb/v1/client/BanyanDBClient.java

Lines changed: 45 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import io.grpc.ManagedChannel;
2828
import io.grpc.Status;
2929
import io.grpc.stub.StreamObserver;
30+
import io.prometheus.client.Histogram;
3031
import java.time.ZoneOffset;
3132
import java.time.ZonedDateTime;
3233
import lombok.AccessLevel;
@@ -101,6 +102,7 @@
101102
@Slf4j
102103
public class BanyanDBClient implements Closeable {
103104
public static final ZonedDateTime DEFAULT_EXPIRE_AT = ZonedDateTime.of(2099, 1, 1, 0, 0, 0, 0, ZoneOffset.UTC);
105+
private static Histogram WRITE_HISTOGRAM;
104106
private final String[] targets;
105107
/**
106108
* Options for server connection.
@@ -146,6 +148,16 @@ public class BanyanDBClient implements Closeable {
146148
*/
147149
private final MetadataCache metadataCache;
148150

151+
static {
152+
// init prometheus metric
153+
WRITE_HISTOGRAM = Histogram.build()
154+
.name("banyandb_write_latency_seconds")
155+
.help("BanyanDB Bulk Write latency in seconds")
156+
.buckets(0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0)
157+
.labelNames("catalog", "operation", "instanceID")
158+
.register();
159+
}
160+
149161
/**
150162
* Create a BanyanDB client instance with a default options.
151163
*
@@ -234,6 +246,13 @@ void connect(Channel channel) {
234246
public CompletableFuture<Void> write(StreamWrite streamWrite) {
235247
checkState(this.streamServiceStub != null, "stream service is null");
236248

249+
Histogram.Timer timer
250+
= WRITE_HISTOGRAM.labels(
251+
"stream",
252+
"single_write", // single write for non-bulk operation.
253+
options.getPrometheusMetricsOpts().getClientID()
254+
)
255+
.startTimer();
237256
CompletableFuture<Void> future = new CompletableFuture<>();
238257
final StreamObserver<BanyandbStream.WriteRequest> writeRequestStreamObserver
239258
= this.streamServiceStub
@@ -279,12 +298,14 @@ public void onNext(BanyandbStream.WriteResponse writeResponse) {
279298

280299
@Override
281300
public void onError(Throwable throwable) {
301+
timer.observeDuration();
282302
log.error("Error occurs in flushing streams.", throwable);
283303
future.completeExceptionally(throwable);
284304
}
285305

286306
@Override
287307
public void onCompleted() {
308+
timer.observeDuration();
288309
if (responseException == null) {
289310
future.complete(null);
290311
} else {
@@ -313,7 +334,7 @@ public void onCompleted() {
313334
public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) {
314335
checkState(this.streamServiceStub != null, "stream service is null");
315336

316-
return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout);
337+
return new StreamBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout, WRITE_HISTOGRAM, options);
317338
}
318339

319340
/**
@@ -329,7 +350,7 @@ public StreamBulkWriteProcessor buildStreamWriteProcessor(int maxBulkSize, int f
329350
public MeasureBulkWriteProcessor buildMeasureWriteProcessor(int maxBulkSize, int flushInterval, int concurrency, int timeout) {
330351
checkState(this.measureServiceStub != null, "measure service is null");
331352

332-
return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout);
353+
return new MeasureBulkWriteProcessor(this, maxBulkSize, flushInterval, concurrency, timeout, WRITE_HISTOGRAM, options);
333354
}
334355

335356
/**
@@ -827,7 +848,7 @@ public List<IndexRuleBinding> findIndexRuleBindings(String group) throws BanyanD
827848

828849
/**
829850
* Define a new property.
830-
*
851+
*
831852
* @param property the property to be stored in the BanyanDB
832853
* @throws BanyanDBException if the property is invalid
833854
*/
@@ -889,7 +910,13 @@ public boolean deletePropertyDefinition(String group, String name) throws Banyan
889910
*/
890911
public ApplyResponse apply(Property property) throws BanyanDBException {
891912
PropertyStore store = new PropertyStore(checkNotNull(this.channel));
892-
return store.apply(property);
913+
try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
914+
"property",
915+
"single_write",
916+
options.getPrometheusMetricsOpts().getClientID()
917+
).startTimer()) {
918+
return store.apply(property);
919+
}
893920
}
894921

895922
/**
@@ -901,7 +928,13 @@ public ApplyResponse apply(Property property) throws BanyanDBException {
901928
public ApplyResponse apply(Property property, Strategy strategy) throws
902929
BanyanDBException {
903930
PropertyStore store = new PropertyStore(checkNotNull(this.channel));
904-
return store.apply(property, strategy);
931+
try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
932+
"property",
933+
"single_write",
934+
options.getPrometheusMetricsOpts().getClientID()
935+
).startTimer()) {
936+
return store.apply(property, strategy);
937+
}
905938
}
906939

907940
/**
@@ -926,7 +959,13 @@ public BanyandbProperty.QueryResponse query(BanyandbProperty.QueryRequest reques
926959
public DeleteResponse deleteProperty(String group, String name, String id) throws
927960
BanyanDBException {
928961
PropertyStore store = new PropertyStore(checkNotNull(this.channel));
929-
return store.delete(group, name, id);
962+
try (Histogram.Timer timer = WRITE_HISTOGRAM.labels(
963+
"property",
964+
"delete",
965+
options.getPrometheusMetricsOpts().getClientID()
966+
).startTimer()) {
967+
return store.delete(group, name, id);
968+
}
930969
}
931970

932971
/**

src/main/java/org/apache/skywalking/banyandb/v1/client/MeasureBulkWriteProcessor.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
package org.apache.skywalking.banyandb.v1.client;
2020

2121
import io.grpc.stub.StreamObserver;
22+
import io.prometheus.client.Histogram;
23+
import java.util.List;
2224
import lombok.extern.slf4j.Slf4j;
2325

2426
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
@@ -42,25 +44,29 @@
4244
public class MeasureBulkWriteProcessor extends AbstractBulkWriteProcessor<BanyandbMeasure.WriteRequest,
4345
MeasureServiceGrpc.MeasureServiceStub> {
4446
private final BanyanDBClient client;
47+
private final Histogram writeHistogram;
48+
private final Options options;
4549

4650
/**
4751
* Create the processor.
4852
*
49-
* @param client the client
50-
* @param maxBulkSize the max bulk size for the flush operation
51-
* @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
52-
* automatically. Unit is second.
53-
* @param timeout network timeout threshold in seconds.
54-
* @param concurrency the number of concurrency would run for the flush max.
53+
* @param client the client
54+
* @param maxBulkSize the max bulk size for the flush operation
55+
* @param flushInterval if given maxBulkSize is not reached in this period, the flush would be triggered
56+
* automatically. Unit is second.
57+
* @param concurrency the number of concurrency would run for the flush max.
58+
* @param timeout network timeout threshold in seconds.
5559
*/
5660
protected MeasureBulkWriteProcessor(
57-
final BanyanDBClient client,
58-
final int maxBulkSize,
59-
final int flushInterval,
60-
final int concurrency,
61-
final int timeout) {
61+
final BanyanDBClient client,
62+
final int maxBulkSize,
63+
final int flushInterval,
64+
final int concurrency,
65+
final int timeout, final Histogram writeHistogram, final Options options) {
6266
super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor", maxBulkSize, flushInterval, concurrency, timeout);
6367
this.client = client;
68+
this.writeHistogram = writeHistogram;
69+
this.options = options;
6470
}
6571

6672
@Override
@@ -105,4 +111,14 @@ public void onCompleted() {
105111
}
106112
});
107113
}
114+
115+
@Override
116+
protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) {
117+
Histogram.Timer timer = writeHistogram.labels(
118+
"measure",
119+
"bulk_write",
120+
options.getPrometheusMetricsOpts().getClientID()
121+
).startTimer();
122+
return super.doFlush(data, timer);
123+
}
108124
}

src/main/java/org/apache/skywalking/banyandb/v1/client/Options.java

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,14 +63,24 @@ public class Options {
6363
* Basic Auth: password of BanyanDB server
6464
*/
6565
private String password = "";
66+
/**
67+
* Enable Prometheus metrics
68+
*/
69+
private PrometheusMetricsOpts prometheusMetricsOpts = new PrometheusMetricsOpts();
6670

6771
public Options() {
6872
}
6973

7074
ChannelManagerSettings buildChannelManagerSettings() {
7175
return ChannelManagerSettings.newBuilder()
72-
.setRefreshInterval(this.refreshInterval)
73-
.setForceReconnectionThreshold(this.forceReconnectionThreshold)
74-
.build();
76+
.setRefreshInterval(this.refreshInterval)
77+
.setForceReconnectionThreshold(this.forceReconnectionThreshold)
78+
.build();
79+
}
80+
81+
public static class PrometheusMetricsOpts {
82+
@Setter(AccessLevel.PUBLIC)
83+
@Getter
84+
private String clientID = "default";
7585
}
7686
}

src/main/java/org/apache/skywalking/banyandb/v1/client/StreamBulkWriteProcessor.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020

2121
import io.grpc.stub.StreamObserver;
2222

23+
import io.prometheus.client.Histogram;
24+
import java.util.List;
2325
import lombok.extern.slf4j.Slf4j;
2426

2527
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
@@ -43,6 +45,8 @@
4345
public class StreamBulkWriteProcessor extends AbstractBulkWriteProcessor<BanyandbStream.WriteRequest,
4446
StreamServiceGrpc.StreamServiceStub> {
4547
private final BanyanDBClient client;
48+
private final Histogram writeHistogram;
49+
private final Options options;
4650

4751
/**
4852
* Create the processor.
@@ -59,9 +63,13 @@ protected StreamBulkWriteProcessor(
5963
final int maxBulkSize,
6064
final int flushInterval,
6165
final int concurrency,
62-
final int timeout) {
66+
final int timeout,
67+
final Histogram writeHistogram,
68+
final Options options) {
6369
super(client.getStreamServiceStub(), "StreamBulkWriteProcessor", maxBulkSize, flushInterval, concurrency, timeout);
6470
this.client = client;
71+
this.writeHistogram = writeHistogram;
72+
this.options = options;
6573
}
6674

6775
@Override
@@ -106,4 +114,15 @@ public void onCompleted() {
106114
}
107115
});
108116
}
117+
118+
@Override
119+
protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) {
120+
Histogram.Timer timer = writeHistogram.labels(
121+
"stream",
122+
"bulk_write",
123+
options.getPrometheusMetricsOpts().getClientID()
124+
).startTimer();
125+
return super.doFlush(data, timer);
126+
}
109127
}
128+

0 commit comments

Comments
 (0)