
Commit b81ef19

BanyanDB: Move data write logic from BanyanDB Java Client to OAP and support observe metrics for write operations. (#13541)
1 parent 2bb22eb commit b81ef19

File tree

14 files changed: +1158 −122 lines

docs/en/changes/changes.md

Lines changed: 1 addition & 0 deletions

@@ -105,6 +105,7 @@
 * BanyanDB: support add group prefix (namespace) for BanyanDB groups.
 * BanyanDB: fix when setting `@BanyanDB.TimestampColumn`, the column should not be indexed.
 * OAP Self Observability: make Trace analysis metrics separate by label `protocol`, add Zipkin span dropped metrics.
+* BanyanDB: Move data write logic from BanyanDB Java Client to OAP and support observe metrics for write operations.
 
 #### UI

oap-server-bom/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -72,7 +72,7 @@
     <httpcore.version>4.4.16</httpcore.version>
     <httpasyncclient.version>4.1.5</httpasyncclient.version>
     <commons-compress.version>1.21</commons-compress.version>
-    <banyandb-java-client.version>0.9.1</banyandb-java-client.version>
+    <banyandb-java-client.version>0.9.2</banyandb-java-client.version>
     <kafka-clients.version>3.4.0</kafka-clients.version>
     <spring-kafka-test.version>2.4.6.RELEASE</spring-kafka-test.version>
     <consul.client.version>1.5.3</consul.client.version>

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBBatchDAO.java

Lines changed: 4 additions & 4 deletions

@@ -21,14 +21,14 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
-import org.apache.skywalking.banyandb.v1.client.MeasureBulkWriteProcessor;
-import org.apache.skywalking.banyandb.v1.client.StreamBulkWriteProcessor;
-import org.apache.skywalking.banyandb.v1.client.TraceBulkWriteProcessor;
 import org.apache.skywalking.oap.server.core.storage.AbstractDAO;
 import org.apache.skywalking.oap.server.core.storage.IBatchDAO;
 import org.apache.skywalking.oap.server.library.client.request.InsertRequest;
 import org.apache.skywalking.oap.server.library.client.request.PrepareRequest;
 import org.apache.skywalking.oap.server.library.util.CollectionUtils;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.MeasureBulkWriteProcessor;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.StreamBulkWriteProcessor;
+import org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk.TraceBulkWriteProcessor;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureInsertRequest;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.measure.BanyanDBMeasureUpdateRequest;
 import org.apache.skywalking.oap.server.storage.plugin.banyandb.stream.BanyanDBStreamInsertRequest;
@@ -42,7 +42,7 @@ public class BanyanDBBatchDAO extends AbstractDAO<BanyanDBStorageClient> implements IBatchDAO
 
     private MeasureBulkWriteProcessor measureBulkWriteProcessor;
 
-    private TraceBulkWriteProcessor traceBulkWriteProcessor;
+    private TraceBulkWriteProcessor traceBulkWriteProcessor;
 
     private final int maxBulkSize;
 
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageClient.java

Lines changed: 222 additions & 89 deletions
Large diffs are not rendered by default.
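Although this 222-line diff is collapsed in the rendered page, the constructor of MeasureBulkWriteProcessor later in this commit implies that the storage client now builds the bulk processors itself and hands them a telemetry histogram. A hedged sketch of that wiring; the method name createMeasureBulkProcessor and the field names are hypothetical, only the MeasureBulkWriteProcessor constructor signature is taken from this commit:

// Hypothetical wiring inside BanyanDBStorageClient; only the constructor
// signature of MeasureBulkWriteProcessor is taken from this commit.
public MeasureBulkWriteProcessor createMeasureBulkProcessor(int maxBulkSize,
                                                            int flushInterval,
                                                            int concurrency,
                                                            int timeout) {
    return new MeasureBulkWriteProcessor(
        this.client,                 // the underlying BanyanDBClient connection
        maxBulkSize, flushInterval, concurrency, timeout,
        this.measureWriteHistogram,  // built from the telemetry module (see below)
        this.options);               // client Options shared with the connection
}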

oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/BanyanDBStorageProvider.java

Lines changed: 1 addition & 1 deletion

@@ -148,7 +148,7 @@ public void prepare() throws ServiceNotProvidedException, ModuleStartException {
     }
     this.registerServiceImplementation(StorageBuilderFactory.class, new StorageBuilderFactory.Default());
 
-    this.client = new BanyanDBStorageClient(config);
+    this.client = new BanyanDBStorageClient(getManager(), config);
     this.modelInstaller = new BanyanDBIndexInstaller(client, getManager(), this.config);
 
     // Stream
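The client now receives the ModuleManager, presumably so it can reach the telemetry module and build the write histograms consumed by the bulk processors below. A minimal sketch of the usual OAP pattern for that; the metric name and tags here are illustrative, not necessarily the ones this commit registers:

import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;

class TelemetryWiringSketch {
    // Illustrative: how a component holding a ModuleManager obtains a
    // HistogramMetrics from the telemetry module; metric name/tags are made up.
    static HistogramMetrics buildWriteHistogram(ModuleManager manager) {
        MetricsCreator metricsCreator = manager.find(TelemetryModule.NAME)
                                               .provider()
                                               .getService(MetricsCreator.class);
        return metricsCreator.createHistogramMetric(
            "banyandb_bulk_write_latency",
            "Latency of BanyanDB bulk write flushes",
            new MetricsTag.Keys("catalog"),
            new MetricsTag.Values("measure"));
    }
}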
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/AbstractBulkWriteProcessor.java

Lines changed: 208 additions & 0 deletions

@@ -0,0 +1,208 @@ (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk;

import io.grpc.stub.AbstractAsyncStub;
import io.grpc.stub.StreamObserver;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.v1.client.AbstractWrite;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;

@Slf4j
public abstract class AbstractBulkWriteProcessor<REQ extends com.google.protobuf.GeneratedMessageV3,
    STUB extends AbstractAsyncStub<STUB>>
    implements Runnable, Closeable {
    private final STUB stub;
    private final int maxBulkSize;
    private final int flushInterval;
    private final ArrayBlockingQueue<Holder> requests;
    private final Semaphore semaphore;
    private final long flushInternalInMillis;
    private final ScheduledThreadPoolExecutor scheduler;
    private final int timeout;
    private volatile long lastFlushTS = 0;

    /**
     * Create the processor.
     *
     * @param stub          an implementation of {@link AbstractAsyncStub}
     * @param processorName name of the processor for logging
     * @param maxBulkSize   the max bulk size for the flush operation
     * @param flushInterval if the given maxBulkSize is not reached in this period, the flush would be triggered
     *                      automatically. Unit is second.
     * @param concurrency   the maximum number of concurrent flush operations.
     * @param timeout       network timeout threshold in seconds.
     */
    protected AbstractBulkWriteProcessor(STUB stub,
                                         String processorName,
                                         int maxBulkSize,
                                         int flushInterval,
                                         int concurrency,
                                         int timeout) {
        this.stub = stub;
        this.maxBulkSize = maxBulkSize;
        this.flushInterval = flushInterval;
        this.timeout = timeout;
        requests = new ArrayBlockingQueue<>(maxBulkSize + 1);
        this.semaphore = new Semaphore(concurrency > 0 ? concurrency : 1);

        scheduler = new ScheduledThreadPoolExecutor(1, r -> {
            final Thread thread = new Thread(r);
            thread.setName("BanyanDB BulkProcessor");
            return thread;
        });
        scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        scheduler.setRemoveOnCancelPolicy(true);
        flushInternalInMillis = flushInterval * 1000;
        scheduler.scheduleWithFixedDelay(
            this, 0, flushInterval, TimeUnit.SECONDS);
    }

    /**
     * Add the write entity to the bulk processor.
     *
     * @param writeEntity to add.
     */
    @SneakyThrows
    public CompletableFuture<Void> add(AbstractWrite<REQ> writeEntity) {
        final CompletableFuture<Void> f = new CompletableFuture<>();
        requests.put(Holder.create(writeEntity, f));
        flushIfNeeded();
        return f;
    }

    public void run() {
        try {
            doPeriodicalFlush();
        } catch (Throwable t) {
            log.error("Failed to flush data to BanyanDB", t);
        }
    }

    @SneakyThrows
    protected void flushIfNeeded() {
        if (requests.size() >= maxBulkSize) {
            flush();
        }
    }

    private void doPeriodicalFlush() {
        if (System.currentTimeMillis() - lastFlushTS > flushInternalInMillis / 2) {
            // Run the periodical flush if no `flushIfNeeded` was executed in the second half of the
            // flush period. Otherwise, wait for the next round. By default, the last 2 seconds of
            // the 5s period. This avoids the periodical flush running between bulks
            // (controlled by maxBulkSize).
            flush();
        }
    }

    public void flush() {
        if (requests.isEmpty()) {
            return;
        }

        try {
            semaphore.acquire();
        } catch (InterruptedException e) {
            log.error("Interrupted when trying to get semaphore to execute bulk requests", e);
            return;
        }

        final List<Holder> batch = new ArrayList<>(requests.size());
        requests.drainTo(batch);
        final CompletableFuture<Void> future = doObservedFlush(batch);
        future.whenComplete((v, t) -> semaphore.release());
        future.join();
        lastFlushTS = System.currentTimeMillis();
    }

    protected abstract CompletableFuture<Void> doObservedFlush(final List<Holder> data);

    protected CompletableFuture<Void> doFlush(final List<Holder> data, HistogramMetrics.Timer timer) {
        // The batch is used to control the completion of the flush operation.
        // There is at most one error per batch,
        // because the database server would terminate the batch process when the first error occurs.
        final CompletableFuture<Void> batch = new CompletableFuture<>();
        final StreamObserver<REQ> writeRequestStreamObserver
            = this.buildStreamObserver(stub.withDeadlineAfter(timeout, TimeUnit.SECONDS), batch);

        try {
            data.forEach(h -> {
                AbstractWrite<REQ> entity = (AbstractWrite<REQ>) h.getWriteEntity();
                REQ request;
                try {
                    request = entity.build();
                } catch (Throwable bt) {
                    log.error("building the entity fails: {}", entity.toString(), bt);
                    h.getFuture().completeExceptionally(bt);
                    return;
                }
                writeRequestStreamObserver.onNext(request);
                h.getFuture().complete(null);
            });
        } finally {
            writeRequestStreamObserver.onCompleted();
        }
        batch.whenComplete((ignored, exp) -> {
            timer.close();
            if (exp != null) {
                log.error("Failed to execute requests in bulk", exp);
            }
        });
        return batch;
    }

    public void close() {
        scheduler.shutdownNow();
    }

    protected abstract StreamObserver<REQ> buildStreamObserver(STUB stub, CompletableFuture<Void> batch);

    @Getter
    static class Holder {
        private final AbstractWrite<?> writeEntity;
        private final CompletableFuture<Void> future;

        private Holder(AbstractWrite<?> writeEntity, CompletableFuture<Void> future) {
            this.writeEntity = writeEntity;
            this.future = future;
        }

        public static <REQ extends com.google.protobuf.GeneratedMessageV3> Holder create(AbstractWrite<REQ> writeEntity,
                                                                                          CompletableFuture<Void> future) {
            future.whenComplete((v, t) -> {
                if (t != null) {
                    log.error("Failed to execute the request: {}", writeEntity.toString(), t);
                }
            });
            return new Holder(writeEntity, future);
        }
    }
}
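For orientation, a brief usage sketch of the contract above; the MeasureWrite type comes from the BanyanDB Java client, and the surrounding method is illustrative:

// Sketch: enqueue one write and observe its completion.
static void writeOne(AbstractBulkWriteProcessor<org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure.WriteRequest, ?> processor,
                     org.apache.skywalking.banyandb.v1.client.MeasureWrite write) {
    processor.add(write).whenComplete((v, t) -> {
        // Per-entity futures fail only when building the protobuf request throws;
        // transport errors complete the per-batch future and are logged instead.
    });
}

Note the backpressure design: add(...) blocks on the bounded queue, flush() serializes concurrent flushes through the semaphore, and future.join() keeps each flush synchronous from the caller's perspective.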
oap-server/server-storage-plugin/storage-banyandb-plugin/src/main/java/org/apache/skywalking/oap/server/storage/plugin/banyandb/bulk/MeasureBulkWriteProcessor.java

Lines changed: 119 additions & 0 deletions

@@ -0,0 +1,119 @@ (new file)

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package org.apache.skywalking.oap.server.storage.plugin.banyandb.bulk;

import io.grpc.stub.StreamObserver;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.concurrent.ThreadSafe;
import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.banyandb.common.v1.BanyandbCommon;
import org.apache.skywalking.banyandb.measure.v1.BanyandbMeasure;
import org.apache.skywalking.banyandb.measure.v1.MeasureServiceGrpc;
import org.apache.skywalking.banyandb.model.v1.BanyandbModel;
import org.apache.skywalking.banyandb.v1.client.BanyanDBClient;
import org.apache.skywalking.banyandb.v1.client.Options;
import org.apache.skywalking.banyandb.v1.client.grpc.exception.BanyanDBException;
import org.apache.skywalking.banyandb.v1.client.util.StatusUtil;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;

/**
 * MeasureBulkWriteProcessor works for measure flush.
 */
@Slf4j
@ThreadSafe
public class MeasureBulkWriteProcessor extends AbstractBulkWriteProcessor<BanyandbMeasure.WriteRequest,
    MeasureServiceGrpc.MeasureServiceStub> {
    private final BanyanDBClient client;
    private final HistogramMetrics writeHistogram;
    private final Options options;

    /**
     * Create the processor.
     *
     * @param client        the client
     * @param maxBulkSize   the max bulk size for the flush operation
     * @param flushInterval if the given maxBulkSize is not reached in this period, the flush would be triggered
     *                      automatically. Unit is second.
     * @param concurrency   the maximum number of concurrent flush operations.
     * @param timeout       network timeout threshold in seconds.
     */
    public MeasureBulkWriteProcessor(
        final BanyanDBClient client,
        final int maxBulkSize,
        final int flushInterval,
        final int concurrency,
        final int timeout, final HistogramMetrics writeHistogram, final Options options) {
        super(client.getMeasureServiceStub(), "MeasureBulkWriteProcessor", maxBulkSize, flushInterval, concurrency, timeout);
        this.client = client;
        this.writeHistogram = writeHistogram;
        this.options = options;
    }

    @Override
    protected StreamObserver<BanyandbMeasure.WriteRequest> buildStreamObserver(MeasureServiceGrpc.MeasureServiceStub stub,
                                                                               CompletableFuture<Void> batch) {
        return stub.write(new StreamObserver<BanyandbMeasure.WriteResponse>() {
            private final Set<String> schemaExpired = new HashSet<>();

            @Override
            public void onNext(BanyandbMeasure.WriteResponse writeResponse) {
                BanyandbModel.Status status = StatusUtil.convertStringToStatus(writeResponse.getStatus());
                switch (status) {
                    case STATUS_SUCCEED:
                        break;
                    case STATUS_EXPIRED_SCHEMA:
                        BanyandbCommon.Metadata metadata = writeResponse.getMetadata();
                        String schemaKey = metadata.getGroup() + "." + metadata.getName();
                        if (!schemaExpired.contains(schemaKey)) {
                            log.warn("The schema {} is expired, trying to update the schema...", schemaKey);
                            try {
                                client.updateMeasureMetadataCacheFromSever(metadata.getGroup(), metadata.getName());
                                schemaExpired.add(schemaKey);
                            } catch (BanyanDBException e) {
                                log.error(e.getMessage(), e);
                            }
                        }
                        break;
                    default:
                        log.warn("Write measure failed with status: {}", status);
                }
            }

            @Override
            public void onError(Throwable t) {
                batch.completeExceptionally(t);
                log.error("Error occurs in flushing measures", t);
            }

            @Override
            public void onCompleted() {
                batch.complete(null);
            }
        });
    }

    @Override
    protected CompletableFuture<Void> doObservedFlush(final List<Holder> data) {
        HistogramMetrics.Timer timer = writeHistogram.createTimer();
        return super.doFlush(data, timer);
    }
}
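A construction sketch tying the pieces together; the configuration values are arbitrary examples, and banyanDBClient/writeHistogram/options are assumed to exist as sketched earlier:

// Sketch: wiring a MeasureBulkWriteProcessor; all values are example choices.
MeasureBulkWriteProcessor processor = new MeasureBulkWriteProcessor(
    banyanDBClient,  // established BanyanDBClient connection
    10000,           // maxBulkSize: flush once this many writes are queued
    5,               // flushInterval: otherwise flush every 5 seconds
    2,               // concurrency: at most two in-flight flushes
    30,              // timeout: 30-second gRPC deadline per flush stream
    writeHistogram,  // HistogramMetrics timed around each flush (doObservedFlush)
    options);        // client Options carried alongside the connection

One design note visible above: on STATUS_EXPIRED_SCHEMA the processor refreshes the local metadata cache at most once per schema per batch (the schemaExpired set), so a burst of stale writes does not stampede the metadata endpoint.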
