-
Notifications
You must be signed in to change notification settings - Fork 838
Description
The scenario:
- auto topic creation is enabled
- consumer is started before topic has been created (kafka is running)
- producer creating the topic
=> consumer never receives a message unless restarted and hangs forever (at least 15 minutes).
Kafka Version
Default docker apache/kafka:4.1.1
Kafka-go version:
github.com/segmentio/kafka-go v0.4.49
docker run --restart always --name kafka -d -p 9092:9092 apache/kafka:4.1.1package main
import (
"context"
"fmt"
"log"
"strconv"
"time"
"github.com/business-introspector/spikes/shared/config"
"github.com/segmentio/kafka-go"
)
const topic = "missing-topic-test-11"
func main() {
// Start reader goroutine
go readerRoutine("localhost:9092")
// Start writer goroutine
go writerRoutine("localhost:9092")
// Keep main running
select {}
}
func readerRoutine(brokerURL string) {
//time.Sleep(10 * time.Second)
reader := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{brokerURL},
Topic: topic,
GroupID: "missing-topic-repro-group",
MinBytes: 1, // 100 bytes
MaxBytes: 10e6, // 10MB
})
defer reader.Close()
log.Printf("Reader: Starting to read from topic '%s'\n", topic)
for {
msg, err := reader.ReadMessage(context.Background())
if err != nil {
log.Printf("Reader: Error reading message: %v\n", err)
continue
}
fmt.Printf("Reader: Received message - Value: %s", string(msg.Value))
fmt.Println()
}
}
func writerRoutine(brokerURL string) {
// Initial 10-second sleep
log.Println("Writer: Sleeping for 10 seconds before starting...")
time.Sleep(10 * time.Second)
writer := &kafka.Writer{
Addr: kafka.TCP(brokerURL),
Topic: topic,
Balancer: &kafka.LeastBytes{},
RequiredAcks: kafka.RequireOne,
AllowAutoTopicCreation: true,
}
defer writer.Close()
log.Printf("Writer: Starting to write to topic '%s'\n", topic)
counter := 0
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()
for range ticker.C {
value := strconv.Itoa(counter)
msg := kafka.Message{
Value: []byte(value),
}
err := writer.WriteMessages(context.Background(), msg)
if err != nil {
log.Printf("Writer: Error writing message %d: %v\n", counter, err)
} else {
log.Printf("Writer: Sent message %d\n", counter)
}
counter++
}
}Expected Behavior
Repro case starts a reader and 10 seconds later a writer. I expect the reader to eventually catch up with the messages, but this appears to never happen.
I expect log output like
Reader: Received message - Value: 1
Reader: Received message - Value: 2
Reader: Received message - Value: 3
Reader: Received message - Value: 4
Reader: Received message - Value: 5
Reader: Received message - Value: 6
Observed Behavior
Writer writes continuously but the reader never gets a message. Only restarting the program with the same topic name causes the reader to receive the messages.
2025/11/23 15:53:32 Writer: Sleeping for 10 seconds before starting...
2025/11/23 15:53:32 Reader: Starting to read from topic 'missing-topic-test-13'
2025/11/23 15:53:42 Writer: Starting to write to topic 'missing-topic-test-13'
2025/11/23 15:53:44 Writer: Sent message 0
2025/11/23 15:53:45 Writer: Sent message 1
2025/11/23 15:53:46 Writer: Sent message 2
2025/11/23 15:53:47 Writer: Sent message 3
2025/11/23 15:53:48 Writer: Sent message 4
2025/11/23 15:53:49 Writer: Sent message 5
2025/11/23 15:53:50 Writer: Sent message 6
Bonus weirdness
Inconsistency 1
- Rerunning the application with the same topic a second time works (topic was created before)
- Changing the topic name to another new name that does not exist, then calling
go run .also works (!!1)
=> this should not happen because it is inconsistent. The output reports the correct new topic name, but for whatever reason, the reader can now read from this topic shortly after creation.
Inconsistency 2
- Repeat Step 1 from above to get correct message reads first
- Changing the topic name again to a new one
go build .go run .
=> Unlike Inconsistency 1, now the problem re-appears that the reader is blocked. Very confusing.