3232 * @author Gary Russell
3333 * @author Christian Mergenthaler
3434 * @author Wang Zhiyang
35+ * @author Christian Fredriksson
3536 *
3637 * @since 3.0
3738 *
@@ -224,33 +225,45 @@ public static class DefaultKafkaListenerObservationConvention implements KafkaLi
224225 new DefaultKafkaListenerObservationConvention ();
225226
226227 @ Override
228+ @ NonNull
227229 public KeyValues getLowCardinalityKeyValues (KafkaRecordReceiverContext context ) {
228-
229- return KeyValues .of (
230+ String groupId = context . getGroupId ();
231+ KeyValues keyValues = KeyValues .of (
230232 ListenerLowCardinalityTags .LISTENER_ID .withValue (context .getListenerId ()),
231233 ListenerLowCardinalityTags .MESSAGING_SYSTEM .withValue ("kafka" ),
232234 ListenerLowCardinalityTags .MESSAGING_OPERATION .withValue ("receive" ),
233235 ListenerLowCardinalityTags .MESSAGING_SOURCE_NAME .withValue (context .getSource ()),
234- ListenerLowCardinalityTags .MESSAGING_SOURCE_KIND .withValue ("topic" ),
235- ListenerLowCardinalityTags .MESSAGING_CONSUMER_GROUP .withValue (context .getGroupId ())
236+ ListenerLowCardinalityTags .MESSAGING_SOURCE_KIND .withValue ("topic" )
236237 );
238+
239+ if (StringUtils .hasText (groupId )) {
240+ keyValues = keyValues
241+ .and (ListenerLowCardinalityTags .MESSAGING_CONSUMER_GROUP .withValue (groupId ));
242+ }
243+
244+ return keyValues ;
237245 }
238246
239247 @ Override
240248 @ NonNull
241249 public KeyValues getHighCardinalityKeyValues (KafkaRecordReceiverContext context ) {
242250 String clientId = context .getClientId ();
251+ String consumerId = getConsumerId (context .getGroupId (), clientId );
243252 KeyValues keyValues = KeyValues .of (
244253 ListenerHighCardinalityTags .MESSAGING_PARTITION .withValue (context .getPartition ()),
245- ListenerHighCardinalityTags .MESSAGING_OFFSET .withValue (context .getOffset ()),
246- ListenerHighCardinalityTags .MESSAGING_CONSUMER_ID .withValue (getConsumerId (context , clientId ))
254+ ListenerHighCardinalityTags .MESSAGING_OFFSET .withValue (context .getOffset ())
247255 );
248256
249257 if (StringUtils .hasText (clientId )) {
250258 keyValues = keyValues
251259 .and (ListenerHighCardinalityTags .MESSAGING_CLIENT_ID .withValue (clientId ));
252260 }
253261
262+ if (StringUtils .hasText (consumerId )) {
263+ keyValues = keyValues
264+ .and (ListenerHighCardinalityTags .MESSAGING_CONSUMER_ID .withValue (consumerId ));
265+ }
266+
254267 return keyValues ;
255268 }
256269
@@ -259,11 +272,14 @@ public String getContextualName(KafkaRecordReceiverContext context) {
259272 return context .getSource () + " receive" ;
260273 }
261274
262- private static @ Nullable String getConsumerId (KafkaRecordReceiverContext context , @ Nullable String clientId ) {
263- if (StringUtils .hasText (clientId )) {
264- return context .getGroupId () + " - " + clientId ;
275+ private static @ Nullable String getConsumerId (@ Nullable String groupId , @ Nullable String clientId ) {
276+ if (StringUtils .hasText (groupId )) {
277+ if (StringUtils .hasText (clientId )) {
278+ return groupId + " - " + clientId ;
279+ }
280+ return groupId ;
265281 }
266- return context . getGroupId () ;
282+ return clientId ;
267283 }
268284
269285 }
0 commit comments