Skip to content

Commit 27c5ffa

Browse files
committed
#1717 Standardize broker naming and improve telemetry.
Updated `BrokerAddress` properties in consumer clients to use lowercase identifiers for messaging systems. Modified `ServiceBusHelpers` for consistent naming. Enhanced `AddHeaderIfDoesntExist` to use `Append` for HTTP response headers. Improved tagging in `DiagnosticListener` for better telemetry data collection. Added `GetExecutionInstanceId` method to `TransportMessage` for additional context in message processing.
1 parent 7eec04f commit 27c5ffa

File tree

9 files changed

+57
-33
lines changed

9 files changed

+57
-33
lines changed

src/DotNetCore.CAP.AmazonSQS/AmazonSQSConsumerClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public AmazonSQSConsumerClient(string groupId, byte groupConcurrent, IOptions<Am
4343

4444
public Action<LogMessageEventArgs>? OnLogCallback { get; set; }
4545

46-
public BrokerAddress BrokerAddress => new("AmazonSQS", _queueUrl);
46+
public BrokerAddress BrokerAddress => new("aws_sqs", _queueUrl);
4747

4848
public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
4949
{

src/DotNetCore.CAP.AzureServiceBus/Helpers/ServiceBusHelpers.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ _ when string.IsNullOrWhiteSpace(@namespace)
2121
_ => throw new InvalidOperationException("Unhandled case in switch expression.")
2222
};
2323

24-
return new BrokerAddress("AzureServiceBus", host);
24+
return new BrokerAddress("servicebus", host);
2525
}
2626

2727

src/DotNetCore.CAP.Dashboard/GatewayProxy/GatewayProxyAgent.cs

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -148,18 +148,16 @@ private void SetDownStreamRequestUri(Node node, string requestPath, string query
148148
uriBuilder = new UriBuilder(node.Address + requestPath + queryString);
149149
else
150150
uriBuilder = new UriBuilder("http://", node.Address, node.Port, requestPath, queryString);
151-
151+
152152
if (node.Port > 0)
153153
uriBuilder.Port = node.Port;
154-
154+
155155
DownstreamRequest.RequestUri = uriBuilder.Uri;
156156
}
157157

158-
private static void AddHeaderIfDoesntExist(HttpContext context,
159-
KeyValuePair<string, IEnumerable<string>> httpResponseHeader)
158+
private static void AddHeaderIfDoesntExist(HttpContext context, KeyValuePair<string, IEnumerable<string>> httpResponseHeader)
160159
{
161160
if (!context.Response.Headers.ContainsKey(httpResponseHeader.Key))
162-
context.Response.Headers.Add(httpResponseHeader.Key,
163-
new StringValues(httpResponseHeader.Value.ToArray()));
161+
context.Response.Headers.Append(httpResponseHeader.Key, new StringValues(httpResponseHeader.Value.ToArray()));
164162
}
165163
}

