diff --git a/kq/pusher.go b/kq/pusher.go index b254d4c..02e6f23 100644 --- a/kq/pusher.go +++ b/kq/pusher.go @@ -30,6 +30,8 @@ type ( // kafka.Writer options allowAutoTopicCreation bool balancer kafka.Balancer + batchTimeout time.Duration + batchBytes int64 // executors.ChunkExecutor options chunkSize int @@ -59,6 +61,12 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher { if options.balancer != nil { producer.Balancer = options.balancer } + if options.batchTimeout > 0 { + producer.BatchTimeout = options.batchTimeout + } + if options.batchBytes > 0 { + producer.BatchBytes = options.batchBytes + } pusher := &Pusher{ producer: producer, @@ -178,3 +186,17 @@ func WithSyncPush() PushOption { options.syncPush = true } } + +// WithBatchTimeout customizes the Pusher with the given batch timeout. +func WithBatchTimeout(timeout time.Duration) PushOption { + return func(options *pushOptions) { + options.batchTimeout = timeout + } +} + +// WithBatchBytes customizes the Pusher with the given batch bytes. +func WithBatchBytes(bytes int64) PushOption { + return func(options *pushOptions) { + options.batchBytes = bytes + } +} diff --git a/kq/pusher_test.go b/kq/pusher_test.go index 72b380d..6d2396b 100644 --- a/kq/pusher_test.go +++ b/kq/pusher_test.go @@ -63,6 +63,30 @@ func TestNewPusher(t *testing.T) { assert.NotNil(t, pusher) assert.True(t, pusher.producer.(*kafka.Writer).AllowAutoTopicCreation) }) + + t.Run("WithBatchTimeout", func(t *testing.T) { + timeout := time.Second * 5 + pusher := NewPusher(addrs, topic, WithBatchTimeout(timeout)) + assert.NotNil(t, pusher) + assert.Equal(t, timeout, pusher.producer.(*kafka.Writer).BatchTimeout) + }) + + t.Run("WithBatchBytes", func(t *testing.T) { + batchBytes := int64(1024 * 1024) // 1MB + pusher := NewPusher(addrs, topic, WithBatchBytes(batchBytes)) + assert.NotNil(t, pusher) + assert.Equal(t, batchBytes, pusher.producer.(*kafka.Writer).BatchBytes) + }) + + t.Run("WithMultipleBatchOptions", func(t *testing.T) { + timeout := time.Second * 3 + batchBytes := int64(512 * 1024) // 512KB + pusher := NewPusher(addrs, topic, WithBatchTimeout(timeout), WithBatchBytes(batchBytes)) + assert.NotNil(t, pusher) + writer := pusher.producer.(*kafka.Writer) + assert.Equal(t, timeout, writer.BatchTimeout) + assert.Equal(t, batchBytes, writer.BatchBytes) + }) } func TestPusher_Close(t *testing.T) { @@ -139,3 +163,17 @@ func TestPusher_PushWithKey_Error(t *testing.T) { assert.Equal(t, expectedError, err) mockWriter.AssertExpectations(t) } + +func TestWithBatchTimeout(t *testing.T) { + options := &pushOptions{} + timeout := time.Second * 5 + WithBatchTimeout(timeout)(options) + assert.Equal(t, timeout, options.batchTimeout) +} + +func TestWithBatchBytes(t *testing.T) { + options := &pushOptions{} + batchBytes := int64(1024 * 1024) + WithBatchBytes(batchBytes)(options) + assert.Equal(t, batchBytes, options.batchBytes) +}