11import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
22import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace
33
4+ import com.couchbase.client.core.cnc.RequestSpan
5+ import com.couchbase.client.core.cnc.RequestTracer
46import com.couchbase.client.core.env.TimeoutConfig
57import com.couchbase.client.core.error.CouchbaseException
68import com.couchbase.client.core.error.DocumentNotFoundException
79import com.couchbase.client.core.error.ParsingFailureException
10+ import com.couchbase.client.core.msg.RequestContext
811import com.couchbase.client.java.Bucket
912import com.couchbase.client.java.Cluster
1013import com.couchbase.client.java.ClusterOptions
@@ -19,10 +22,13 @@ import datadog.trace.api.DDTags
1922import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags
2023import datadog.trace.bootstrap.instrumentation.api.Tags
2124import datadog.trace.core.DDSpan
25+ import java.time.Instant
26+ import java.util.concurrent.CopyOnWriteArrayList
2227import org.slf4j.Logger
2328import org.slf4j.LoggerFactory
2429import org.testcontainers.couchbase.BucketDefinition
2530import org.testcontainers.couchbase.CouchbaseContainer
31+ import reactor.core.publisher.Mono
2632import spock.lang.Shared
2733
2834import java.time.Duration
@@ -394,6 +400,69 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
394400 }
395401 }
396402
403+ def " check basic spans with custom request tracer" () {
404+ setup :
405+ def customTracer = new TestRequestTracer ()
406+
407+ ClusterEnvironment environmentWithCustomTracer = ClusterEnvironment . builder()
408+ .timeoutConfig(TimeoutConfig . kvTimeout(Duration . ofSeconds(10 )))
409+ .requestTracer(customTracer)
410+ .build()
411+
412+ def connectionString = " couchbase://${ couchbase.host} :${ couchbase.bootstrapCarrierDirectPort} ,${ couchbase.host} :${ couchbase.bootstrapHttpDirectPort} =manager"
413+
414+ Cluster localCluster = Cluster . connect(
415+ connectionString,
416+ ClusterOptions
417+ .clusterOptions(couchbase. username, couchbase. password)
418+ .environment(environmentWithCustomTracer)
419+ )
420+ Bucket localBucket = localCluster. bucket(BUCKET )
421+ localBucket. waitUntilReady(Duration . ofSeconds(30 ))
422+ def collection = localBucket. defaultCollection()
423+
424+ when :
425+ collection. get(" data 0" )
426+
427+ then :
428+ assertTraces(1 ) {
429+ sortSpansByStart()
430+ trace(2 ) {
431+ assertCouchbaseCall(it, " get" , [
432+ ' db.couchbase.collection' : ' _default' ,
433+ ' db.couchbase.document_id' : { String },
434+ ' db.couchbase.retries' : { Long },
435+ ' db.couchbase.scope' : ' _default' ,
436+ ' db.couchbase.service' : ' kv' ,
437+ ' db.name' : BUCKET ,
438+ ' db.operation' : ' get'
439+ ])
440+ assertCouchbaseDispatchCall(it, span(0 ), [
441+ ' db.couchbase.collection' : ' _default' ,
442+ ' db.couchbase.document_id' : { String },
443+ ' db.couchbase.scope' : ' _default' ,
444+ ' db.name' : BUCKET
445+ ])
446+ }
447+ }
448+
449+ and : " custom tracer also saw spans"
450+ customTracer. spans. size() > 0
451+ customTracer. spans* . ended. every { it == true }
452+
453+ cleanup :
454+ try {
455+ localCluster?. disconnect()
456+ } catch (Throwable t) {
457+ LOGGER . debug(" Unable to properly disconnect localCluster in custom tracer test" , t)
458+ }
459+ try {
460+ environmentWithCustomTracer?. shutdown()
461+ } catch (Throwable t) {
462+ LOGGER . debug(" Unable to properly shutdown environmentWithCustomTracer" , t)
463+ }
464+ }
465+
397466 void assertCouchbaseCall (TraceAssert trace , String name , Map<String , Serializable > extraTags , boolean internal = false , Throwable ex = null ) {
398467 assertCouchbaseCall(trace, name, extraTags, null , internal, ex)
399468 }
@@ -453,6 +522,75 @@ abstract class CouchbaseClient32Test extends VersionedNamingTestBase {
453522 }
454523 assertCouchbaseCall(trace, ' dispatch_to_server' , allExtraTags, parentSpan, true , null )
455524 }
525+
526+ static class TestRequestTracer implements RequestTracer {
527+
528+ final List<TestRequestSpan > spans = new CopyOnWriteArrayList<> ()
529+
530+ @Override
531+ RequestSpan requestSpan (String requestName , RequestSpan parent ) {
532+ def span = new TestRequestSpan (requestName, parent)
533+ spans. add(span)
534+ return span
535+ }
536+
537+ @Override
538+ Mono<Void > start () {
539+ return Mono . empty()
540+ }
541+
542+ @Override
543+ Mono<Void > stop (Duration timeout ) {
544+ return Mono . empty()
545+ }
546+ }
547+
548+ static class TestRequestSpan implements RequestSpan {
549+
550+ final String name
551+ final RequestSpan parent
552+ final Map<String , Object > attributes = new LinkedHashMap<> ()
553+ final List<String > events = []
554+ volatile boolean ended = false
555+
556+ TestRequestSpan (String name , RequestSpan parent ) {
557+ this . name = name
558+ this . parent = parent
559+ }
560+
561+ @Override
562+ void end () {
563+ ended = true
564+ }
565+
566+ @Override
567+ void attribute (String key , String value ) {
568+ attributes. put(key, value)
569+ }
570+
571+ @Override
572+ void attribute (String key , boolean value ) {
573+ attributes. put(key, value)
574+ }
575+
576+ @Override
577+ void attribute (String key , long value ) {
578+ attributes. put(key, value)
579+ }
580+
581+ @Override
582+ void event (String name , Instant timestamp ) {
583+ events. add(name)
584+ }
585+
586+ @Override
587+ void status (StatusCode status ) {
588+ }
589+
590+ @Override
591+ void requestContext (RequestContext requestContext ) {
592+ }
593+ }
456594}
457595
458596class CouchbaseClient32V0Test extends CouchbaseClient32Test {
0 commit comments