Skip to content

Commit b01cd51

Browse files
committed
fix: add SeekTimeout consumer option
1 parent 43310dd commit b01cd51

File tree

4 files changed

+98
-65
lines changed

4 files changed

+98
-65
lines changed

consumer.go

Lines changed: 56 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type consumer struct {
2929
groupId string
3030
handlers map[string]Handler
3131
readTimeout time.Duration
32+
seekTimeout time.Duration
3233
sig chan os.Signal
3334
logsChannel chan kafka.LogEvent
3435
waiting []chan error
@@ -48,6 +49,60 @@ type consumer struct {
4849
}
4950
}
5051

52+
// NewConsumer creates a new Kafka consumer with the specified configuration
53+
// and options. The consumer must be started before it can receive messages.
54+
func NewConsumer(config *Config, opts ...ConsumerOption) (Consumer, error) {
55+
handle := func(err error) (Consumer, error) {
56+
return nil, fmt.Errorf("kafka.NewConsumer: %w", err)
57+
}
58+
59+
const defaultReadTimeout = 1 * time.Second
60+
const defaultSeekTimeout = 100 * time.Millisecond
61+
62+
consumer := &consumer{
63+
handlers: map[string]Handler{},
64+
readTimeout: defaultReadTimeout,
65+
seekTimeout: defaultSeekTimeout,
66+
}
67+
68+
// clone the config and apply options
69+
config = config.clone()
70+
if err := consumer.configure(config, opts...); err != nil {
71+
return handle(err)
72+
}
73+
74+
// create the confluent consumer
75+
var err error
76+
if consumer.Consumer, err = createConsumer(&config.config); err != nil {
77+
return handle(fmt.Errorf("createConsumer: %w", err))
78+
}
79+
consumer.funcs.close = consumer.Consumer.Close
80+
consumer.funcs.commitOffsets = consumer.Consumer.CommitOffsets
81+
consumer.funcs.readMessage = consumer.Consumer.ReadMessage
82+
consumer.funcs.seek = consumer.Consumer.Seek
83+
consumer.funcs.subscribe = consumer.Consumer.SubscribeTopics
84+
85+
// start a go routine to read events from the logs channel to prevent
86+
// it filling up;
87+
consumer.logsChannel = consumer.Consumer.Logs()
88+
go func() {
89+
for {
90+
if _, ok := <-consumer.logsChannel; !ok {
91+
consumer.logsChannel = nil
92+
return
93+
}
94+
}
95+
}()
96+
97+
// establish a ctrl-c signal channel for running in interactive console
98+
consumer.sig = make(chan os.Signal, 1)
99+
signal.Notify(consumer.sig, syscall.SIGINT, syscall.SIGTERM)
100+
101+
consumer.setState(csReady)
102+
103+
return consumer, nil
104+
}
105+
51106
// commit commits the offset of the specified message.
52107
//
53108
// If readNext is true, the offset is committed as is (the consumer will read
@@ -59,12 +114,10 @@ type consumer struct {
59114
// If the commit fails, a FatalError is returned (the consumer will stop
60115
// processing messages)
61116
var commitOffset = func(ctx context.Context, c *consumer, msg *kafka.Message, readNext bool) error {
62-
const seekTimeoutMS = 100 // FUTURE: make configurable per consumer?
63-
64117
offset := msg.TopicPartition
65118
if readNext {
66119
offset.Offset += 1
67-
} else if err := c.funcs.seek(offset, seekTimeoutMS); err != nil {
120+
} else if err := c.funcs.seek(offset, int(c.seekTimeout.Milliseconds())); err != nil {
68121
return fmt.Errorf("commit: seek: %w", err)
69122
}
70123

@@ -532,53 +585,3 @@ func (c *consumer) configure(cfg *Config, opts ...ConsumerOption) error {
532585

533586
return nil
534587
}
535-
536-
// NewConsumer creates a new Kafka consumer with the specified configuration
537-
// and options. The consumer must be started before it can receive messages.
538-
func NewConsumer(config *Config, opts ...ConsumerOption) (Consumer, error) {
539-
handle := func(err error) (Consumer, error) {
540-
return nil, fmt.Errorf("kafka.NewConsumer: %w", err)
541-
}
542-
543-
consumer := &consumer{
544-
handlers: map[string]Handler{},
545-
readTimeout: 1 * time.Second,
546-
}
547-
548-
// clone the config and apply options
549-
config = config.clone()
550-
if err := consumer.configure(config, opts...); err != nil {
551-
return handle(err)
552-
}
553-
554-
// create the confluent consumer
555-
var err error
556-
if consumer.Consumer, err = createConsumer(&config.config); err != nil {
557-
return handle(fmt.Errorf("createConsumer: %w", err))
558-
}
559-
consumer.funcs.close = consumer.Consumer.Close
560-
consumer.funcs.commitOffsets = consumer.Consumer.CommitOffsets
561-
consumer.funcs.readMessage = consumer.Consumer.ReadMessage
562-
consumer.funcs.seek = consumer.Consumer.Seek
563-
consumer.funcs.subscribe = consumer.Consumer.SubscribeTopics
564-
565-
// start a go routine to read events from the logs channel to prevent
566-
// it filling up;
567-
consumer.logsChannel = consumer.Consumer.Logs()
568-
go func() {
569-
for {
570-
if _, ok := <-consumer.logsChannel; !ok {
571-
consumer.logsChannel = nil
572-
return
573-
}
574-
}
575-
}()
576-
577-
// establish a ctrl-c signal channel for running in interactive console
578-
consumer.sig = make(chan os.Signal, 1)
579-
signal.Notify(consumer.sig, syscall.SIGINT, syscall.SIGTERM)
580-
581-
consumer.setState(csReady)
582-
583-
return consumer, nil
584-
}

consumerOption.go

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -66,14 +66,48 @@ func MessageDecryption(fn CypherFunc) ConsumerOption {
6666
}
6767
}
6868

69-
// Message
69+
// ReadTimeoutNever is a special value for the ReadTimeout option
70+
// that indicates the consumer should never time out when reading
71+
// messages.
72+
//
73+
// Use this with caution; a consumer that never times out may be
74+
// ejected from the consumer group if it does not receive a message
75+
// within the configured session timeout.
76+
const ReadTimeoutNever = time.Duration(-1)
77+
78+
// ReadTimeout sets the read timeout for the consumer.
79+
// If not set, a default of 1s is used.
80+
func ReadTimeout(timeout time.Duration) ConsumerOption {
81+
return func(cfg *Config, consumer *consumer) error {
82+
if timeout < 0 && timeout != ReadTimeoutNever {
83+
return ErrInvalidReadTimeout
84+
}
85+
consumer.readTimeout = timeout
86+
return nil
87+
}
88+
}
89+
90+
// SeekTimeout sets the timeout for seek operations on the consumer.
91+
// If not set, a default of 100ms is used.
92+
func SeekTimeout(timeout time.Duration) ConsumerOption {
93+
return func(cfg *Config, consumer *consumer) error {
94+
if timeout <= 0 {
95+
return ErrInvalidSeekTimeout
96+
}
97+
consumer.seekTimeout = timeout
98+
return nil
99+
}
100+
}
101+
102+
// TopicHandler registers a handler for a specific topic.
70103
func TopicHandler[T comparable](topic T, h Handler) ConsumerOption {
71104
return func(_ *Config, c *consumer) error {
72105
c.handlers[fmt.Sprintf("%v", topic)] = h
73106
return nil
74107
}
75108
}
76109

110+
// TopicHandlers registers multiple handlers for specific topics.
77111
func TopicHandlers[T comparable](handlers map[T]Handler) ConsumerOption {
78112
return func(_ *Config, c *consumer) error {
79113
for topic, h := range handlers {
@@ -82,13 +116,3 @@ func TopicHandlers[T comparable](handlers map[T]Handler) ConsumerOption {
82116
return nil
83117
}
84118
}
85-
86-
func ReadTimeout(timeout time.Duration) ConsumerOption {
87-
return func(cfg *Config, consumer *consumer) error {
88-
if timeout < 0 && timeout != -1 {
89-
return ErrInvalidReadTimeout
90-
}
91-
consumer.readTimeout = timeout
92-
return nil
93-
}
94-
}

consumerOption_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,10 @@ func TestConsumerOptions(t *testing.T) {
5858
Config: &Config{config: kafka.ConfigMap{}},
5959
consumer: &consumer{readTimeout: 1 * time.Hour},
6060
}},
61+
{name: "SeekTimeout", sut: SeekTimeout(250 * time.Millisecond), result: result{
62+
Config: &Config{config: kafka.ConfigMap{}},
63+
consumer: &consumer{seekTimeout: 250 * time.Millisecond},
64+
}},
6165
}
6266
for _, tc := range testcases {
6367
t.Run(tc.name, func(t *testing.T) {
@@ -102,6 +106,7 @@ func TestConsumerConfigurationErrorHandling(t *testing.T) {
102106
{name: "auto offset reset (invalid)", sut: AutoOffsetReset("invalid"), result: cfgerr},
103107
{name: "group id", sut: ConsumerGroupID("group"), result: cfgerr},
104108
{name: "read timeout", sut: ReadTimeout(-2), result: ErrInvalidReadTimeout},
109+
{name: "seek timeout", sut: SeekTimeout(-2), result: ErrInvalidSeekTimeout},
105110
}
106111
for _, tc := range testcases {
107112
t.Run(tc.name, func(t *testing.T) {

errors.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ var (
1212
ErrConsumerNotRunning = errors.New("consumer is not running")
1313
ErrConsumerNotStarted = errors.New("consumer has not been started")
1414
ErrInvalidOperation = errors.New("invalid operation")
15-
ErrInvalidReadTimeout = errors.New("invalid read timeout (must be -1 or >= 0)")
15+
ErrInvalidReadTimeout = errors.New("invalid read timeout (must be kafka.ReadTimeoutNever or >= 0ms)")
16+
ErrInvalidSeekTimeout = errors.New("invalid seek timeout (must be > 0ms)")
1617
ErrNoHandler = errors.New("no handler for topic")
1718
ErrNoHandlersConfigured = errors.New("no handlers configured")
1819
ErrReprocessMessage = errors.New("message will be reprocessed")

0 commit comments

Comments
 (0)