File tree Expand file tree Collapse file tree 4 files changed +8
-3
lines changed Expand file tree Collapse file tree 4 files changed +8
-3
lines changed Original file line number Diff line number Diff line change @@ -13,7 +13,7 @@ var _ Worker = (*Consumer)(nil)
13
13
14
14
var errMaxCapacity = errors .New ("max capacity reached" )
15
15
16
- // Worker for simple queue using channel
16
+ // Consumer for simple queue using channel
17
17
type Consumer struct {
18
18
taskQueue chan QueuedMessage
19
19
runFunc func (context.Context , QueuedMessage ) error
@@ -32,6 +32,7 @@ func (s *Consumer) decBusyWorker() {
32
32
s .metric .DecBusyWorker ()
33
33
}
34
34
35
+ // BusyWorkers returns the numbers of workers has been busy.
35
36
func (s * Consumer ) BusyWorkers () uint64 {
36
37
return s .metric .BusyWorkers ()
37
38
}
Original file line number Diff line number Diff line change @@ -67,6 +67,7 @@ func WithTimeOut(t time.Duration) Option {
67
67
}
68
68
}
69
69
70
+ // Options for custom args in Queue
70
71
type Options struct {
71
72
workerCount int
72
73
timeout time.Duration
@@ -77,6 +78,7 @@ type Options struct {
77
78
metric Metric
78
79
}
79
80
81
+ // NewOptions initialize the default value for the options
80
82
func NewOptions (opts ... Option ) * Options {
81
83
o := & Options {
82
84
workerCount : defaultWorkerCount ,
Original file line number Diff line number Diff line change 1
1
package queue
2
2
3
+ // NewPool initializes a new pool
3
4
func NewPool (size int , opts ... Option ) * Queue {
4
5
o := []Option {
5
6
WithWorkerCount (size ),
Original file line number Diff line number Diff line change @@ -47,6 +47,7 @@ func (j Job) Bytes() []byte {
47
47
return j .Payload
48
48
}
49
49
50
+ // Encode for encoding the structure
50
51
func (j Job ) Encode () []byte {
51
52
b , _ := json .Marshal (j )
52
53
return b
@@ -107,7 +108,7 @@ func (q *Queue) Shutdown() {
107
108
})
108
109
}
109
110
110
- // Workers returns the numbers of workers has been created .
111
+ // Release for graceful shutdown .
111
112
func (q * Queue ) Release () {
112
113
q .Shutdown ()
113
114
q .Wait ()
@@ -143,7 +144,7 @@ func (q *Queue) Queue(job QueuedMessage) error {
143
144
return q .handleQueue (q .timeout , job )
144
145
}
145
146
146
- // Queue to queue all job
147
+ // QueueWithTimeout to queue all job with specified timeout.
147
148
func (q * Queue ) QueueWithTimeout (timeout time.Duration , job QueuedMessage ) error {
148
149
return q .handleQueue (timeout , job )
149
150
}
You can’t perform that action at this time.
0 commit comments