ConcurrentModificationException when reading metrics from Kafka clients. #11946

@Cirilla-zmh

Describe the bug

Stack trace:

msg:"An exception occurred invoking callback for CallbackRegistration{instrumentDescriptors=[InstrumentDescriptor{name=kafka.consumer.assigned_partitions, description=The number of partitions currently assigned to this consumer, unit=, type=OBSERVABLE_GAUGE, valueType=DOUBLE, advice=Advice{explicitBucketBoundaries=null}}]}. java.util.ConcurrentModificationException
	at java.base/java.util.LinkedHashMap$LinkedHashIterator.nextNode(LinkedHashMap.java:719)
	at java.base/java.util.LinkedHashMap$LinkedKeyIterator.next(LinkedHashMap.java:741)
	at java.base/java.util.AbstractCollection.addAll(AbstractCollection.java:351)
	at java.base/java.util.HashSet.<init>(HashSet.java:120)
	at org.apache.kafka.common.internals.PartitionStates.partitionSet(PartitionStates.java:65)
	at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedPartitions(SubscriptionState.java:298)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$ConsumerCoordinatorMetrics$1.measure(ConsumerCoordinator.java:896)
	at io.opentelemetry.javaagent.shaded.instrumentation.kafka.internal.KafkaMetricRegistry.value(KafkaMetricRegistry.java:145)
	at io.opentelemetry.javaagent.shaded.instrumentation.kafka.internal.KafkaMetricRegistry.lambda$createObservable$2(KafkaMetricRegistry.java:124)
	at io.opentelemetry.sdk.metrics.AbstractInstrumentBuilder.lambda$registerDoubleAsynchronousInstrument$0(AbstractInstrumentBuilder.java:107)
	at io.opentelemetry.sdk.metrics.internal.state.CallbackRegistration.invokeCallback(CallbackRegistration.java:84)
	at io.opentelemetry.sdk.metrics.internal.state.MeterSharedState.collectAll(MeterSharedState.java:96)
	at io.opentelemetry.sdk.metrics.SdkMeter.collectAll(SdkMeter.java:75)
	at io.opentelemetry.sdk.metrics.SdkMeterProvider$LeasedMetricProducer.collectAllMetrics(SdkMeterProvider.java:184)
	......
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
	at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
	at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.base/java.lang.Thread.run(Thread.java:829)

Steps to reproduce

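Strictly speaking, this is not a bug in the instrumentation project itself; the instrumentation only triggers it. As the stack trace shows, the metric callback reads the consumer's assigned partitions from the metrics collection thread.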
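The trace ends in `new HashSet<>(...)` copying a `LinkedHashMap` key set inside `PartitionStates.partitionSet`. The following is a minimal sketch of the fail-fast behavior behind that exception, not the Kafka code itself. The class and method names are hypothetical, and the structural modification is made on the same thread (standing in for a second thread) so that the outcome is deterministic.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

public class FailFastDemo {

    // Returns true if the key-set iterator fails after the map is
    // structurally modified while the iteration is still in progress.
    static boolean iterationFailsAfterModification() {
        Map<String, Integer> partitions = new LinkedHashMap<>();
        partitions.put("topic-0", 0);
        partitions.put("topic-1", 1);

        // Start iterating, as the HashSet copy constructor does internally.
        Iterator<String> it = partitions.keySet().iterator();
        it.next();

        // Stand-in for another thread changing the assignment mid-copy.
        partitions.put("topic-2", 2);

        try {
            it.next(); // the iterator notices the modification here
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(iterationFailsAfterModification());
    }
}
```

With two real threads the failure is timing-dependent, which would be consistent with only an occasional metric collection failing.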

Expected behavior

The OpenTelemetryMetricsReporter should not be injected into Kafka clients older than version 2.0.1. (See the link; version 2.0.1 of kafka-clients has fixed this issue.)

Actual behavior

A ConcurrentModificationException is thrown at the metric collection point, which causes that single metric collection to fail.
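The proposed guard (do not inject the reporter into kafka-clients older than 2.0.1) could be sketched roughly as follows. `KafkaVersionGuard`, `isAtLeast`, and `shouldInjectReporter` are hypothetical names, not part of the instrumentation; how the agent obtains the kafka-clients version string is assumed to be handled elsewhere.

```java
public class KafkaVersionGuard {

    // Hypothetical helper: compares the first three numeric components of a
    // version string (e.g. "2.0.1" or "2.0.1-SNAPSHOT") against a minimum.
    // Missing or non-numeric components are treated as 0.
    static boolean isAtLeast(String version, int major, int minor, int patch) {
        int[] want = {major, minor, patch};
        String[] parts = version.split("[.-]");
        for (int i = 0; i < 3; i++) {
            int have = 0;
            if (i < parts.length) {
                try {
                    have = Integer.parseInt(parts[i]);
                } catch (NumberFormatException e) {
                    have = 0;
                }
            }
            if (have != want[i]) {
                return have > want[i];
            }
        }
        return true; // all three components are equal
    }

    // Hypothetical decision point: only inject the metrics reporter when the
    // client is known to be at least 2.0.1; an unknown version means no injection.
    static boolean shouldInjectReporter(String kafkaClientsVersion) {
        return kafkaClientsVersion != null && isAtLeast(kafkaClientsVersion, 2, 0, 1);
    }

    public static void main(String[] args) {
        System.out.println(shouldInjectReporter("2.0.0"));
        System.out.println(shouldInjectReporter("2.0.1"));
    }
}
```

Treating an unknown or unparseable version as "too old" is a conservative choice in this sketch: it gives up metrics on such clients rather than risking the exception.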

Javaagent or library instrumentation version

v2.5.0

Environment

JDK: HotSpot 17.0.7
OS: Darwin 22.3.0

Additional context

No response
