Description
I am using an async producer to produce to Kafka in a best-effort way, so I am okay with dropping messages. But I am surprised at the rate at which messages are being dropped. Here is my code:
```go
type asyncKafkaProducer struct {
	producer sarama.AsyncProducer
	logger   log.Logger
}

func NewAsyncKafkaProducer(producer sarama.AsyncProducer, logger log.Logger) KafkaProducer {
	return &asyncKafkaProducer{
		producer: producer,
		logger:   logger,
	}
}

func (a *asyncKafkaProducer) Produce(msgs []*sarama.ProducerMessage) error {
	for _, msg := range msgs {
		select {
		case a.producer.Input() <- msg:
			// message queued
		default:
			// Input() is not ready to receive right now; log and drop the message
			a.logger.Error(nil, "Async producer input channel is full or closed, dropping message")
		}
	}
	return nil
}

func (a *asyncKafkaProducer) Close() error {
	a.producer.AsyncClose()
	return nil
}
```
The relevant configs I have are:

```go
cfg.Producer.Return.Successes = false
cfg.Producer.Return.Errors = false
// Increase the channel buffer size to avoid blocking the producer; default is 256
cfg.ChannelBufferSize = 10000
```
Since this is best-effort producing, I set the `Return.Successes` and `Return.Errors` configs to false. I set `ChannelBufferSize` to something large, hoping to handle high/bursty traffic better. However, with this implementation, I see a lot of "Async producer input channel is full or closed, dropping message" logs almost immediately under a load of only 200 msg/s.
After looking into the code, I realized that the `producer.Input()` channel is unbuffered. The `ChannelBufferSize` config is used to create buffered channels at the topic-producer level, but the goroutine consuming the unbuffered `Input()` channel does a fair amount of work before dispatching each message to a topic producer. So it is possible that this dispatching cannot keep up with the rate at which I queue messages onto `Input()`?
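The effect I am describing can be reproduced without Kafka at all. In the standalone sketch below, `message` and `produce` are stand-ins of my own (not sarama types): a non-blocking send on an unbuffered channel succeeds only if the consumer happens to be receiving at that exact instant, so a slow consumer causes most sends to fall through to the `default` branch, while a channel buffered for the burst size absorbs the same burst with no drops:

```go
package main

import (
	"fmt"
	"time"
)

// message stands in for *sarama.ProducerMessage (simplified for illustration).
type message struct{ value int }

// produce mirrors the select/default pattern from my Produce method:
// it attempts a non-blocking send for each message and counts drops.
func produce(input chan<- *message, msgs []*message) (dropped int) {
	for _, m := range msgs {
		select {
		case input <- m:
			// queued
		default:
			dropped++
		}
	}
	return dropped
}

func main() {
	// Unbuffered channel with a slow consumer, like sarama's Input():
	// a non-blocking send succeeds only while the consumer is blocked
	// in a receive, so almost every send in a tight loop is dropped.
	unbuffered := make(chan *message)
	go func() {
		for range unbuffered {
			time.Sleep(time.Millisecond) // simulate per-message dispatch work
		}
	}()

	msgs := make([]*message, 100)
	for i := range msgs {
		msgs[i] = &message{value: i}
	}
	droppedUnbuffered := produce(unbuffered, msgs)

	// The same burst into a channel buffered for the burst size is fully absorbed.
	buffered := make(chan *message, len(msgs))
	droppedBuffered := produce(buffered, msgs)

	fmt.Println(droppedUnbuffered > 0, droppedBuffered == 0)
}
```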
Is this expected? Or is there something wrong with my implementation or config settings?
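For reference, a workaround I am considering (the names `bufferedFront` and `msg` below are my own, not sarama's API) is to put a buffered channel of my own in front of the unbuffered input and forward from it in a dedicated goroutine. The non-blocking `select` then only drops once my buffer itself is full, rather than whenever the downstream consumer is momentarily busy:

```go
package main

import (
	"fmt"
	"sync"
)

// msg stands in for *sarama.ProducerMessage.
type msg struct{ n int }

// bufferedFront wraps an unbuffered downstream channel (like producer.Input())
// with a buffered front channel plus a forwarding goroutine. Non-blocking
// sends into the front channel fail only once size messages are pending.
func bufferedFront(downstream chan<- *msg, size int) (chan<- *msg, *sync.WaitGroup) {
	front := make(chan *msg, size)
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for m := range front {
			downstream <- m // blocks here instead of dropping
		}
	}()
	return front, &wg
}

func main() {
	downstream := make(chan *msg) // unbuffered, like sarama's Input()
	go func() {
		for range downstream { // consumer draining the downstream channel
		}
	}()

	front, wg := bufferedFront(downstream, 10000)
	dropped := 0
	for i := 0; i < 1000; i++ {
		select {
		case front <- &msg{n: i}:
			// queued into my own buffer
		default:
			dropped++
		}
	}
	close(front)
	wg.Wait()
	fmt.Println(dropped) // the whole burst fits in the buffer
}
```

The trade-off is that the forwarding goroutine blocks on the downstream send, so on shutdown the front channel must be closed and drained before calling `AsyncClose()`.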