@@ -13,7 +13,7 @@ var _ Worker = (*Consumer)(nil)
13
13
14
14
var errMaxCapacity = errors .New ("max capacity reached" )
15
15
16
- // Consumer for simple queue using channel
16
+ // Consumer for simple queue using buffer channel
17
17
type Consumer struct {
18
18
taskQueue chan QueuedMessage
19
19
runFunc func (context.Context , QueuedMessage ) error
@@ -74,7 +74,7 @@ func (s *Consumer) handle(job Job) error {
74
74
}
75
75
}
76
76
77
- // Run start the worker
77
+ // Run to execute new task
78
78
func (s * Consumer ) Run (task QueuedMessage ) error {
79
79
var data Job
80
80
_ = json .Unmarshal (task .Bytes (), & data )
@@ -90,7 +90,7 @@ func (s *Consumer) Run(task QueuedMessage) error {
90
90
return nil
91
91
}
92
92
93
- // Shutdown worker
93
+ // Shutdown the worker
94
94
func (s * Consumer ) Shutdown () error {
95
95
if ! atomic .CompareAndSwapInt32 (& s .stopFlag , 0 , 1 ) {
96
96
return ErrQueueShutdown
@@ -103,7 +103,7 @@ func (s *Consumer) Shutdown() error {
103
103
return nil
104
104
}
105
105
106
- // Queue send notification to queue
106
+ // Queue send task to the buffer channel
107
107
func (s * Consumer ) Queue (task QueuedMessage ) error {
108
108
if atomic .LoadInt32 (& s .stopFlag ) == 1 {
109
109
return ErrQueueShutdown
@@ -117,6 +117,7 @@ func (s *Consumer) Queue(task QueuedMessage) error {
117
117
}
118
118
}
119
119
120
+ // Request a new task from channel
120
121
func (s * Consumer ) Request () (QueuedMessage , error ) {
121
122
select {
122
123
case task , ok := <- s .taskQueue :
@@ -129,7 +130,7 @@ func (s *Consumer) Request() (QueuedMessage, error) {
129
130
}
130
131
}
131
132
132
- // NewConsumer for struc
133
+ // NewConsumer for create new consumer instance
133
134
func NewConsumer (opts ... Option ) * Consumer {
134
135
o := NewOptions (opts ... )
135
136
w := & Consumer {
0 commit comments