@@ -23,10 +23,10 @@ public async Task PublisherMetricsShouldBeIncrementedWhenMessageIsSendWithSucces
2323 Assert . NotNull ( _management ) ;
2424 Assert . NotNull ( _meterFactory ) ;
2525 Assert . NotNull ( _metricsReporter ) ;
26- var messageSendCollector =
27- new MetricCollector < int > ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.sent.messages" ) ;
28- var messageSendDurationCollector =
29- new MetricCollector < double > ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.operation.duration" ) ;
26+ MetricCollector < int > clientSendMessageCounterCollector =
27+ new ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.sent.messages" ) ;
28+ MetricCollector < double > clientSendDurationCollector =
29+ new ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.operation.duration" ) ;
3030
3131 IQueueSpecification queueSpecification = _management . Queue ( _queueName ) ;
3232 await queueSpecification . DeclareAsync ( ) ;
@@ -37,34 +37,122 @@ public async Task PublisherMetricsShouldBeIncrementedWhenMessageIsSendWithSucces
3737
3838 await SystemUtils . WaitUntilQueueMessageCount ( queueSpecification , 1 ) ;
3939
40- var consumedMessagesMeasurements = messageSendCollector . GetMeasurementSnapshot ( ) ;
41- Assert . NotEmpty ( consumedMessagesMeasurements ) ;
42- Assert . Equal ( 1 , consumedMessagesMeasurements [ 0 ] . Value ) ;
43- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "messaging.system" ] , "rabbitmq" ) ;
44- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "messaging.operation.name" ] , "publish" ) ;
45- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "messaging.operation.type" ] , "send" ) ;
46- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "messaging.destination.name" ] ,
40+ var clientSendMessagesMeasurements =
41+ clientSendMessageCounterCollector
42+ . GetMeasurementSnapshot ( ) ;
43+ Assert . NotEmpty ( clientSendMessagesMeasurements ) ;
44+ Assert . Equal ( 1 , clientSendMessagesMeasurements [ 0 ] . Value ) ;
45+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "messaging.system" ] , "rabbitmq" ) ;
46+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "messaging.operation.name" ] , "publish" ) ;
47+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "messaging.operation.type" ] , "send" ) ;
48+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "messaging.destination.name" ] ,
4749 $ "/queues/{ Utils . EncodePathSegment ( queueSpecification . QueueName ) } ") ;
48- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "server.port" ] ,
50+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "server.port" ] ,
4951 _connectionSettings ! . Port ) ;
50- Assert . Equal ( consumedMessagesMeasurements [ 0 ] . Tags [ "server.address" ] ,
52+ Assert . Equal ( clientSendMessagesMeasurements [ 0 ] . Tags [ "server.address" ] ,
5153 _connectionSettings ! . Host ) ;
5254
53- var sendDurationsMeasurements = messageSendDurationCollector . GetMeasurementSnapshot ( ) ;
54- Assert . NotEmpty ( sendDurationsMeasurements ) ;
55- Assert . True ( sendDurationsMeasurements [ 0 ] . Value > 0 ) ;
56- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "messaging.system" ] , "rabbitmq" ) ;
57- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "messaging.operation.name" ] , "publish" ) ;
58- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "messaging.operation.type" ] , "send" ) ;
59- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "messaging.destination.name" ] ,
55+ var clientSendDurationsMeasurements =
56+ clientSendDurationCollector
57+ . GetMeasurementSnapshot ( ) ;
58+ Assert . NotEmpty ( clientSendDurationsMeasurements ) ;
59+ Assert . True ( clientSendDurationsMeasurements [ 0 ] . Value > 0 ) ;
60+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "messaging.system" ] , "rabbitmq" ) ;
61+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "messaging.operation.name" ] , "publish" ) ;
62+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "messaging.operation.type" ] , "send" ) ;
63+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "messaging.destination.name" ] ,
6064 $ "/queues/{ Utils . EncodePathSegment ( queueSpecification . QueueName ) } ") ;
61- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "server.port" ] ,
65+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "server.port" ] ,
6266 _connectionSettings ! . Port ) ;
63- Assert . Equal ( sendDurationsMeasurements [ 0 ] . Tags [ "server.address" ] ,
67+ Assert . Equal ( clientSendDurationsMeasurements [ 0 ] . Tags [ "server.address" ] ,
6468 _connectionSettings ! . Host ) ;
6569
6670 await publisher . CloseAsync ( ) ;
6771 publisher . Dispose ( ) ;
6872 }
73+
74+ [ Fact ]
75+ public async Task PublisherShouldRecordAFailureWhenSendingThrowAnException ( )
76+ {
77+ Assert . NotNull ( _connection ) ;
78+ Assert . NotNull ( _management ) ;
79+ Assert . NotNull ( _meterFactory ) ;
80+ Assert . NotNull ( _metricsReporter ) ;
81+ MetricCollector < int > clientSendMessageCounterCollector =
82+ new ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.sent.messages" ) ;
83+ MetricCollector < double > clientSendDurationCollector =
84+ new ( _meterFactory , "RabbitMQ.Amqp" , "messaging.client.operation.duration" ) ;
85+
86+ IMessage message = new AmqpMessage ( Encoding . ASCII . GetBytes ( "hello" ) ) ;
87+
88+ IExchangeSpecification exchangeSpecification = _management . Exchange ( _exchangeName ) . Type ( ExchangeType . FANOUT ) ;
89+ await exchangeSpecification . DeclareAsync ( ) ;
90+
91+ IPublisherBuilder publisherBuilder = _connection . PublisherBuilder ( ) ;
92+ // TODO implement Listeners
93+ IPublisher publisher = await publisherBuilder . Exchange ( exchangeSpecification ) . BuildAsync ( ) ;
94+
95+ try
96+ {
97+ IQueueSpecification queueSpecification = _management . Queue ( ) . Exclusive ( true ) ;
98+ IQueueInfo queueInfo = await queueSpecification . DeclareAsync ( ) ;
99+ IBindingSpecification bindingSpecification = _management . Binding ( )
100+ . SourceExchange ( _exchangeName )
101+ . DestinationQueue ( queueInfo . Name ( ) ) ;
102+ await bindingSpecification . BindAsync ( ) ;
103+
104+ PublishResult publishResult = await publisher . PublishAsync ( message ) ;
105+ Assert . Equal ( OutcomeState . Accepted , publishResult . Outcome . State ) ;
106+ }
107+ finally
108+ {
109+ await exchangeSpecification . DeleteAsync ( ) ;
110+ }
111+
112+ for ( int i = 0 ; i < 100 ; i ++ )
113+ {
114+ PublishResult nextPublishResult = await publisher . PublishAsync ( message ) ;
115+ if ( OutcomeState . Rejected == nextPublishResult . Outcome . State )
116+ {
117+ break ;
118+ }
119+
120+ await Task . Delay ( TimeSpan . FromMilliseconds ( 100 ) ) ;
121+ }
122+
123+ var failedSendMeasure =
124+ clientSendMessageCounterCollector
125+ . LastMeasurement ! ;
126+ Assert . Equal ( 1 , failedSendMeasure . Value ) ;
127+ Assert . Equal ( failedSendMeasure . Tags [ "messaging.system" ] , "rabbitmq" ) ;
128+ Assert . Equal ( failedSendMeasure . Tags [ "messaging.operation.name" ] , "publish" ) ;
129+ Assert . Equal ( failedSendMeasure . Tags [ "messaging.operation.type" ] , "send" ) ;
130+ Assert . Equal ( failedSendMeasure . Tags [ "messaging.destination.name" ] ,
131+ $ "/exchanges/{ Utils . EncodePathSegment ( exchangeSpecification . ExchangeName ) } ") ;
132+ Assert . Equal ( failedSendMeasure . Tags [ "server.port" ] ,
133+ _connectionSettings ! . Port ) ;
134+ Assert . Equal ( failedSendMeasure . Tags [ "server.address" ] ,
135+ _connectionSettings ! . Host ) ;
136+ Assert . Equal ( failedSendMeasure . Tags [ "error.type" ] ,
137+ "amqp:not-found" ) ;
138+
139+ var failedMessageSendDuration =
140+ clientSendDurationCollector
141+ . LastMeasurement ! ;
142+ Assert . True ( failedMessageSendDuration . Value > 0 ) ;
143+ Assert . Equal ( failedMessageSendDuration . Tags [ "messaging.system" ] , "rabbitmq" ) ;
144+ Assert . Equal ( failedMessageSendDuration . Tags [ "messaging.operation.name" ] , "publish" ) ;
145+ Assert . Equal ( failedMessageSendDuration . Tags [ "messaging.operation.type" ] , "send" ) ;
146+ Assert . Equal ( failedMessageSendDuration . Tags [ "messaging.destination.name" ] ,
147+ $ "/exchanges/{ Utils . EncodePathSegment ( exchangeSpecification . ExchangeName ) } ") ;
148+ Assert . Equal ( failedMessageSendDuration . Tags [ "server.port" ] ,
149+ _connectionSettings ! . Port ) ;
150+ Assert . Equal ( failedMessageSendDuration . Tags [ "server.address" ] ,
151+ _connectionSettings ! . Host ) ;
152+ Assert . Equal ( failedMessageSendDuration . Tags [ "error.type" ] ,
153+ "amqp:not-found" ) ;
154+ await publisher . CloseAsync ( ) ;
155+ publisher . Dispose ( ) ;
156+ }
69157}
70158#endif
0 commit comments