@@ -22,8 +22,8 @@ import java.util
2222import java .util .{Collections , Random }
2323
2424import com .www .expedia .opencensus .exporter .trace .config .GrpcAgentDispatcherConfig
25+ import io .opencensus .trace ._
2526import io .opencensus .trace .samplers .Samplers
26- import io .opencensus .trace .{AttributeValue , Status , Tracer , Tracing }
2727import org .apache .kafka .clients .consumer .ConsumerConfig ._
2828import org .apache .kafka .clients .consumer .KafkaConsumer
2929import org .apache .kafka .common .serialization .{ByteArrayDeserializer , StringDeserializer }
@@ -34,6 +34,8 @@ import scala.collection.JavaConverters._
3434class HaystackExporterIntegrationSpec extends FunSpec with GivenWhenThen with Matchers with BeforeAndAfterAll {
3535 private val OPERATION_NAME = " /search"
3636 private val SERVICE_NAME = " my-service"
37+ private val START_TIME_MICROS = System .currentTimeMillis() * 1000
38+ private val MAX_DURATION_MILLIS = 10
3739 private var consumer : KafkaConsumer [String , Array [Byte ]] = _
3840
3941 override def beforeAll (): Unit = {
@@ -51,27 +53,34 @@ class HaystackExporterIntegrationSpec extends FunSpec with GivenWhenThen with Ma
5153 }
5254
5355 private def generateTrace (tracer : Tracer ) = {
54- val spanBuilder = tracer.spanBuilder(OPERATION_NAME ).setRecordEvents(true ).setSampler(Samplers .alwaysSample())
55- val spanDurationInMillis = new Random ().nextInt(10 ) + 1
56+ val spanBuilder = tracer
57+ .spanBuilder(OPERATION_NAME )
58+ .setSpanKind(Span .Kind .SERVER )
59+ .setSampler(Samplers .alwaysSample())
60+
61+ val spanDurationInMillis = new Random ().nextInt(MAX_DURATION_MILLIS ) + 1
5662
5763 val scopedSpan = spanBuilder.startScopedSpan
5864 try {
5965 tracer.getCurrentSpan.addAnnotation(" start searching" )
6066 Thread .sleep(spanDurationInMillis)
6167 tracer.getCurrentSpan.putAttribute(" foo" , AttributeValue .stringAttributeValue(" bar" ))
6268 tracer.getCurrentSpan.putAttribute(" items" , AttributeValue .longAttributeValue(10l ))
63- tracer.getCurrentSpan.addAnnotation(" done searching" )
69+ tracer.getCurrentSpan.putAttribute(" price" , AttributeValue .doubleAttributeValue(5.5 ))
70+ tracer.getCurrentSpan.putAttribute(" error" , AttributeValue .booleanAttributeValue(true ))
71+ tracer.getCurrentSpan.addAnnotation(" done searching" ,
72+ Collections .singletonMap(" someevent" , AttributeValue .longAttributeValue(200 )))
6473 } catch {
6574 case _ : Exception =>
66- tracer.getCurrentSpan.addAnnotation(" Exception thrown when processing video. " )
75+ tracer.getCurrentSpan.addAnnotation(" Exception thrown when processing! " )
6776 tracer.getCurrentSpan.setStatus(Status .UNKNOWN )
6877 } finally {
6978 scopedSpan.close()
7079 }
7180 }
7281
7382 describe(" Integration Test with haystack and opencensus" ) {
74- it (" should dispatch the spans to haystack-agent" ) {
83+ it(" should dispatch the spans to haystack-agent" ) {
7584 HaystackTraceExporter .createAndRegister(new GrpcAgentDispatcherConfig (" haystack-agent" , 35000 ), SERVICE_NAME )
7685 val tracer = Tracing .getTracer
7786
@@ -80,18 +89,31 @@ class HaystackExporterIntegrationSpec extends FunSpec with GivenWhenThen with Ma
8089 generateTrace(tracer)
8190
8291 // wait for few sec to let the span reach kafka
83- Thread .sleep(10000 )
92+ Thread .sleep(5000 )
93+
94+ // create another trace
95+ generateTrace(tracer)
96+ Thread .sleep(5000 )
8497
8598 val records = consumer.poll(2000 )
86- records.count > 1 shouldBe true
87- val record = records.iterator().next()
88- val protoSpan = com.expedia.open.tracing.Span .parseFrom(record.value())
89- protoSpan.getTraceId shouldEqual record.key()
90- protoSpan.getServiceName shouldEqual SERVICE_NAME
91- protoSpan.getOperationName shouldEqual OPERATION_NAME
92- protoSpan.getTagsCount shouldBe 2
93- protoSpan.getTagsList.asScala.find(_.getKey == " foo" ).get.getVStr shouldEqual " bar"
94- protoSpan.getTagsList.asScala.find(_.getKey == " items" ).get.getVLong shouldBe 10
99+ if (records.count > 1 ) {
100+ val record = records.iterator().next()
101+ val protoSpan = com.expedia.open.tracing.Span .parseFrom(record.value())
102+ protoSpan.getTraceId shouldEqual record.key()
103+ protoSpan.getServiceName shouldEqual SERVICE_NAME
104+ protoSpan.getOperationName shouldEqual OPERATION_NAME
105+ protoSpan.getStartTime should be >= START_TIME_MICROS
106+ protoSpan.getTagsCount shouldBe 5
107+ protoSpan.getTagsList.asScala.find(_.getKey == " span.kind" ).get.getVStr shouldEqual " server"
108+ protoSpan.getTagsList.asScala.find(_.getKey == " foo" ).get.getVStr shouldEqual " bar"
109+ protoSpan.getTagsList.asScala.find(_.getKey == " items" ).get.getVLong shouldBe 10
110+ protoSpan.getTagsList.asScala.find(_.getKey == " price" ).get.getVDouble shouldBe 5.5
111+ protoSpan.getTagsList.asScala.find(_.getKey == " error" ).get.getVBool shouldBe true
112+ protoSpan.getLogsCount shouldBe 2
113+ protoSpan.getLogs(0 ).getFieldsList.asScala.find(_.getKey == " message" ).get.getVStr shouldEqual " start searching"
114+ protoSpan.getLogs(1 ).getFieldsList.asScala.find(_.getKey == " message" ).get.getVStr shouldEqual " done searching"
115+ protoSpan.getLogs(1 ).getFieldsList.asScala.find(_.getKey == " someevent" ).get.getVLong shouldBe 200l
116+ }
95117 }
96118 }
97- }
119+ }
0 commit comments