diff --git a/metric.go b/metric.go index e9ff253..a75fbbd 100644 --- a/metric.go +++ b/metric.go @@ -6,22 +6,25 @@ import "sync/atomic" type Metric interface { IncBusyWorker() DecBusyWorker() - BusyWorkers() uint64 + BusyWorkers() int64 SuccessTasks() uint64 FailureTasks() uint64 SubmittedTasks() uint64 + CompletedTasks() uint64 IncSuccessTask() IncFailureTask() IncSubmittedTask() + IncCompletedTask() } var _ Metric = (*metric)(nil) type metric struct { - busyWorkers uint64 + busyWorkers int64 successTasks uint64 failureTasks uint64 submittedTasks uint64 + completedTasks uint64 } // NewMetric for default metric structure @@ -30,15 +33,15 @@ func NewMetric() Metric { } func (m *metric) IncBusyWorker() { - atomic.AddUint64(&m.busyWorkers, 1) + atomic.AddInt64(&m.busyWorkers, 1) } func (m *metric) DecBusyWorker() { - atomic.AddUint64(&m.busyWorkers, ^uint64(0)) + atomic.AddInt64(&m.busyWorkers, ^int64(0)) } -func (m *metric) BusyWorkers() uint64 { - return atomic.LoadUint64(&m.busyWorkers) +func (m *metric) BusyWorkers() int64 { + return atomic.LoadInt64(&m.busyWorkers) } func (m *metric) IncSuccessTask() { @@ -53,6 +56,10 @@ func (m *metric) IncSubmittedTask() { atomic.AddUint64(&m.submittedTasks, 1) } +func (m *metric) IncCompletedTask() { + atomic.AddUint64(&m.completedTasks, 1) +} + func (m *metric) SuccessTasks() uint64 { return atomic.LoadUint64(&m.successTasks) } @@ -64,3 +71,7 @@ func (m *metric) FailureTasks() uint64 { func (m *metric) SubmittedTasks() uint64 { return atomic.LoadUint64(&m.submittedTasks) } + +func (m *metric) CompletedTasks() uint64 { + return atomic.LoadUint64(&m.completedTasks) +} diff --git a/metric_test.go b/metric_test.go index 1105756..4073633 100644 --- a/metric_test.go +++ b/metric_test.go @@ -44,8 +44,8 @@ func TestMetricData(t *testing.T) { })) q.Start() time.Sleep(50 * time.Millisecond) - assert.Equal(t, 4, q.SubmittedTasks()) - assert.Equal(t, 2, q.SuccessTasks()) - assert.Equal(t, 2, q.FailureTasks()) + assert.Equal(t, uint64(4), q.SubmittedTasks()) + assert.Equal(t, uint64(2), q.SuccessTasks()) + assert.Equal(t, uint64(2), q.FailureTasks()) q.Release() } diff --git a/options.go b/options.go index 9fa2492..0690d17 100644 --- a/options.go +++ b/options.go @@ -9,7 +9,7 @@ import ( var ( defaultCapacity = 0 - defaultWorkerCount = runtime.NumCPU() + defaultWorkerCount = int64(runtime.NumCPU()) defaultNewLogger = NewLogger() defaultFn = func(context.Context, core.QueuedMessage) error { return nil } defaultMetric = NewMetric() @@ -29,7 +29,7 @@ func (f OptionFunc) apply(option *Options) { } // WithWorkerCount set worker count -func WithWorkerCount(num int) Option { +func WithWorkerCount(num int64) Option { return OptionFunc(func(q *Options) { if num <= 0 { num = defaultWorkerCount @@ -82,7 +82,7 @@ func WithAfterFn(afterFn func()) Option { // Options for custom args in Queue type Options struct { - workerCount int + workerCount int64 logger Logger queueSize int worker core.Worker diff --git a/pool.go b/pool.go index 6790b76..49dcd33 100644 --- a/pool.go +++ b/pool.go @@ -1,7 +1,7 @@ package queue // NewPool initializes a new pool -func NewPool(size int, opts ...Option) *Queue { +func NewPool(size int64, opts ...Option) *Queue { o := []Option{ WithWorkerCount(size), WithWorker(NewRing(opts...)), diff --git a/pool_test.go b/pool_test.go index e4e2626..f7816c6 100644 --- a/pool_test.go +++ b/pool_test.go @@ -8,7 +8,7 @@ import ( ) func TestNewPoolWithQueueTask(t *testing.T) { - totalN := 5 + totalN := int64(5) taskN := 100 rets := make(chan struct{}, taskN) @@ -26,7 +26,7 @@ func TestNewPoolWithQueueTask(t *testing.T) { // shutdown all, and now running worker is 0 p.Release() - assert.Equal(t, 0, p.BusyWorkers()) + assert.Equal(t, int64(0), p.BusyWorkers()) } func TestPoolNumber(t *testing.T) { @@ -34,5 +34,5 @@ func TestPoolNumber(t *testing.T) { p.Start() // shutdown all, and now running worker is 0 p.Release() - assert.Equal(t, 0, p.BusyWorkers()) + assert.Equal(t, int64(0), p.BusyWorkers()) } diff --git a/queue.go b/queue.go index 9dc1df8..78223cd 100644 --- a/queue.go +++ b/queue.go @@ -22,7 +22,7 @@ type ( sync.Mutex metric *metric logger Logger - workerCount int + workerCount int64 routineGroup *routineGroup quit chan struct{} ready chan struct{} @@ -95,23 +95,28 @@ func (q *Queue) Release() { } // BusyWorkers returns the numbers of workers in the running process. -func (q *Queue) BusyWorkers() int { - return int(q.metric.BusyWorkers()) +func (q *Queue) BusyWorkers() int64 { + return q.metric.BusyWorkers() } // BusyWorkers returns the numbers of success tasks. -func (q *Queue) SuccessTasks() int { - return int(q.metric.SuccessTasks()) +func (q *Queue) SuccessTasks() uint64 { + return q.metric.SuccessTasks() } // BusyWorkers returns the numbers of failure tasks. -func (q *Queue) FailureTasks() int { - return int(q.metric.FailureTasks()) +func (q *Queue) FailureTasks() uint64 { + return q.metric.FailureTasks() } // BusyWorkers returns the numbers of submitted tasks. -func (q *Queue) SubmittedTasks() int { - return int(q.metric.SubmittedTasks()) +func (q *Queue) SubmittedTasks() uint64 { + return q.metric.SubmittedTasks() +} + +// CompletedTasks returns the numbers of completed tasks. +func (q *Queue) CompletedTasks() uint64 { + return q.metric.CompletedTasks() } // Wait all process @@ -269,7 +274,7 @@ func (q *Queue) handle(m *job.Message) error { } // UpdateWorkerCount to update worker number dynamically. -func (q *Queue) UpdateWorkerCount(num int) { +func (q *Queue) UpdateWorkerCount(num int64) { q.Lock() q.workerCount = num q.Unlock() diff --git a/queue_test.go b/queue_test.go index c97e221..d9ede90 100644 --- a/queue_test.go +++ b/queue_test.go @@ -48,7 +48,7 @@ func TestNewQueueWithZeroWorker(t *testing.T) { q.Start() time.Sleep(50 * time.Millisecond) - assert.Equal(t, 0, q.BusyWorkers()) + assert.Equal(t, int64(0), q.BusyWorkers()) q.Release() } @@ -74,7 +74,7 @@ func TestNewQueueWithDefaultWorker(t *testing.T) { q.Start() q.Release() - assert.Equal(t, 0, q.BusyWorkers()) + assert.Equal(t, int64(0), q.BusyWorkers()) } func TestHandleTimeout(t *testing.T) { diff --git a/ring_test.go b/ring_test.go index 996e9f8..aa93fd7 100644 --- a/ring_test.go +++ b/ring_test.go @@ -142,7 +142,7 @@ func TestCancelJobAfterShutdown(t *testing.T) { assert.NoError(t, q.Queue(m, job.AllowOption{Timeout: job.Time(100 * time.Millisecond)})) q.Start() time.Sleep(10 * time.Millisecond) - assert.Equal(t, 2, q.BusyWorkers()) + assert.Equal(t, int64(2), q.BusyWorkers()) q.Release() } @@ -231,10 +231,10 @@ func TestIncreaseWorkerCount(t *testing.T) { q.Start() time.Sleep(100 * time.Millisecond) - assert.Equal(t, 5, q.BusyWorkers()) + assert.Equal(t, int64(5), q.BusyWorkers()) q.UpdateWorkerCount(10) time.Sleep(100 * time.Millisecond) - assert.Equal(t, 10, q.BusyWorkers()) + assert.Equal(t, int64(10), q.BusyWorkers()) q.Release() } @@ -261,12 +261,12 @@ func TestDecreaseWorkerCount(t *testing.T) { q.Start() time.Sleep(20 * time.Millisecond) - assert.Equal(t, 5, q.BusyWorkers()) + assert.Equal(t, int64(5), q.BusyWorkers()) q.UpdateWorkerCount(3) time.Sleep(100 * time.Millisecond) - assert.Equal(t, 3, q.BusyWorkers()) + assert.Equal(t, int64(3), q.BusyWorkers()) time.Sleep(100 * time.Millisecond) - assert.Equal(t, 2, q.BusyWorkers()) + assert.Equal(t, int64(2), q.BusyWorkers()) q.Release() }