11using System . Diagnostics ;
2+ using System . Diagnostics . Metrics ;
23
34namespace NATS . Client . Core . Internal ;
45
56// https://opentelemetry.io/docs/specs/semconv/attributes-registry/messaging/
67// https://opentelemetry.io/docs/specs/semconv/messaging/messaging-spans/#messaging-attributes
78internal static class Telemetry
89{
9- internal static readonly ActivitySource NatsActivities = new ( name : NatsActivitySource ) ;
10-
1110 private const string NatsActivitySource = "NATS.Net" ;
12- private static readonly object BoxedTrue = true ;
11+ private const string MeterName = "NATS.Net" ;
12+ private static readonly ActivitySource NatsActivities = new ( name : NatsActivitySource ) ;
13+ private static readonly Meter NatsMeter = new Meter ( MeterName ) ;
14+
15+ private static readonly Counter < long > _subscriptionCounter = NatsMeter . CreateCounter < long > (
16+ Constants . SubscriptionInstrumentName ,
17+ unit : "{subscriptions}" ,
18+ description : "Number of subscriptions" ) ;
19+
20+ private static readonly Counter < long > _pendingMessagesCounter = NatsMeter . CreateCounter < long > (
21+ Constants . PendingMessagesInstrumentName ,
22+ unit : "{messages}" ,
23+ description : "Number of pending messages" ) ;
24+
25+ private static readonly Histogram < long > _sentBytesHistogram = NatsMeter . CreateHistogram < long > (
26+ Constants . SentBytesInstrumentName ,
27+ unit : "{bytes}" ,
28+ description : "Number of bytes sent" ) ;
29+
30+ private static readonly Histogram < long > _receivedBytesHistogram = NatsMeter . CreateHistogram < long > (
31+ Constants . ReceivedBytesInstrumentName ,
32+ unit : "{bytes}" ,
33+ description : "Number of bytes received" ) ;
34+
35+ private static readonly Counter < long > _sentMessagesCounter = NatsMeter . CreateCounter < long > (
36+ Constants . SentMessagesInstrumentName ,
37+ unit : "{messages}" ,
38+ description : "Number of messages sent" ) ;
39+
40+ private static readonly Counter < long > _receivedMessagesCounter = NatsMeter . CreateCounter < long > (
41+ Constants . ReceivedMessagesInstrumentName ,
42+ unit : "{messages}" ,
43+ description : "Number of messages received" ) ;
44+
45+ private static readonly Histogram < double > _durationOperationHistogram = NatsMeter . CreateHistogram < double > (
46+ Constants . DurationOperationInstrumentName ,
47+ unit : "{s}" ,
48+ description : "Duration of messaging operation initiated by a producer or consumer client." ) ;
49+
50+ private static readonly Histogram < double > _durationProcessHistogram = NatsMeter . CreateHistogram < double > (
51+ Constants . DurationProcessInstrumentName ,
52+ unit : "{s}" ,
53+ description : "Duration of processing operation within client." ) ;
1354
14- internal static bool HasListeners ( ) => NatsActivities . HasListeners ( ) ;
55+ private static readonly object BoxedTrue = true ;
1556
1657 internal static Activity ? StartSendActivity (
1758 string name ,
@@ -20,9 +61,6 @@ internal static class Telemetry
2061 string ? replyTo ,
2162 ActivityContext ? parentContext = null )
2263 {
23- if ( ! NatsActivities . HasListeners ( ) )
24- return null ;
25-
2664 KeyValuePair < string , object ? > [ ] tags ;
2765 if ( connection is NatsConnection { ServerInfo : not null } conn )
2866 {
@@ -106,9 +144,6 @@ internal static void AddTraceContextHeaders(Activity? activity, ref NatsHeaders?
106144 long size ,
107145 NatsHeaders ? headers )
108146 {
109- if ( ! NatsActivities . HasListeners ( ) )
110- return null ;
111-
112147 KeyValuePair < string , object ? > [ ] tags ;
113148 if ( connection is NatsConnection { ServerInfo : not null } conn )
114149 {
@@ -216,6 +251,51 @@ static string GetStackTrace(Exception? exception)
216251 }
217252 }
218253
254+ internal static void IncrementSubscriptionCount ( KeyValuePair < string , object ? > [ ] tags )
255+ {
256+ _subscriptionCounter . Add ( 1 , tags ) ;
257+ }
258+
259+ internal static void DecrementSubscriptionCount ( KeyValuePair < string , object ? > [ ] tags )
260+ {
261+ _subscriptionCounter . Add ( - 1 , tags ) ;
262+ }
263+
264+ internal static void AddPendingMessages ( long messages , KeyValuePair < string , object ? > [ ] tags )
265+ {
266+ _pendingMessagesCounter . Add ( messages , tags ) ;
267+ }
268+
269+ internal static void RecordSentBytes ( long bytes , KeyValuePair < string , object ? > [ ] tags )
270+ {
271+ _sentBytesHistogram . Record ( bytes , tags ) ;
272+ }
273+
274+ internal static void RecordReceivedBytes ( long bytes , KeyValuePair < string , object ? > [ ] tags )
275+ {
276+ _receivedBytesHistogram . Record ( bytes , tags ) ;
277+ }
278+
279+ internal static void AddSentMessages ( long messages , KeyValuePair < string , object ? > [ ] tags )
280+ {
281+ _sentMessagesCounter . Add ( messages , tags ) ;
282+ }
283+
284+ internal static void AddReceivedMessages ( long messages , KeyValuePair < string , object ? > [ ] tags )
285+ {
286+ _receivedMessagesCounter . Add ( messages , tags ) ;
287+ }
288+
289+ internal static void RecordOperationDuration ( double duration , KeyValuePair < string , object ? > [ ] tags )
290+ {
291+ _durationOperationHistogram . Record ( duration , tags ) ;
292+ }
293+
294+ internal static void RecordProcessDuration ( long bytes , KeyValuePair < string , object ? > [ ] tags )
295+ {
296+ _durationProcessHistogram . Record ( bytes , tags ) ;
297+ }
298+
219299 private static bool TryParseTraceContext ( NatsHeaders headers , out ActivityContext context )
220300 {
221301 DistributedContextPropagator . Current . ExtractTraceIdAndState (
@@ -278,7 +358,7 @@ internal class Constants
278358 public const string DestIsTemporary = "messaging.destination.temporary" ;
279359 public const string DestPubName = "messaging.destination_publish.name" ;
280360
281- public const string QueueGroup = "messaging.nats. consumer.group" ;
361+ public const string QueueGroup = "messaging.consumer.group.name " ;
282362 public const string ReplyTo = "messaging.nats.message.reply_to" ;
283363 public const string Subject = "messaging.nats.message.subject" ;
284364
@@ -289,5 +369,14 @@ internal class Constants
289369 public const string NetworkPeerAddress = "network.peer.address" ;
290370 public const string NetworkPeerPort = "network.peer.port" ;
291371 public const string NetworkLocalAddress = "network.local.address" ;
372+
373+ public const string PendingMessagesInstrumentName = $ "messaging.client.pending.messages";
374+ public const string SentBytesInstrumentName = $ "messaging.client.sent.bytes";
375+ public const string ReceivedBytesInstrumentName = $ "messaging.client.consumed.bytes";
376+ public const string SentMessagesInstrumentName = $ "messaging.client.sent.messages";
377+ public const string ReceivedMessagesInstrumentName = $ "messaging.client.consumed.messages";
378+ public const string SubscriptionInstrumentName = $ "messaging.client.nats.subscription.count";
379+ public const string DurationOperationInstrumentName = $ "messaging.client.operation.duration";
380+ public const string DurationProcessInstrumentName = $ "messaging.process.duration";
292381 }
293382}
0 commit comments