src/DotNetCore.CAP.Kafka/KafkaConsumerClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ public KafkaConsumerClient(string groupId, byte groupConcurrent,
4141

4242
public Action<LogMessageEventArgs>? OnLogCallback { get; set; }
4343

44-
public BrokerAddress BrokerAddress => new("Kafka", _kafkaOptions.Servers);
44+
public BrokerAddress BrokerAddress => new("kafka", _kafkaOptions.Servers);
4545

4646
public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
4747
{

src/DotNetCore.CAP.NATS/NATSConsumerClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public NATSConsumerClient(string groupName, byte groupConcurrent, IOptions<NATSO
3939

4040
public Action<LogMessageEventArgs>? OnLogCallback { get; set; }
4141

42-
public BrokerAddress BrokerAddress => new("NATS", _natsOptions.Servers);
42+
public BrokerAddress BrokerAddress => new("nats", _natsOptions.Servers);
4343

4444
public ICollection<string> FetchTopics(IEnumerable<string> topicNames)
4545
{

src/DotNetCore.CAP.OpenTelemetry/DiagnosticListener.cs

Lines changed: 42 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88
using DotNetCore.CAP.Diagnostics;
99
using OpenTelemetry;
1010
using OpenTelemetry.Context.Propagation;
11-
using OpenTelemetry.Trace;
1211
using CapEvents = DotNetCore.CAP.Diagnostics.CapDiagnosticListenerNames;
1312

1413
namespace DotNetCore.CAP.OpenTelemetry;
@@ -52,7 +51,7 @@ public void OnNext(KeyValuePair<string, object?> evt)
5251
ActivityKind.Internal, parentContext);
5352
if (activity != null)
5453
{
55-
activity.SetTag("message.name", eventData.Operation);
54+
activity.SetTag("messaging.destination.name", eventData.Operation);
5655
activity.AddEvent(new ActivityEvent("CAP message persistence start...",
5756
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));
5857

@@ -61,7 +60,8 @@ public void OnNext(KeyValuePair<string, object?> evt)
6160
Propagator.Inject(new PropagationContext(Activity.Current.Context, Baggage.Current),
6261
eventData.Message,
6362
(msg, key, value) => { msg.Headers[key] = value; });
64-
};
63+
}
64+
;
6565
}
6666
}
6767
break;
@@ -84,7 +84,7 @@ public void OnNext(KeyValuePair<string, object?> evt)
8484
{
8585
var exception = eventData.Exception!;
8686
activity.SetStatus(ActivityStatusCode.Error, exception.Message);
87-
activity.RecordException(exception);
87+
activity.AddException(exception);
8888
activity.Stop();
8989
}
9090
}
@@ -108,12 +108,22 @@ public void OnNext(KeyValuePair<string, object?> evt)
108108
if (activity != null)
109109
{
110110
activity.SetTag("messaging.system", eventData.BrokerAddress.Name);
111-
activity.SetTag("messaging.destination", eventData.Operation);
112-
activity.SetTag("messaging.destination_kind", "topic");
113-
activity.SetTag("messaging.url", eventData.BrokerAddress.Endpoint?.Replace("-1", "5672"));
114-
activity.SetTag("messaging.message_id", eventData.TransportMessage.GetId());
115-
activity.SetTag("messaging.message_payload_size_bytes", eventData.TransportMessage.Body.Length);
116-
111+
activity.SetTag("messaging.message.id", eventData.TransportMessage.GetId());
112+
activity.SetTag("messaging.message.body.size", eventData.TransportMessage.Body.Length);
113+
activity.SetTag("messaging.message.conversation_id", eventData.TransportMessage.GetCorrelationId());
114+
activity.SetTag("messaging.destination.name", eventData.Operation);
115+
if (eventData.BrokerAddress.Endpoint is { } endpoint)
116+
{
117+
var parts = endpoint.Split(':');
118+
if (parts.Length > 0)
119+
{
120+
activity.SetTag("server.address", parts[0]);
121+
}
122+
if (parts.Length > 1 && int.TryParse(parts[1], out var port))
123+
{
124+
activity.SetTag("server.port", port);
125+
}
126+
}
117127
activity.AddEvent(new ActivityEvent("Message publishing start...",
118128
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));
119129

@@ -143,7 +153,7 @@ public void OnNext(KeyValuePair<string, object?> evt)
143153
{
144154
var exception = eventData.Exception!;
145155
activity.SetStatus(ActivityStatusCode.Error, exception.Message);
146-
activity.RecordException(exception);
156+
activity.AddException(exception);
147157
activity.Stop();
148158
}
149159
}
@@ -166,12 +176,24 @@ public void OnNext(KeyValuePair<string, object?> evt)
166176
if (activity != null)
167177
{
168178
activity.SetTag("messaging.system", eventData.BrokerAddress.Name);
169-
activity.SetTag("messaging.destination", eventData.Operation);
170-
activity.SetTag("messaging.destination_kind", "topic");
171-
activity.SetTag("messaging.url", eventData.BrokerAddress.Endpoint?.Replace("-1", "5672"));
172-
activity.SetTag("messaging.message_id", eventData.TransportMessage.GetId());
173-
activity.SetTag("messaging.message_payload_size_bytes", eventData.TransportMessage.Body.Length);
174-
179+
activity.SetTag("messaging.message.id", eventData.TransportMessage.GetId());
180+
activity.SetTag("messaging.message.body.size", eventData.TransportMessage.Body.Length);
181+
activity.SetTag("messaging.operation.type", "receive");
182+
activity.SetTag("messaging.client.id", eventData.TransportMessage.GetExecutionInstanceId());
183+
activity.SetTag("messaging.destination.name", eventData.Operation);
184+
activity.SetTag("messaging.consumer.group.name", eventData.TransportMessage.GetGroup());
185+
if (eventData.BrokerAddress.Endpoint is { } endpoint)
186+
{
187+
var parts = endpoint.Split(':');
188+
if (parts.Length > 0)
189+
{
190+
activity.SetTag("server.address", parts[0]);
191+
}
192+
if (parts.Length > 1 && int.TryParse(parts[1], out var port))
193+
{
194+
activity.SetTag("server.port", port);
195+
}
196+
}
175197
activity.AddEvent(new ActivityEvent("CAP message persistence start...",
176198
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));
177199
}
@@ -197,7 +219,7 @@ public void OnNext(KeyValuePair<string, object?> evt)
197219
{
198220
var exception = eventData.Exception!;
199221
activity.SetStatus(ActivityStatusCode.Error, exception.Message);
200-
activity.RecordException(exception);
222+
activity.AddException(exception);
201223
activity.Stop();
202224
}
203225
}
@@ -224,8 +246,7 @@ public void OnNext(KeyValuePair<string, object?> evt)
224246

