-
Notifications
You must be signed in to change notification settings - Fork 198
Description
Since version 4.31.0 KafkaDelayedRetryTopic does not work as expected anymore.
Dispite of my config being:
mp.messaging.incoming.my-topic.connector=smallrye-kafka
mp.messaging.incoming.my-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
mp.messaging.incoming.my-topic.topic=my-topic
mp.messaging.incoming.my-topic.group.id=grpid
mp.messaging.incoming.my-topic.failure-strategy=delayed-retry-topic
mp.messaging.incoming.my-topic.delayed-retry-topic.topics=my-topic-retry_300000
mp.messaging.incoming.my-topic.delayed-retry-topic.max-retries=2147483647
mp.messaging.incoming.my-topic.delayed-retry-topic.timeout=2147483647
mp.messaging.incoming.my-topic.delayed-retry-topic.consumer.group.id=grpid
Following Exception is thrown:
Caused by: java.lang.IllegalArgumentException: The attribute value.deserializer on connector 'smallrye-kafka' (channel: my-topic) must be set
at io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration.lambda$getValueDeserializer$0(KafkaConnectorIncomingConfiguration.java:72)
at java.base/java.util.Optional.orElseThrow(Optional.java:403)
at io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration.getValueDeserializer(KafkaConnectorIncomingConfiguration.java:72)
at io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration.validate(KafkaConnectorIncomingConfiguration.java:435)
at io.smallrye.reactive.messaging.kafka.KafkaConnectorIncomingConfiguration.(KafkaConnectorIncomingConfiguration.java:16)
at io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic$Factory.create(KafkaDelayedRetryTopic.java:172)
at io.smallrye.reactive.messaging.kafka.fault.KafkaDelayedRetryTopic$Factory_ClientProxy.create(Unknown Source)
at io.smallrye.reactive.messaging.kafka.impl.KafkaSource.createFailureHandler(KafkaSource.java:380)
I think the problem is somewhere inbetween codelines 166 and 172 where config properties are gathered, merged and validated.
By debugging i could gather following information:
config.config().getPropertyNames(): value.deserializer exists.
retryConsumerConfig.getPropertyNames(): value.deserializer does not exist.
retryConsumerConfig.overall.getPropertyNames(): value.deserializer exists.