Skip to content

Commit d6c878f

Browse files
committed
Some minor code quality improvements
1 parent 501d88c commit d6c878f

File tree

1 file changed

+12
-15
lines changed

1 file changed

+12
-15
lines changed

dispatcher.go

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ func (d *Dispatcher) WithJobErrFn(jobErrFn JobErrorFunction) *Dispatcher {
9898
// and it will halt the current thread until the queue becomes available
9999
func (d *Dispatcher) EnqueueJobAllowWait(job Job) {
100100
if blocked := d.BlockWhileQueueFull(); blocked {
101-
_, _ = d.log("blocked during enqueue because queue full")
101+
_, _ = d.dispatchLogFn("blocked during enqueue because queue full")
102102
}
103103
d.jobQueue <- job
104104
}
@@ -141,7 +141,8 @@ func (d *Dispatcher) BlockWhileQueueFull() bool {
141141
return didBlock
142142
}
143143

144-
// get the overall utilization for the dispatcher (all workers), as well as a summary of how effective each worker was at staying busy
144+
// GetUtilization gets the overall utilization for the dispatcher (all workers), as well as a summary of how effective
145+
//each worker was at staying busy
145146
func (d *Dispatcher) GetUtilization() Utilization {
146147
var results []WorkerUtilization
147148
for _, v := range d.workers {
@@ -157,8 +158,8 @@ func (d *Dispatcher) GetUtilization() Utilization {
157158
}
158159
}
159160

160-
// blocks until all workers are idle, then resumes - typically, use this at the end of your flow to make sure all
161-
// workers are done before proceeding or exiting
161+
// WaitUntilIdle blocks until all workers are idle, then resumes - typically, use this at the end of your flow to make
162+
// sure all workers are done before proceeding or exiting
162163
func (d *Dispatcher) WaitUntilIdle() {
163164

164165
// allocate a channel
@@ -177,7 +178,7 @@ func (d *Dispatcher) WaitUntilIdle() {
177178
stopChan <- true
178179
return
179180
} else {
180-
_, _ = d.waitLogFn("queued all jobs, but still running %d of them", runCount)
181+
_, _ = d.waitLogFn("waiting for %d jobs to complete before continuing", runCount)
181182
}
182183
}
183184
}()
@@ -186,27 +187,27 @@ func (d *Dispatcher) WaitUntilIdle() {
186187
<-stopChan
187188
}
188189

189-
func (d *Dispatcher) log(format string, a ...interface{}) (n int, err error) {
190-
return d.dispatchLogFn(format, a...)
190+
func (d *Dispatcher) IsAnyWorkerIdle() bool {
191+
return int(d.RunCount()) < cap(d.workerPool)
191192
}
192193

193194
// pulls a job from the job queue and adds it to the worker's job queue - a worker will grab it in the worker logic
194195
func (d *Dispatcher) dispatch() {
195196
for {
196197

197-
// if there are no workers ready to receive the job, let the job queue fill up
198+
// if there are no workers ready to receive the job, let the job queue potentially fill up
198199
if !d.IsAnyWorkerIdle() {
199-
time.Sleep(30 * time.Millisecond)
200+
time.Sleep(50 * time.Millisecond)
200201
continue
201202
}
202203

203-
_, _ = d.log("during round-robin enqueueing: %d running vs %d total", int(d.RunCount()), cap(d.workerPool))
204+
_, _ = d.dispatchLogFn("during round-robin dispatching: %d / %d jobs running", int(d.RunCount()), cap(d.workerPool))
204205

205206
select {
206207
case job := <-d.jobQueue:
207208
go func() {
208209
workerJobQueue := <-d.workerPool
209-
_, _ = d.log("adding job to workerJobQueue")
210+
_, _ = d.dispatchLogFn("adding job to workerJobQueue")
210211
workerJobQueue <- job
211212
}()
212213
}
@@ -232,7 +233,3 @@ func (d *Dispatcher) sample() {
232233
}
233234
}()
234235
}
235-
236-
func (d *Dispatcher) IsAnyWorkerIdle() bool {
237-
return int(d.RunCount()) < cap(d.workerPool)
238-
}

0 commit comments

Comments
 (0)