225247
if (activity != null)
226248
{
227-
activity.SetTag("messaging.operation", "process");
228-
activity.SetTag("code.function", eventData.MethodInfo.Name);
249+
activity.SetTag("code.function.name", eventData.MethodInfo.Name);
229250

230251
activity.AddEvent(new ActivityEvent("Begin invoke the subscriber:" + eventData.MethodInfo.Name,
231252
DateTimeOffset.FromUnixTimeMilliseconds(eventData.OperationTimestamp!.Value)));
@@ -252,7 +273,7 @@ public void OnNext(KeyValuePair<string, object?> evt)
252273
{
253274
var exception = eventData.Exception!;
254275
activity.SetStatus(ActivityStatusCode.Error, exception.Message);
255-
activity.RecordException(exception);
276+
activity.AddException(exception);
256277
activity.Stop();
257278
}
258279
}

src/DotNetCore.CAP.Pulsar/PulsarConsumerClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public PulsarConsumerClient(IOptions<PulsarOptions> options, PulsarClient client
3636

3737
public Action<LogMessageEventArgs>? OnLogCallback { get; set; }
3838

39-
public BrokerAddress BrokerAddress => new("Pulsar", _pulsarOptions.ServiceUrl);
39+
public BrokerAddress BrokerAddress => new("pulsar", _pulsarOptions.ServiceUrl);
4040

4141
public void Subscribe(IEnumerable<string> topics)
4242
{

src/DotNetCore.CAP.RabbitMQ/RabbitMQConsumerClient.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public RabbitMqConsumerClient(string groupName, byte groupConcurrent,
4242

4343
public Action<LogMessageEventArgs>? OnLogCallback { get; set; }
4444

45-
public BrokerAddress BrokerAddress => new("RabbitMQ", $"{_rabbitMqOptions.HostName}:{_rabbitMqOptions.Port}");
45+
public BrokerAddress BrokerAddress => new("rabbitmq", $"{_rabbitMqOptions.HostName}:{_rabbitMqOptions.Port}");
4646

4747
public void Subscribe(IEnumerable<string> topics)
4848
{

src/DotNetCore.CAP/Messages/TransportMessage.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,4 +48,9 @@ public string GetName()
4848
{
4949
return Headers.TryGetValue(Messages.Headers.CorrelationId, out var value) ? value : null;
5050
}
51+
52+
public string? GetExecutionInstanceId()
53+
{
54+
return Headers.TryGetValue(Messages.Headers.ExecutionInstanceId, out var value) ? value : null;
55+
}
5156
}

0 commit comments

Comments
 (0)