1919package org .apache .skywalking .oap .server .storage .plugin .banyandb ;
2020
2121import io .grpc .Status ;
22+ import io .grpc .StatusRuntimeException ;
23+ import java .io .IOException ;
24+ import java .util .Arrays ;
25+ import java .util .Collections ;
26+ import java .util .List ;
27+ import lombok .extern .slf4j .Slf4j ;
28+ import org .apache .skywalking .banyandb .common .v1 .BanyandbCommon ;
29+ import org .apache .skywalking .banyandb .common .v1 .BanyandbCommon .Group ;
2230import org .apache .skywalking .banyandb .database .v1 .BanyandbDatabase ;
31+ import org .apache .skywalking .banyandb .database .v1 .BanyandbDatabase .Measure ;
32+ import org .apache .skywalking .banyandb .database .v1 .BanyandbDatabase .Stream ;
33+ import org .apache .skywalking .banyandb .database .v1 .BanyandbDatabase .TopNAggregation ;
2334import org .apache .skywalking .banyandb .property .v1 .BanyandbProperty ;
35+ import org .apache .skywalking .banyandb .property .v1 .BanyandbProperty .ApplyRequest .Strategy ;
36+ import org .apache .skywalking .banyandb .property .v1 .BanyandbProperty .DeleteResponse ;
37+ import org .apache .skywalking .banyandb .property .v1 .BanyandbProperty .Property ;
2438import org .apache .skywalking .banyandb .v1 .client .BanyanDBClient ;
2539import org .apache .skywalking .banyandb .v1 .client .MeasureBulkWriteProcessor ;
2640import org .apache .skywalking .banyandb .v1 .client .MeasureQuery ;
3347import org .apache .skywalking .banyandb .v1 .client .StreamWrite ;
3448import org .apache .skywalking .banyandb .v1 .client .TopNQuery ;
3549import org .apache .skywalking .banyandb .v1 .client .TopNQueryResponse ;
36- import org .apache .skywalking .banyandb .common .v1 .BanyandbCommon .Group ;
37- import org .apache .skywalking .banyandb .database .v1 .BanyandbDatabase .TopNAggregation ;
38- import org .apache .skywalking .banyandb .database .v1 .BanyandbDatabase .Measure ;
39- import org .apache .skywalking .banyandb .database .v1 .BanyandbDatabase .Stream ;
40- import org .apache .skywalking .banyandb .property .v1 .BanyandbProperty .Property ;
41- import org .apache .skywalking .banyandb .property .v1 .BanyandbProperty .ApplyRequest .Strategy ;
42- import org .apache .skywalking .banyandb .property .v1 .BanyandbProperty .DeleteResponse ;
4350import org .apache .skywalking .banyandb .v1 .client .grpc .exception .AlreadyExistsException ;
4451import org .apache .skywalking .banyandb .v1 .client .grpc .exception .BanyanDBException ;
4552import org .apache .skywalking .oap .server .library .client .Client ;
4653import org .apache .skywalking .oap .server .library .client .healthcheck .DelegatedHealthChecker ;
4754import org .apache .skywalking .oap .server .library .client .healthcheck .HealthCheckable ;
4855import org .apache .skywalking .oap .server .library .util .HealthChecker ;
4956
50- import java .io .IOException ;
51- import java .util .Collections ;
52- import java .util .List ;
53-
5457/**
5558 * BanyanDBStorageClient is a simple wrapper for the underlying {@link BanyanDBClient},
5659 * which implement {@link Client} and {@link HealthCheckable}.
5760 */
61+ @ Slf4j
5862public class BanyanDBStorageClient implements Client , HealthCheckable {
63+ private static final String [] COMPATIBLE_SERVER_API_VERSIONS = {"0.8" };
5964 final BanyanDBClient client ;
6065 private final DelegatedHealthChecker healthChecker = new DelegatedHealthChecker ();
6166 private final int flushTimeout ;
@@ -70,6 +75,29 @@ public BanyanDBStorageClient(BanyanDBStorageConfig config) {
7075 @ Override
7176 public void connect () throws Exception {
7277 this .client .connect ();
78+ BanyandbCommon .APIVersion apiVersion ;
79+ try {
80+ apiVersion = this .client .getAPIVersion ();
81+ } catch (BanyanDBException e ) {
82+ final Throwable cause = e .getCause ();
83+ if (cause instanceof StatusRuntimeException ) {
84+ final Status status = ((StatusRuntimeException ) cause ).getStatus ();
85+ if (Status .Code .UNIMPLEMENTED .equals (status .getCode ())) {
86+ log .error ("fail to get BanyanDB API version, server version < 0.8 is not supported." );
87+ }
88+ }
89+ throw e ;
90+ }
91+ final boolean isCompatible = Arrays .stream (COMPATIBLE_SERVER_API_VERSIONS )
92+ .anyMatch (v -> v .equals (apiVersion .getVersion ()));
93+ final String revision = apiVersion .getRevision ();
94+ log .info ("BanyanDB server API version: {}, revision: {}" , apiVersion .getVersion (), revision );
95+ if (!isCompatible ) {
96+ throw new IllegalStateException (
97+ "Incompatible BanyanDB server API version: " + apiVersion .getVersion () + ". But accepted versions: "
98+ + String .join (", " , COMPATIBLE_SERVER_API_VERSIONS ));
99+ }
100+
73101 }
74102
75103 @ Override
@@ -79,10 +107,10 @@ public void shutdown() throws IOException {
79107
80108 public List <Property > listProperties (String group , String name ) throws IOException {
81109 try {
82- BanyandbProperty .QueryResponse resp = this .client .query (BanyandbProperty .QueryRequest .newBuilder ()
83- .addGroups (group )
84- .setContainer (name )
85- .build ());
110+ BanyandbProperty .QueryResponse resp = this .client .query (BanyandbProperty .QueryRequest .newBuilder ()
111+ .addGroups (group )
112+ .setContainer (name )
113+ .build ());
86114 this .healthChecker .health ();
87115 return resp .getPropertiesList ();
88116 } catch (BanyanDBException ex ) {
@@ -99,10 +127,10 @@ public List<Property> listProperties(String group, String name) throws IOExcepti
99127 public Property queryProperty (String group , String name , String id ) throws IOException {
100128 try {
101129 BanyandbProperty .QueryResponse resp = this .client .query (BanyandbProperty .QueryRequest .newBuilder ()
102- .addGroups (group )
103- .setContainer (name )
104- .addIds (id )
105- .build ());
130+ .addGroups (group )
131+ .setContainer (name )
132+ .addIds (id )
133+ .build ());
106134 this .healthChecker .health ();
107135 if (resp .getPropertiesCount () == 0 ) {
108136 return null ;
0 commit comments