4343import org .apache .skywalking .banyandb .database .v1 .BanyandbDatabase .IndexRule ;
4444import org .apache .skywalking .banyandb .database .v1 .BanyandbDatabase .IndexRuleBinding ;
4545import org .apache .skywalking .banyandb .database .v1 .BanyandbDatabase .Subject ;
46+ import org .apache .skywalking .banyandb .database .v1 .BanyandbDatabase .Trace ;
4647import org .apache .skywalking .banyandb .model .v1 .BanyandbModel ;
4748import org .apache .skywalking .banyandb .property .v1 .BanyandbProperty ;
4849import org .apache .skywalking .banyandb .property .v1 .BanyandbProperty .Property ;
5354import org .apache .skywalking .banyandb .measure .v1 .MeasureServiceGrpc ;
5455import org .apache .skywalking .banyandb .stream .v1 .BanyandbStream ;
5556import org .apache .skywalking .banyandb .stream .v1 .StreamServiceGrpc ;
57+ import org .apache .skywalking .banyandb .trace .v1 .BanyandbTrace ;
58+ import org .apache .skywalking .banyandb .trace .v1 .TraceServiceGrpc ;
5659import org .apache .skywalking .banyandb .v1 .client .auth .AuthInterceptor ;
5760import org .apache .skywalking .banyandb .v1 .client .grpc .HandleExceptionsWith ;
5861import org .apache .skywalking .banyandb .v1 .client .grpc .channel .ChannelManager ;
6972import org .apache .skywalking .banyandb .v1 .client .metadata .ResourceExist ;
7073import org .apache .skywalking .banyandb .v1 .client .metadata .StreamMetadataRegistry ;
7174import org .apache .skywalking .banyandb .v1 .client .metadata .TopNAggregationMetadataRegistry ;
75+ import org .apache .skywalking .banyandb .v1 .client .metadata .TraceMetadataRegistry ;
7276
7377import java .io .Closeable ;
7478import java .io .IOException ;
@@ -124,6 +128,11 @@ public class BanyanDBClient implements Closeable {
124128 */
125129 @ Getter (value = AccessLevel .PACKAGE )
126130 private MeasureServiceGrpc .MeasureServiceStub measureServiceStub ;
131+ /**
132+ * gRPC client stub
133+ */
134+ @ Getter (value = AccessLevel .PACKAGE )
135+ private TraceServiceGrpc .TraceServiceStub traceServiceStub ;
127136 /**
128137 * gRPC future stub.
129138 */
@@ -134,6 +143,11 @@ public class BanyanDBClient implements Closeable {
134143 */
135144 @ Getter (value = AccessLevel .PACKAGE )
136145 private MeasureServiceGrpc .MeasureServiceBlockingStub measureServiceBlockingStub ;
146+ /**
147+ * gRPC future stub.
148+ */
149+ @ Getter (value = AccessLevel .PACKAGE )
150+ private TraceServiceGrpc .TraceServiceBlockingStub traceServiceBlockingStub ;
137151 /**
138152 * The connection status.
139153 */
@@ -211,8 +225,10 @@ public void connect() throws IOException {
211225 this .channel = interceptedChannel ;
212226 streamServiceBlockingStub = StreamServiceGrpc .newBlockingStub (this .channel );
213227 measureServiceBlockingStub = MeasureServiceGrpc .newBlockingStub (this .channel );
228+ traceServiceBlockingStub = TraceServiceGrpc .newBlockingStub (this .channel );
214229 streamServiceStub = StreamServiceGrpc .newStub (this .channel );
215230 measureServiceStub = MeasureServiceGrpc .newStub (this .channel );
231+ traceServiceStub = TraceServiceGrpc .newStub (this .channel );
216232 isConnected = true ;
217233 }
218234 } finally {
@@ -228,8 +244,10 @@ void connect(Channel channel) {
228244 this .channel = channel ;
229245 streamServiceBlockingStub = StreamServiceGrpc .newBlockingStub (this .channel );
230246 measureServiceBlockingStub = MeasureServiceGrpc .newBlockingStub (this .channel );
247+ traceServiceBlockingStub = TraceServiceGrpc .newBlockingStub (this .channel );
231248 streamServiceStub = StreamServiceGrpc .newStub (this .channel );
232249 measureServiceStub = MeasureServiceGrpc .newStub (this .channel );
250+ traceServiceStub = TraceServiceGrpc .newStub (this .channel );
233251 isConnected = true ;
234252 }
235253 } finally {
@@ -396,6 +414,49 @@ public StreamWrite createStreamWrite(String group, String name, final String ele
396414 return new StreamWrite (this .metadataCache .findStreamMetadata (group , name ), elementId , timestamp );
397415 }
398416
417+ /**
418+ * Build a trace bulk write processor.
419+ *
420+ * @param maxBulkSize the max size of each flush. The actual size is determined by the length of byte array.
421+ * @param flushInterval if given maxBulkSize is not reached in this period, the flush would be trigger
422+ * automatically. Unit is second.
423+ * @param concurrency the number of concurrency would run for the flush max.
424+ * @param timeout network timeout threshold in seconds.
425+ * @return trace bulk write processor
426+ */
427+ public TraceBulkWriteProcessor buildTraceWriteProcessor (int maxBulkSize , int flushInterval , int concurrency , int timeout ) {
428+ checkState (this .traceServiceStub != null , "trace service is null" );
429+
430+ return new TraceBulkWriteProcessor (this , maxBulkSize , flushInterval , concurrency , timeout , WRITE_HISTOGRAM , options );
431+ }
432+
433+ /**
434+ * Build a TraceWrite request.
435+ *
436+ * @param group the group of the trace
437+ * @param name the name of the trace
438+ * @param timestamp the timestamp of the trace
439+ * @return the request to be built
440+ */
441+ public TraceWrite createTraceWrite (String group , String name , long timestamp ) throws BanyanDBException {
442+ Preconditions .checkArgument (!Strings .isNullOrEmpty (group ));
443+ Preconditions .checkArgument (!Strings .isNullOrEmpty (name ));
444+ return new TraceWrite (this .metadataCache .findTraceMetadata (group , name ), timestamp );
445+ }
446+
447+ /**
448+ * Build a TraceWrite request without initial timestamp.
449+ *
450+ * @param group the group of the trace
451+ * @param name the name of the trace
452+ * @return the request to be built
453+ */
454+ public TraceWrite createTraceWrite (String group , String name ) throws BanyanDBException {
455+ Preconditions .checkArgument (!Strings .isNullOrEmpty (group ));
456+ Preconditions .checkArgument (!Strings .isNullOrEmpty (name ));
457+ return new TraceWrite (this .metadataCache .findTraceMetadata (group , name ));
458+ }
459+
399460 /**
400461 * Query streams according to given conditions
401462 *
@@ -456,6 +517,29 @@ public MeasureQueryResponse query(MeasureQuery measureQuery) throws BanyanDBExce
456517 throw new RuntimeException ("No metadata found for the query" );
457518 }
458519
520+ /**
521+ * Query traces according to given conditions
522+ *
523+ * @param traceQuery condition for query
524+ * @return trace query response.
525+ */
526+ public TraceQueryResponse query (TraceQuery traceQuery ) throws BanyanDBException {
527+ checkState (this .traceServiceStub != null , "trace service is null" );
528+
529+ for (String group : traceQuery .groups ) {
530+ MetadataCache .EntityMetadata em = this .metadataCache .findTraceMetadata (group , traceQuery .name );
531+ if (em != null ) {
532+ final BanyandbTrace .QueryResponse response = HandleExceptionsWith .callAndTranslateApiException (() ->
533+ this .traceServiceBlockingStub
534+ .withDeadlineAfter (this .getOptions ().getDeadline (), TimeUnit .SECONDS )
535+ .query (traceQuery .build (em )));
536+ return new TraceQueryResponse (response );
537+ }
538+
539+ }
540+ throw new RuntimeException ("No metadata found for the query" );
541+ }
542+
459543 /**
460544 * Define a new group and attach to the current client.
461545 *
@@ -968,6 +1052,69 @@ public DeleteResponse deleteProperty(String group, String name, String id) throw
9681052 }
9691053 }
9701054
1055+ /**
1056+ * Define a new trace
1057+ *
1058+ * @param trace the trace to be stored in the BanyanDB
1059+ * @throws BanyanDBException if the trace is invalid
1060+ */
1061+ public void define (Trace trace ) throws BanyanDBException {
1062+ TraceMetadataRegistry registry = new TraceMetadataRegistry (checkNotNull (this .channel ));
1063+ registry .create (trace );
1064+ }
1065+
1066+ /**
1067+ * Update the trace.
1068+ *
1069+ * @param trace the trace to be stored in the BanyanDB
1070+ * @throws BanyanDBException if the trace is invalid
1071+ */
1072+ public void update (Trace trace ) throws BanyanDBException {
1073+ TraceMetadataRegistry registry = new TraceMetadataRegistry (checkNotNull (this .channel ));
1074+ registry .update (trace );
1075+ }
1076+
1077+ /**
1078+ * Find the trace with given group and name
1079+ *
1080+ * @param group group of the metadata
1081+ * @param name name of the metadata
1082+ * @return the trace found in BanyanDB. Otherwise, null is returned.
1083+ */
1084+ public Trace findTrace (String group , String name ) throws BanyanDBException {
1085+ try {
1086+ return new TraceMetadataRegistry (checkNotNull (this .channel )).get (group , name );
1087+ } catch (BanyanDBException ex ) {
1088+ if (ex .getStatus ().equals (Status .Code .NOT_FOUND )) {
1089+ return null ;
1090+ }
1091+ throw ex ;
1092+ }
1093+ }
1094+
1095+ /**
1096+ * Find the traces with given group
1097+ *
1098+ * @param group group of the metadata
1099+ * @return the traces found in BanyanDB
1100+ */
1101+ public List <Trace > findTraces (String group ) throws BanyanDBException {
1102+ TraceMetadataRegistry registry = new TraceMetadataRegistry (checkNotNull (this .channel ));
1103+ return registry .list (group );
1104+ }
1105+
1106+ /**
1107+ * Delete the trace
1108+ *
1109+ * @param group group of the metadata
1110+ * @param name name of the metadata
1111+ * @return if this trace has been deleted
1112+ */
1113+ public boolean deleteTrace (String group , String name ) throws BanyanDBException {
1114+ TraceMetadataRegistry registry = new TraceMetadataRegistry (checkNotNull (this .channel ));
1115+ return registry .delete (group , name );
1116+ }
1117+
9711118 /**
9721119 * Try to find the group defined
9731120 *
@@ -1169,6 +1316,20 @@ public ResourceExist existProperty(String group, String name) throws BanyanDBExc
11691316 return new PropertyMetadataRegistry (checkNotNull (this .channel )).exist (group , name );
11701317 }
11711318
1319+ /**
1320+ * Check whether the trace definition is existed in the server
1321+ *
1322+ * @param group group of the metadata
1323+ * @param name name of the metadata
1324+ * @return ResourceExist which indicates whether group and trace exist
1325+ */
1326+ public ResourceExist existTrace (String group , String name ) throws BanyanDBException {
1327+ Preconditions .checkArgument (!Strings .isNullOrEmpty (group ));
1328+ Preconditions .checkArgument (!Strings .isNullOrEmpty (name ));
1329+
1330+ return new TraceMetadataRegistry (checkNotNull (this .channel )).exist (group , name );
1331+ }
1332+
11721333 /**
11731334 * Update the stream metadata cache from the server
11741335 * @param group the group of the stream
@@ -1193,6 +1354,18 @@ public MetadataCache.EntityMetadata updateMeasureMetadataCacheFromSever(String g
11931354 return this .metadataCache .updateMeasureFromSever (group , name );
11941355 }
11951356
1357+ /**
1358+ * Update the trace metadata cache from the server
1359+ * @param group the group of the trace
1360+ * @param name the name of the trace
1361+ * @return the updated trace metadata, or null if the trace does not exist
1362+ */
1363+ public MetadataCache .EntityMetadata updateTraceMetadataCacheFromServer (String group , String name ) throws BanyanDBException {
1364+ Preconditions .checkArgument (!Strings .isNullOrEmpty (group ));
1365+ Preconditions .checkArgument (!Strings .isNullOrEmpty (name ));
1366+ return this .metadataCache .updateTraceFromServer (group , name );
1367+ }
1368+
11961369 /**
11971370 * Get the API version of the server
11981371 *
0 commit comments