@@ -54,9 +54,13 @@ func NewWorkerManager(id string, config *ConsumerConfig, topicPartition TopicAnd
5454 availableWorkers := make (chan * Worker , config .NumWorkers )
5555 for i := 0 ; i < config .NumWorkers ; i ++ {
5656 workers [i ] = & Worker {
57+ InputChannel : make (chan * TaskAndStrategy ),
5758 OutputChannel : make (chan WorkerResult ),
59+ HandlerInputChannel : make (chan * TaskAndStrategy ),
60+ HandlerOutputChannel : make (chan WorkerResult ),
5861 TaskTimeout : config .WorkerTaskTimeout ,
5962 }
63+ workers [i ].Start ()
6064 availableWorkers <- workers [i ]
6165 }
6266
@@ -138,6 +142,12 @@ func (wm *WorkerManager) Stop() chan bool {
138142 Debug (wm , "Stopped failure counter" )
139143 finished <- true
140144 Debug (wm , "Leaving manager stop" )
145+
146+ Debug (wm , "Stopping workers" )
147+ for _ , worker := range wm .workers {
148+ worker .Stop ()
149+ }
150+ Debug (wm , "Stopped all workers" )
141151 })
142152 Debugf (wm , "Stopped workerManager" )
143153 }()
@@ -163,7 +173,7 @@ func (wm *WorkerManager) startBatch(batch []*Message) {
163173 if wm .shutdownDecision == nil {
164174 wm .metrics .activeWorkers ().Inc (1 )
165175 wm .metrics .pendingWMsTasks ().Dec (1 )
166- worker .Start ( task , wm .config .Strategy )
176+ worker .InputChannel <- & TaskAndStrategy { task , wm .config .Strategy }
167177 } else {
168178 return
169179 }
@@ -293,7 +303,9 @@ func (wm *WorkerManager) processBatch() {
293303 } else {
294304 Debugf (wm , "Retrying worker task %s %dth time" , result .Id (), task .Retries )
295305 time .Sleep (wm .config .WorkerBackoff )
296- go task .Callee .Start (task , wm .config .Strategy )
306+ go func () {
307+ task .Callee .InputChannel <- & TaskAndStrategy {task , wm .config .Strategy }
308+ }()
297309 }
298310 }
299311
@@ -354,9 +366,18 @@ func (wm *WorkerManager) UpdateLargestOffset(offset int64) {
354366
355367// Represents a worker that is able to process a single message.
356368type Worker struct {
369+ // Channel to write tasks to.
370+ InputChannel chan * TaskAndStrategy
371+
357372 // Channel to write processing results to.
358373 OutputChannel chan WorkerResult
359374
375+ // Intermediate channel for pushing result to strategy handler
376+ HandlerInputChannel chan * TaskAndStrategy
377+
378+ // Intermediate channel for pushing result from strategy handler
379+ HandlerOutputChannel chan WorkerResult
380+
360381 // Timeout for a single worker task.
361382 TaskTimeout time.Duration
362383
@@ -370,39 +391,51 @@ func (w *Worker) String() string {
370391
371392// Starts processing a given task using given strategy with this worker.
372393// Call to this method blocks until the task is done or timed out.
373- func (w * Worker ) Start (task * Task , strategy WorkerStrategy ) {
374- task . Callee = w
394+ func (w * Worker ) Start () {
395+ handlerInterrupted := false
375396 go func () {
376- shouldStop := false
377- resultChannel := make (chan WorkerResult )
378- go func () {
379- result := strategy (w , task .Msg , task .Id ())
380- for ! shouldStop {
397+ for taskAndStrategy := range w .HandlerInputChannel {
398+ result := taskAndStrategy .Strategy (w , taskAndStrategy .WorkerTask .Msg , taskAndStrategy .WorkerTask .Id ())
399+ Loop:
400+ for ! handlerInterrupted {
381401 timeout := time .NewTimer (5 * time .Second )
382402 select {
383- case resultChannel <- result :
403+ case w . HandlerOutputChannel <- result :
384404 timeout .Stop ()
385- return
405+ break Loop
386406 case <- timeout .C :
387407 }
388408 }
389- }()
390- timeout := time .NewTimer (w .TaskTimeout )
391- select {
392- case result := <- resultChannel :
393- {
394- w .OutputChannel <- result
395- }
396- case <- timeout .C :
397- {
398- shouldStop = true
399- w .OutputChannel <- & TimedOutResult {task .Id ()}
409+ handlerInterrupted = false
410+ }
411+ }()
412+
413+ go func () {
414+ for taskAndStrategy := range w .InputChannel {
415+ taskAndStrategy .WorkerTask .Callee = w
416+ w .HandlerInputChannel <- taskAndStrategy
417+ timeout := time .NewTimer (w .TaskTimeout )
418+ select {
419+ case result := <- w .HandlerOutputChannel :
420+ {
421+ w .OutputChannel <- result
422+ }
423+ case <- timeout .C :
424+ {
425+ handlerInterrupted = true
426+ w .OutputChannel <- & TimedOutResult {taskAndStrategy .WorkerTask .Id ()}
427+ }
400428 }
429+ timeout .Stop ()
401430 }
402- timeout .Stop ()
403431 }()
404432}
405433
434+ func (w * Worker ) Stop () {
435+ close (w .InputChannel )
436+ close (w .HandlerInputChannel )
437+ }
438+
406439// Defines what to do with a single Kafka message. Returns a WorkerResult to distinguish successful and unsuccessful processings.
407440type WorkerStrategy func (* Worker , * Message , TaskId ) WorkerResult
408441
@@ -622,3 +655,8 @@ func (b *taskBatch) numOutstanding() int {
622655func (b * taskBatch ) done () bool {
623656 return b .numOutstanding () == 0
624657}
658+
659+ type TaskAndStrategy struct {
660+ WorkerTask * Task
661+ Strategy WorkerStrategy
662+ }
0 commit comments