
Kafka initialization failed due to injection of OpenTelemetryMetricsReporter. #12538

@Cirilla-zmh

Description


When we install the OTel Java Agent for an application that uses the Kafka client, Kafka producer initialization fails. We eventually identified and resolved the issue, but I believe it is worth bringing up for discussion in the community. Here are the details:

Background

We use spring-kafka version 2.3.13.RELEASE, which depends on kafka-clients version 2.5.1.

In the application, we use DefaultKafkaProducerFactory to create the producer and enable per-thread mode, meaning each thread in the worker thread group gets its own producer instance. Here is the core code:

@Bean
@Primary
public KafkaTemplate<String, String> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

@Bean
public ProducerFactory<String, String> producerFactory() {
    DefaultKafkaProducerFactory<String, String> factory = new DefaultKafkaProducerFactory<>(producerConfigs());
    factory.setProducerPerThread(true);
    return factory;
}

public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
    props.put(ProducerConfig.ACKS_CONFIG, acks);
    props.put(ProducerConfig.RETRIES_CONFIG, retries);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
    props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, maxRequestSize);
    props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType);
    props.put(ProducerConfig.SEND_BUFFER_CONFIG, sendBuffer);
    KafkaSaslConfig.kafkaSaslConfig(props, propertiesConfig);
    return props;
}
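
For context, producers are created lazily, one per worker thread. A hypothetical worker (not our actual code; the topic name and thread count are made up for illustration) might look like the sketch below, which is why several KafkaProducer instances can be constructed concurrently shortly after startup:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.springframework.kafka.core.KafkaTemplate;

// Hypothetical worker, only to illustrate the producer-per-thread setup.
public class WorkerSketch {

    private final KafkaTemplate<String, String> kafkaTemplate;
    private final ExecutorService workers = Executors.newFixedThreadPool(8);

    public WorkerSketch(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendAll() {
        // With setProducerPerThread(true), the first send() on each worker thread
        // makes DefaultKafkaProducerFactory create a dedicated KafkaProducer for
        // that thread, so several producers may be constructed at the same time.
        for (int i = 0; i < 8; i++) {
            workers.submit(() -> kafkaTemplate.send("demo-topic", "key", "value"));
        }
    }
}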

When we install the Java Agent in production and start the application, it throws an exception:

org.apache.kafka.common.config.ConfigException: Missing required configuration "key.serializer" which has no default value.

However, key.serializer is clearly present in our configuration, as shown above.

Problem Localization

The application works fine when run on its own. But when it runs with the OTel Java Agent, the configuration that actually reaches the producer contains only:

@HashMap[
    @String[compression.type]:@String[gzip],
    @String[metric.reporters]:@String[io.opentelemetry.javaagent.shaded.instrumentation.kafka.internal.OpenTelemetryMetricsReporter],
    @String[acks]:@String[-1],
    @String[batch.size]:@Integer[4096],
    @String[key.serializer]:@Class[class org.apache.kafka.common.serialization.StringSerializer],
    @String[opentelemetry.supplier]:@OpenTelemetrySupplier[io.opentelemetry.javaagent.shaded.instrumentation.kafka.internal.OpenTelemetrySupplier@20db01b6],
    @String[opentelemetry.instrumentation_name]:@String[io.opentelemetry.kafka-clients-0.11],
    @String[retries]:@Integer[1],
    @String[max.request.size]:@Integer[2097152],
    @String[value.serializer]:@Class[class org.apache.kafka.common.serialization.StringSerializer],
]

Our configuration has exactly 11 entries, which is very close to a HashMap's default resize threshold (12 entries at the default capacity of 16 with load factor 0.75), so the extra keys added by the agent push the map into a resize. We suspect that adding keys concurrently while the HashMap was resizing caused some nodes to be lost.
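
A minimal standalone sketch of that failure mode (unrelated to Kafka; the key names and thread count are arbitrary) is below. Two threads put entries into a plain HashMap at the same time; since HashMap is not thread-safe, entries can be silently dropped, especially around a resize, although any single run may not reproduce it:

import java.util.HashMap;
import java.util.Map;

public class HashMapRaceSketch {

    public static void main(String[] args) throws InterruptedException {
        for (int attempt = 0; attempt < 100; attempt++) {
            Map<String, Object> config = new HashMap<>();
            // 11 "application" keys, just below the default resize threshold of 12.
            for (int i = 0; i < 11; i++) {
                config.put("app.key." + i, i);
            }

            // Two threads add keys concurrently, as the worker threads and the
            // agent's enhanceConfig() effectively do in the real scenario.
            Thread t1 = new Thread(() -> {
                for (int i = 0; i < 5; i++) {
                    config.put("thread1.key." + i, i);
                }
            });
            Thread t2 = new Thread(() -> {
                for (int i = 0; i < 5; i++) {
                    config.put("thread2.key." + i, i);
                }
            });
            t1.start();
            t2.start();
            t1.join();
            t2.join();

            // With no synchronization, some of the 21 expected entries may be lost.
            if (config.size() != 21) {
                System.out.println("attempt " + attempt + ": expected 21 entries, got " + config.size());
            }
        }
    }
}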

We reviewed the code of the OTel Java Agent and spring-kafka, and confirmed that this is the cause:

In spring-kafka, when a DefaultKafkaProducerFactory is created, the config is stored in a HashMap. Later, when producers are created, that same map is reused directly, even in a multi-threaded environment (which is fine as long as the config never changes); see the sketch after the screenshots below.
(Screenshots of the relevant DefaultKafkaProducerFactory source, taken 2024-10-30, omitted.)
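
To restate the screenshots' point in text form, here is a simplified sketch of the pattern (not the actual spring-kafka source; the class and method names are abbreviated):

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

// Simplified sketch of the factory pattern, not the actual spring-kafka source.
public class ProducerFactorySketch<K, V> {

    // The configs live in a plain HashMap for the lifetime of the factory.
    private final Map<String, Object> configs;

    public ProducerFactorySketch(Map<String, Object> configs) {
        this.configs = new HashMap<>(configs);
    }

    // In producer-per-thread mode this runs on many worker threads concurrently.
    // Every call hands the *same* map instance to the KafkaProducer constructor;
    // if anything mutates that map at the same time (e.g. an agent injecting
    // extra keys), entries can be lost or a reader can observe a partial map.
    public Producer<K, V> createProducer() {
        return new KafkaProducer<>(this.configs);
    }
}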

When a producer is created, execution first enters the OTel Java Agent's advice code, which calls the enhanceConfig() method. Because several worker threads create producers at the same time, each invocation mutates the same shared config map, so the map is "unexpectedly" modified concurrently and the race described above occurs.

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
    @Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
  // ensure config is a mutable map
  if (config.getClass() != HashMap.class) {
    config = new HashMap<>(config);
  }
  enhanceConfig(config);
}

Fix

By unconditionally creating a copy of the incoming config before calling enhanceConfig(), the advice only ever mutates its own private map instead of the one shared by the factory, and the issue was resolved.

@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
    @Advice.Argument(value = 0, readOnly = false) Map<String, Object> config) {
  config = new HashMap<>(config);
  enhanceConfig(config);
}

Some Thoughts

Similar to another issue I raised, #11946, it seems that Java Agent developers are often troubled by corner cases like this. Are there any approaches or suggestions for anticipating these kinds of issues rather than fixing them after they occur?
