Skip to content

Commit 83504cf

Browse files
committed
Allocate a new taskBatch for each batch of tasks in the WorkerManager
1 parent 34c98ac commit 83504cf

File tree

1 file changed

+1
-0
lines changed

1 file changed

+1
-0
lines changed

workers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func (wm *WorkerManager) Stop() chan bool {
147147

148148
func (wm *WorkerManager) startBatch(batch []*Message) {
149149
inLock(&wm.stopLock, func() {
150+
wm.currentBatch = newTaskBatch()
150151
wm.batchOrder = make([]TaskId, 0)
151152
for _, message := range batch {
152153
topicPartition := TopicAndPartition{message.Topic, message.Partition}

0 commit comments

Comments
 (0)