Description
I'm trying to set the request.required.acks property on a producer. My code looks like this:
let brokers = brokersList $ map BrokerAddress $ kafkaBrokers config
    extra = extraProps $ M.fromList [("request.required.acks", "0")]
    properties = brokers <> extra <> debugOptions [DebugAll]
in newProducer properties
It seems to be set correctly: when I start my producer, I see this in the logs:
%7|1617942667.281|CONF|rdkafka#producer-2| [thrd:app]: Client configuration:
%7|1617942667.281|CONF|rdkafka#producer-2| [thrd:app]: client.software.version = 1.6.0
%7|1617942667.281|CONF|rdkafka#producer-2| [thrd:app]: metadata.broker.list = localhost:9092
%7|1617942667.281|CONF|rdkafka#producer-2| [thrd:app]: debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,all
%7|1617942667.281|CONF|rdkafka#producer-2| [thrd:app]: default_topic_conf = 0x7f89e8002260
%7|1617942667.281|CONF|rdkafka#producer-2| [thrd:app]: ssl_key = [redacted]
%7|1617942667.281|CONF|rdkafka#producer-2| [thrd:app]: dr_msg_cb = 0x7f8a220d2058
%7|1617942667.281|CONF|rdkafka#producer-2| [thrd:app]: Default topic configuration:
%7|1617942667.281|CONF|rdkafka#producer-2| [thrd:app]: request.required.acks = 0
^^^^^^^^ there
As a test case, I created a topic with a replication factor lower than min.insync.replicas:
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic my.test.topic --config min.insync.replicas=2
Now with these settings, publishing to this topic with producer acks set to 0 or 1 should work, but it should fail with acks set to -1 or all (according to the docs, and indeed this behavior is honored by kafka-console-producer.sh).
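To make the expectation concrete, here is a minimal sketch of the rule as I understand it from the docs. It is a simplified model, not Kafka's or librdkafka's actual code; the names Acks, ProduceResult and produceOutcome are made up for illustration. The assumption is that min.insync.replicas is only enforced for acks=all (-1), and that the topic above has 1 in-sync replica against a minimum of 2.

```haskell
-- Simplified model of the broker-side check (illustration only, not Kafka's code).
-- All names here are hypothetical.
data Acks = AcksNone | AcksLeader | AcksAll   -- acks = 0, 1, -1/all
  deriving (Show, Eq)

data ProduceResult = Accepted | NotEnoughReplicas
  deriving (Show, Eq)

-- | Assumption: only acks=all is subject to min.insync.replicas.
produceOutcome
  :: Acks  -- ^ producer acks setting
  -> Int   -- ^ min.insync.replicas configured on the topic
  -> Int   -- ^ current number of in-sync replicas
  -> ProduceResult
produceOutcome acks minInsync inSync
  | acks == AcksAll && inSync < minInsync = NotEnoughReplicas
  | otherwise                             = Accepted

main :: IO ()
main = mapM_ report [AcksNone, AcksLeader, AcksAll]
  where
    -- Test topic from above: min.insync.replicas=2, replication factor 1.
    report acks =
      putStrLn (show acks ++ ": " ++ show (produceOutcome acks 2 1))
```

Under this model only AcksAll yields NotEnoughReplicas for the test topic, which is the behavior I expected from my producer with acks set to 0.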
With my producer however, nothing is ever published successfully. The logs output lines such as this one:
REQERR|rdkafka#producer-2| ProduceRequest failed: Broker: Not enough in-sync replicas: explicit actions Retry,MsgNotPersisted
So either extraProps are (sometimes?) not passed down to librdkafka correctly, or something is overriding this specific property somewhere else.
Any idea what might be happening here?