Skip to content

Commit 41834bc

Browse files
committed
Merge pull request #136 from spenczar/fix_workermanager_memory_leak
Allocate a new taskBatch for each batch of tasks in the WorkerManager
2 parents 806cde9 + 83504cf commit 41834bc

File tree

1 file changed

+1
-0
lines changed

1 file changed

+1
-0
lines changed

workers.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ func (wm *WorkerManager) Stop() chan bool {
147147

148148
func (wm *WorkerManager) startBatch(batch []*Message) {
149149
inLock(&wm.stopLock, func() {
150+
wm.currentBatch = newTaskBatch()
150151
wm.batchOrder = make([]TaskId, 0)
151152
for _, message := range batch {
152153
topicPartition := TopicAndPartition{message.Topic, message.Partition}

0 commit comments

Comments
 (0)