Description
I have been testing Kafka consumers against Azure Event Hubs, but I am noticing that the throughput is not utilizing the Event Hub's full capacity. I have tried both the Confluent Kafka Go example and the Sarama Go example, and both achieve a throughput of around 1-2MB/s regardless of the partitions and throughput units (TUs) that I configure in Event Hubs.
Test Setup:
- I used the latest versions of the Confluent Kafka Go and Sarama libraries.
- The app is deployed on Azure and the Event Hub is in the same region.
- The Event Hub has 40 throughput units (TUs) and 10 partitions.
- The theoretical maximum Event Hub consumer throughput should be 10 partitions * 2MB/s per partition = 20MB/s.
- The code to fetch events was copy-pasted from the examples; I just added a log line to print the consumed MB/s while the events are fetched.
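The expected ceiling in the setup above is simple arithmetic; as a sanity check (the 2 MB/s-per-partition figure is the one assumed in the setup above):

```go
package main

import "fmt"

// theoreticalThroughputMBps returns the aggregate consumer ceiling when each
// partition can be drained at perPartitionMBps.
func theoreticalThroughputMBps(partitions int, perPartitionMBps float64) float64 {
	return float64(partitions) * perPartitionMBps
}

func main() {
	// 10 partitions * 2 MB/s per partition = 20 MB/s aggregate.
	fmt.Printf("theoretical max: %.0f MB/s\n", theoreticalThroughputMBps(10, 2))
}
```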
Results:
- My tests report a maximum throughput of just ~2MB/s, regardless of how many partitions are being consumed. I tried with both 1 partition and 10 partitions and was still limited to 2MB/s.
- I validated that the app is correctly connecting to all 10 partitions and consuming events from each partition, but the throughput doesn’t exceed 2MB/s.
Experiment:
I used the Azure Event Hubs Go SDK in the same app and noticed I could easily fetch up to 25MB/s from the Event Hub, which indicates the Event Hub itself is not the limiting factor.
However, when using Kafka consumers, the throughput never goes above 2MB/s; even after adjusting the Kafka consumer configuration settings, I couldn't achieve anything more.
Issue:
I would like to continue using Kafka for my consumers but cannot achieve the expected throughput, even after experimenting with different configurations.
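For reference, on the Sarama side the knobs I'd expect to matter are the fetch sizes. A sketch with illustrative values (the fields exist on `sarama.Config`; the numbers are assumptions to experiment with, not a known fix):

```go
config := sarama.NewConfig()
// Default per-request fetch size in bytes; Sarama's default (1 MB) can cap
// per-partition throughput.
config.Consumer.Fetch.Default = 4 * 1024 * 1024
// Upper bound on what the broker may return in a single fetch response.
config.Consumer.Fetch.Max = 32 * 1024 * 1024
// Larger internal channel buffer so the network reader doesn't stall
// waiting on the application to drain messages.
config.ChannelBufferSize = 1024
```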
Question:
Is there any advice or configuration change that could help me fully utilize the Event Hub's throughput with Kafka consumers, or is there a known issue with Kafka's integration with Azure Event Hubs that could be causing this limitation?
Code snippet
func confluentKafkaConsumer(ctx context.Context) (string, error) {
	c, err := kafka1.NewConsumer(&kafka1.ConfigMap{
		"bootstrap.servers":        kafkaBrokers[0],
		"broker.address.family":    "v4",
		"group.id":                 consumerGroup,
		"session.timeout.ms":       30000,
		"auto.offset.reset":        "earliest",
		"security.protocol":        "SASL_SSL",
		"sasl.mechanisms":          "PLAIN",
		"sasl.username":            "$ConnectionString",
		"sasl.password":            connectionString,
		"enable.auto.offset.store": false,
		"enable.auto.commit":       false,
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		return "", err
	}
	defer c.Close()

	if err = c.SubscribeTopics([]string{topic}, nil); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to subscribe to topics: %s\n", err)
		return "", err
	}

	start := time.Now()
	totalBytes := 0
	totalMessages := 0
	for {
		// Block indefinitely until the next message arrives.
		msg, err := c.ReadMessage(-1)
		if err != nil {
			fmt.Fprintf(os.Stderr, "Consumer error: %v (%v)\n", err, msg)
			break
		}
		totalBytes += len(msg.Value)
		totalMessages++
		// Report the running throughput every 1000 messages.
		if totalMessages%1000 == 0 {
			elapsed := time.Since(start).Seconds()
			fmt.Println("MB/s -", float64(totalBytes)/(1024*1024)/elapsed)
			fmt.Println("Messages per second -", float64(totalMessages)/elapsed)
			fmt.Println("Total Messages -", totalMessages)
		}
	}
	return "", nil
}
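In case the defaults are the bottleneck, these are the librdkafka fetch settings I'd experiment with adding to the ConfigMap above. All keys are standard librdkafka consumer properties, but the values are illustrative guesses, not a known fix:

```go
// Hypothetical fetch-tuning additions to the ConfigMap above.
&kafka1.ConfigMap{
	// ... existing settings ...
	"fetch.wait.max.ms":          100,      // how long the broker may hold a fetch open
	"max.partition.fetch.bytes":  4194304,  // 4 MB per partition per fetch (default 1 MB)
	"fetch.max.bytes":            52428800, // 50 MB cap per fetch response
	"queued.max.messages.kbytes": 131072,   // larger local prefetch queue (default 64 MB)
}
```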