2020import com .google .api .gax .rpc .ServerStreamingCallable ;
2121import com .google .api .gax .rpc .UnaryCallable ;
2222import com .google .cloud .bigquery .storage .v1 .stub .EnhancedBigQueryReadStub ;
23+ import io .opentelemetry .api .common .Attributes ;
24+ import io .opentelemetry .api .trace .Span ;
25+ import io .opentelemetry .context .Scope ;
2326import java .io .IOException ;
2427import java .util .concurrent .TimeUnit ;
2528
@@ -128,7 +131,9 @@ protected BigQueryReadClient(BigQueryReadSettings settings) throws IOException {
128131 this .settings = settings ;
129132 this .stub =
130133 EnhancedBigQueryReadStub .create (
131- settings .getTypedStubSettings (), settings .getReadRowsRetryAttemptListener ());
134+ settings .getTypedStubSettings (),
135+ settings .getReadRowsRetryAttemptListener (),
136+ settings .isOpenTelemetryEnabled ());
132137 }
133138
134139 @ BetaApi ("A restructuring of stub classes is planned, so this may break in the future" )
@@ -229,7 +234,32 @@ public final ReadSession createReadSession(
229234 * @throws com.google.api.gax.rpc.ApiException if the remote call fails
230235 */
231236 public final ReadSession createReadSession (CreateReadSessionRequest request ) {
232- return createReadSessionCallable ().call (request );
237+ Span createReadSession = null ;
238+ if (settings .isOpenTelemetryEnabled ()) {
239+ createReadSession =
240+ settings
241+ .getOpenTelemetryTracer ()
242+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.createReadSession" )
243+ .setAttribute (
244+ "bq.storage.read_session.request.parent" , getFieldAsString (request .getParent ()))
245+ .setAttribute (
246+ "bq.storage.read_session.request.max_stream_count" , request .getMaxStreamCount ())
247+ .setAttribute (
248+ "bq.storage.read_session.request.preferred_min_stream_count" ,
249+ request .getPreferredMinStreamCount ())
250+ .setAttribute (
251+ "bq.storage.read_session.request.serialized_size" , request .getSerializedSize ())
252+ .setAllAttributes (otelAttributesFrom (request .getReadSession ()))
253+ .startSpan ();
254+ }
255+ try (Scope createReadSessionScope =
256+ createReadSession != null ? createReadSession .makeCurrent () : null ) {
257+ return createReadSessionCallable ().call (request );
258+ } finally {
259+ if (createReadSession != null ) {
260+ createReadSession .end ();
261+ }
262+ }
233263 }
234264
235265 /**
@@ -262,7 +292,22 @@ public final ReadSession createReadSession(CreateReadSessionRequest request) {
262292 * </code></pre>
263293 */
264294 public final UnaryCallable <CreateReadSessionRequest , ReadSession > createReadSessionCallable () {
265- return stub .createReadSessionCallable ();
295+ Span createReadSessionCallable = null ;
296+ if (settings .isOpenTelemetryEnabled ()) {
297+ createReadSessionCallable =
298+ settings
299+ .getOpenTelemetryTracer ()
300+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.createReadSessionCallable" )
301+ .startSpan ();
302+ }
303+ try (Scope createReadSessionCallableScope =
304+ createReadSessionCallable != null ? createReadSessionCallable .makeCurrent () : null ) {
305+ return stub .createReadSessionCallable ();
306+ } finally {
307+ if (createReadSessionCallable != null ) {
308+ createReadSessionCallable .end ();
309+ }
310+ }
266311 }
267312
268313 /**
@@ -287,7 +332,22 @@ public final UnaryCallable<CreateReadSessionRequest, ReadSession> createReadSess
287332 * </code></pre>
288333 */
289334 public final ServerStreamingCallable <ReadRowsRequest , ReadRowsResponse > readRowsCallable () {
290- return stub .readRowsCallable ();
335+ Span readRowsCallable = null ;
336+ if (settings .isOpenTelemetryEnabled ()) {
337+ readRowsCallable =
338+ settings
339+ .getOpenTelemetryTracer ()
340+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.readRowsCallable" )
341+ .startSpan ();
342+ }
343+ try (Scope readRowsCallableScope =
344+ readRowsCallable != null ? readRowsCallable .makeCurrent () : null ) {
345+ return stub .readRowsCallable ();
346+ } finally {
347+ if (readRowsCallable != null ) {
348+ readRowsCallable .end ();
349+ }
350+ }
291351 }
292352
293353 /**
@@ -315,7 +375,23 @@ public final ServerStreamingCallable<ReadRowsRequest, ReadRowsResponse> readRows
315375 * @throws com.google.api.gax.rpc.ApiException if the remote call fails
316376 */
317377 public final SplitReadStreamResponse splitReadStream (SplitReadStreamRequest request ) {
318- return splitReadStreamCallable ().call (request );
378+ Span splitReadStream = null ;
379+ if (settings .isOpenTelemetryEnabled ()) {
380+ splitReadStream =
381+ settings
382+ .getOpenTelemetryTracer ()
383+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.splitReadStream" )
384+ .setAllAttributes (otelAttributesFrom (request ))
385+ .startSpan ();
386+ }
387+ try (Scope splitReadStreamScope =
388+ splitReadStream != null ? splitReadStream .makeCurrent () : null ) {
389+ return splitReadStreamCallable ().call (request );
390+ } finally {
391+ if (splitReadStream != null ) {
392+ splitReadStream .end ();
393+ }
394+ }
319395 }
320396
321397 /**
@@ -343,17 +419,60 @@ public final SplitReadStreamResponse splitReadStream(SplitReadStreamRequest requ
343419 */
344420 public final UnaryCallable <SplitReadStreamRequest , SplitReadStreamResponse >
345421 splitReadStreamCallable () {
346- return stub .splitReadStreamCallable ();
422+ Span splitReadStreamCallable = null ;
423+ if (settings .isOpenTelemetryEnabled ()) {
424+ splitReadStreamCallable =
425+ settings
426+ .getOpenTelemetryTracer ()
427+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.splitReadStreamCallable" )
428+ .startSpan ();
429+ }
430+ try (Scope readRowsCallableScope =
431+ splitReadStreamCallable != null ? splitReadStreamCallable .makeCurrent () : null ) {
432+ return stub .splitReadStreamCallable ();
433+ } finally {
434+ if (splitReadStreamCallable != null ) {
435+ splitReadStreamCallable .end ();
436+ }
437+ }
347438 }
348439
349440 @ Override
350441 public final void close () {
351- stub .close ();
442+ Span close = null ;
443+ if (settings .isOpenTelemetryEnabled ()) {
444+ close =
445+ settings
446+ .getOpenTelemetryTracer ()
447+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.close" )
448+ .startSpan ();
449+ }
450+ try (Scope closeScope = close != null ? close .makeCurrent () : null ) {
451+ stub .close ();
452+ } finally {
453+ if (close != null ) {
454+ close .end ();
455+ }
456+ }
352457 }
353458
354459 @ Override
355460 public void shutdown () {
356- stub .shutdown ();
461+ Span shutdown = null ;
462+ if (settings .isOpenTelemetryEnabled ()) {
463+ shutdown =
464+ settings
465+ .getOpenTelemetryTracer ()
466+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.shutdown" )
467+ .startSpan ();
468+ }
469+ try (Scope shutdownScope = shutdown != null ? shutdown .makeCurrent () : null ) {
470+ stub .shutdown ();
471+ } finally {
472+ if (shutdown != null ) {
473+ shutdown .end ();
474+ }
475+ }
357476 }
358477
359478 @ Override
@@ -368,11 +487,89 @@ public boolean isTerminated() {
368487
369488 @ Override
370489 public void shutdownNow () {
371- stub .shutdownNow ();
490+ Span shutdownNow = null ;
491+ if (settings .isOpenTelemetryEnabled ()) {
492+ shutdownNow =
493+ settings
494+ .getOpenTelemetryTracer ()
495+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.shutdownNow" )
496+ .startSpan ();
497+ }
498+ try (Scope shutdownNowScope = shutdownNow != null ? shutdownNow .makeCurrent () : null ) {
499+ stub .shutdownNow ();
500+ } finally {
501+ if (shutdownNow != null ) {
502+ shutdownNow .end ();
503+ }
504+ }
372505 }
373506
374507 @ Override
375508 public boolean awaitTermination (long duration , TimeUnit unit ) throws InterruptedException {
376- return stub .awaitTermination (duration , unit );
509+ Span awaitTermination = null ;
510+ if (settings .isOpenTelemetryEnabled ()) {
511+ awaitTermination =
512+ settings
513+ .getOpenTelemetryTracer ()
514+ .spanBuilder ("com.google.cloud.bigquery.storage.v1.read.awaitTermination" )
515+ .setAttribute ("duration" , duration )
516+ .setAttribute ("unit" , unit .toString ())
517+ .startSpan ();
518+ }
519+ try (Scope awaitTerminationScope =
520+ awaitTermination != null ? awaitTermination .makeCurrent () : null ) {
521+ return stub .awaitTermination (duration , unit );
522+ } finally {
523+ if (awaitTermination != null ) {
524+ awaitTermination .end ();
525+ }
526+ }
527+ }
528+
529+ public void disableOpenTelemetryTracing () {
530+ settings .setEnableOpenTelemetryTracing (false );
531+ }
532+
533+ public void enableOpenTelemetryTracing () {
534+ settings .setEnableOpenTelemetryTracing (true );
535+ }
536+
537+ private static String getFieldAsString (Object field ) {
538+ return field == null ? "null" : field .toString ();
539+ }
540+
541+ private Attributes otelAttributesFrom (ReadSession readSession ) {
542+ return Attributes .builder ()
543+ .put ("bq.storage.read_session.name" , getFieldAsString (readSession .getName ()))
544+ .put (
545+ "bq.storage.read_session.data_format_value" ,
546+ getFieldAsString (readSession .getDataFormatValue ()))
547+ .put (
548+ "bq.storage.read_session.serialized_size" ,
549+ getFieldAsString (readSession .getSerializedSize ()))
550+ .put ("bq.storage.read_session.table" , getFieldAsString (readSession .getTable ()))
551+ .put ("bq.storage.read_session.estimated_row_count" , readSession .getEstimatedRowCount ())
552+ .put (
553+ "bq.storage.read_session.estimated_total_bytes_scanned" ,
554+ readSession .getEstimatedTotalBytesScanned ())
555+ .put (
556+ "bq.storage.read_session.estimated_total_physical_bytes" ,
557+ readSession .getEstimatedTotalPhysicalFileSize ())
558+ .put ("bq.storage.read_session.streams_count" , readSession .getStreamsCount ())
559+ .put ("bq.storage.read_session.trace_id" , getFieldAsString (readSession .getTraceId ()))
560+ .put ("bq.storage.read_session.expire_time" , getFieldAsString (readSession .getExpireTime ()))
561+ .build ();
562+ }
563+
564+ private Attributes otelAttributesFrom (SplitReadStreamRequest request ) {
565+ return Attributes .builder ()
566+ .put ("bq.storage.split_read_stream_request.name" , getFieldAsString (request .getName ()))
567+ .put (
568+ "bq.storage.split_read_stream_request.serialized_size" ,
569+ getFieldAsString (request .getSerializedSize ()))
570+ .put (
571+ "bq.storage.split_read_stream_request.fraction" ,
572+ getFieldAsString (request .getFraction ()))
573+ .build ();
377574 }
378575}
0 commit comments