Skip to content

Commit 567ba48

Browse files
committed
Implement real ScheduleWithTimeout
1 parent c2b22c3 commit 567ba48

File tree

1 file changed

+47
-10
lines changed

1 file changed

+47
-10
lines changed

worker/pool.go

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,8 @@ import (
1111
)
1212

1313
var (
14-
// ErrWorkerPoolIsEmpty WorkerPool Is Empty
15-
ErrWorkerPoolIsEmpty = errors.New("workerPool is empty")
16-
// ErrWorkerPoolIsFull WorkerPool Is Full
17-
ErrWorkerPoolIsFull = errors.New("workerPool is full")
14+
// ErrWorkerPoolJobQueueIsFull WorkerPool JobQueue Is Full
15+
ErrWorkerPoolJobQueueIsFull = errors.New("workerPool JobQueue is full")
1816
// ErrWorkerPoolIsClosed WorkerPool Is Closed
1917
ErrWorkerPoolIsClosed = errors.New("workerPool is closed")
2018
// ErrWorkerPoolScheduleTimeout WorkerPool Schedule Timeout
@@ -80,6 +78,9 @@ type DefaultWorkerPool struct {
8078
spawnWorkerCh fpgo.ChannelQueue[int]
8179
lastAliveTime time.Time
8280

81+
scheduleWaitCh fpgo.ChannelQueue[int]
82+
scheduleWaitCount int
83+
8384
// Settings
8485
DefaultWorkerPoolSettings
8586
}
@@ -92,7 +93,8 @@ func NewDefaultWorkerPool(jobQueue *fpgo.BufferedChannelQueue[func()], settings
9293
workerPool := &DefaultWorkerPool{
9394
jobQueue: jobQueue,
9495

95-
spawnWorkerCh: fpgo.NewChannelQueue[int](1),
96+
spawnWorkerCh: fpgo.NewChannelQueue[int](1),
97+
scheduleWaitCh: fpgo.NewChannelQueue[int](1),
9698

9799
// Settings
98100
DefaultWorkerPoolSettings: *settings,
@@ -196,6 +198,9 @@ func (workerPoolSelf *DefaultWorkerPool) generateWorkerWithMaximum(maximum int)
196198
loopLabel:
197199
for {
198200
workerPoolSelf.lastAliveTime = time.Now()
201+
if workerPoolSelf.scheduleWaitCount > 0 {
202+
workerPoolSelf.scheduleWaitCh.Offer(1)
203+
}
199204

200205
select {
201206
case job := <-workerPoolSelf.jobQueue.GetChannel():
@@ -308,17 +313,49 @@ func (workerPoolSelf *DefaultWorkerPool) Schedule(fn func()) error {
308313
}
309314
defer workerPoolSelf.spawnWorkerCh.Offer(1)
310315

311-
return workerPoolSelf.jobQueue.Offer(fn)
316+
err := workerPoolSelf.jobQueue.Offer(fn)
317+
if err == fpgo.ErrQueueIsFull {
318+
return ErrWorkerPoolJobQueueIsFull
319+
}
320+
321+
return err
312322
}
313323

314324
// ScheduleWithTimeout Schedule the Job with timeout
315325
func (workerPoolSelf *DefaultWorkerPool) ScheduleWithTimeout(fn func(), timeout time.Duration) error {
316-
if workerPoolSelf.IsClosed() {
317-
return ErrWorkerPoolIsClosed
326+
err := workerPoolSelf.Schedule(fn)
327+
if err != ErrWorkerPoolJobQueueIsFull {
328+
return err
318329
}
319-
defer workerPoolSelf.spawnWorkerCh.Offer(1)
320330

321-
return workerPoolSelf.jobQueue.Offer(fn)
331+
deadline := time.Now().Add(timeout)
332+
workerPoolSelf.lock.Lock()
333+
workerPoolSelf.scheduleWaitCount++
334+
workerPoolSelf.lock.Unlock()
335+
go func() {
336+
workerPoolSelf.lock.Lock()
337+
workerPoolSelf.scheduleWaitCount--
338+
workerPoolSelf.lock.Unlock()
339+
}()
340+
341+
for {
342+
if workerPoolSelf.IsClosed() {
343+
return ErrWorkerPoolIsClosed
344+
}
345+
346+
err = workerPoolSelf.Schedule(fn)
347+
if err != ErrWorkerPoolJobQueueIsFull {
348+
return err
349+
}
350+
351+
select {
352+
case <-workerPoolSelf.scheduleWaitCh:
353+
continue
354+
case <-time.After(deadline.Sub(time.Now())):
355+
return ErrWorkerPoolScheduleTimeout
356+
}
357+
}
358+
return err
322359
}
323360

324361
// Invokable

0 commit comments

Comments
 (0)