11import 'mocha' ;
22import expect from 'expect' ;
3- import { InMemorySpanExporter , SimpleSpanProcessor } from '@opentelemetry/tracing' ;
4- import { NodeTracerProvider } from '@opentelemetry/node' ;
5- import { HttpTraceContext } from '@opentelemetry/core' ;
63import { AmqplibInstrumentation } from '../src' ;
74
85const instrumentation = new AmqplibInstrumentation ( ) ;
96instrumentation . enable ( ) ;
107import amqpCallback from 'amqplib/callback_api' ;
118import { MessagingDestinationKindValues , SemanticAttributes } from '@opentelemetry/semantic-conventions' ;
12- import { propagation , SpanKind } from '@opentelemetry/api' ;
9+ import { SpanKind , context } from '@opentelemetry/api' ;
1310import { asyncConsume } from './utils' ;
11+ import { TEST_RABBITMQ_HOST , TEST_RABBITMQ_PORT } from './config' ;
12+ import { getTestSpans } from 'opentelemetry-instrumentation-testing-utils' ;
1413
1514const msgPayload = 'payload from test' ;
1615const queueName = 'queue-name-from-unittest' ;
1716
1817describe ( 'amqplib instrumentation callback model' , function ( ) {
19- const provider = new NodeTracerProvider ( ) ;
20- const memoryExporter = new InMemorySpanExporter ( ) ;
21- const spanProcessor = new SimpleSpanProcessor ( memoryExporter ) ;
22- propagation . setGlobalPropagator ( new HttpTraceContext ( ) ) ;
23- provider . addSpanProcessor ( spanProcessor ) ;
24- instrumentation . setTracerProvider ( provider ) ;
25-
26- const url = 'amqp://localhost:22221' ;
18+ const url = `amqp://${ TEST_RABBITMQ_HOST } :${ TEST_RABBITMQ_PORT } ` ;
2719 let conn : amqpCallback . Connection ;
2820 before ( ( done ) => {
2921 amqpCallback . connect ( url , ( err , connection ) => {
@@ -38,19 +30,27 @@ describe('amqplib instrumentation callback model', function () {
3830
3931 let channel : amqpCallback . Channel ;
4032 beforeEach ( ( done ) => {
41- memoryExporter . reset ( ) ;
4233 instrumentation . enable ( ) ;
43- conn . createChannel ( ( err , c ) => {
44- channel = c ;
45- // install an error handler, otherwise when we have tests that create error on the channel,
46- // it throws and crash process
47- channel . on ( 'error' , ( ) => { } ) ;
48- channel . assertQueue ( queueName , { durable : false } , ( err , ok ) => {
49- channel . purgeQueue ( queueName , ( err , ok ) => {
50- done ( ) ;
51- } ) ;
52- } ) ;
53- } ) ;
34+ conn . createChannel (
35+ context . bind ( ( err , c ) => {
36+ channel = c ;
37+ // install an error handler, otherwise when we have tests that create error on the channel,
38+ // it throws and crash process
39+ channel . on ( 'error' , ( ) => { } ) ;
40+ channel . assertQueue (
41+ queueName ,
42+ { durable : false } ,
43+ context . bind ( ( err , ok ) => {
44+ channel . purgeQueue (
45+ queueName ,
46+ context . bind ( ( err , ok ) => {
47+ done ( ) ;
48+ } )
49+ ) ;
50+ } )
51+ ) ;
52+ } )
53+ ) ;
5454 } ) ;
5555
5656 afterEach ( ( done ) => {
@@ -69,7 +69,7 @@ describe('amqplib instrumentation callback model', function () {
6969 asyncConsume ( channel , queueName , [ ( msg ) => expect ( msg . content . toString ( ) ) . toEqual ( msgPayload ) ] , {
7070 noAck : true ,
7171 } ) . then ( ( ) => {
72- const [ publishSpan , consumeSpan ] = memoryExporter . getFinishedSpans ( ) ;
72+ const [ publishSpan , consumeSpan ] = getTestSpans ( ) ;
7373
7474 // assert publish span
7575 expect ( publishSpan . kind ) . toEqual ( SpanKind . PRODUCER ) ;
@@ -82,8 +82,8 @@ describe('amqplib instrumentation callback model', function () {
8282 expect ( publishSpan . attributes [ SemanticAttributes . MESSAGING_PROTOCOL ] ) . toEqual ( 'AMQP' ) ;
8383 expect ( publishSpan . attributes [ SemanticAttributes . MESSAGING_PROTOCOL_VERSION ] ) . toEqual ( '0.9.1' ) ;
8484 expect ( publishSpan . attributes [ SemanticAttributes . MESSAGING_URL ] ) . toEqual ( url ) ;
85- expect ( publishSpan . attributes [ SemanticAttributes . NET_PEER_NAME ] ) . toEqual ( 'localhost' ) ;
86- expect ( publishSpan . attributes [ SemanticAttributes . NET_PEER_PORT ] ) . toEqual ( 22221 ) ;
85+ expect ( publishSpan . attributes [ SemanticAttributes . NET_PEER_NAME ] ) . toEqual ( TEST_RABBITMQ_HOST ) ;
86+ expect ( publishSpan . attributes [ SemanticAttributes . NET_PEER_PORT ] ) . toEqual ( TEST_RABBITMQ_PORT ) ;
8787
8888 // assert consume span
8989 expect ( consumeSpan . kind ) . toEqual ( SpanKind . CONSUMER ) ;
@@ -96,8 +96,8 @@ describe('amqplib instrumentation callback model', function () {
9696 expect ( consumeSpan . attributes [ SemanticAttributes . MESSAGING_PROTOCOL ] ) . toEqual ( 'AMQP' ) ;
9797 expect ( consumeSpan . attributes [ SemanticAttributes . MESSAGING_PROTOCOL_VERSION ] ) . toEqual ( '0.9.1' ) ;
9898 expect ( consumeSpan . attributes [ SemanticAttributes . MESSAGING_URL ] ) . toEqual ( url ) ;
99- expect ( consumeSpan . attributes [ SemanticAttributes . NET_PEER_NAME ] ) . toEqual ( 'localhost' ) ;
100- expect ( consumeSpan . attributes [ SemanticAttributes . NET_PEER_PORT ] ) . toEqual ( 22221 ) ;
99+ expect ( consumeSpan . attributes [ SemanticAttributes . NET_PEER_NAME ] ) . toEqual ( TEST_RABBITMQ_HOST ) ;
100+ expect ( consumeSpan . attributes [ SemanticAttributes . NET_PEER_PORT ] ) . toEqual ( TEST_RABBITMQ_PORT ) ;
101101
102102 // assert context propagation
103103 expect ( consumeSpan . spanContext . traceId ) . toEqual ( publishSpan . spanContext . traceId ) ;
@@ -112,7 +112,7 @@ describe('amqplib instrumentation callback model', function () {
112112
113113 asyncConsume ( channel , queueName , [ ( msg ) => channel . ack ( msg ) ] ) . then ( ( ) => {
114114 // assert consumed message span has ended
115- expect ( memoryExporter . getFinishedSpans ( ) . length ) . toBe ( 2 ) ;
115+ expect ( getTestSpans ( ) . length ) . toBe ( 2 ) ;
116116 done ( ) ;
117117 } ) ;
118118 } ) ;
@@ -124,7 +124,7 @@ describe('amqplib instrumentation callback model', function () {
124124 ( msg ) =>
125125 setTimeout ( ( ) => {
126126 channel . ack ( msg ) ;
127- expect ( memoryExporter . getFinishedSpans ( ) . length ) . toBe ( 2 ) ;
127+ expect ( getTestSpans ( ) . length ) . toBe ( 2 ) ;
128128 done ( ) ;
129129 } , 1 ) ,
130130 ] ) ;
0 commit comments