Skip to content

Commit 77eb8e7

Browse files
authored
chore(metric): support busy workers count. (#3)
1 parent eef7072 commit 77eb8e7

File tree

2 files changed

+82
-37
lines changed

2 files changed

+82
-37
lines changed

redis.go

Lines changed: 53 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,8 @@ import (
88
"sync/atomic"
99
"time"
1010

11-
"github.com/golang-queue/queue"
12-
1311
"github.com/go-redis/redis/v8"
12+
"github.com/golang-queue/queue"
1413
)
1514

1615
var _ queue.Worker = (*Worker)(nil)
@@ -30,11 +29,12 @@ type Worker struct {
3029
channel string
3130
channelSize int
3231

33-
stopOnce sync.Once
34-
stop chan struct{}
35-
runFunc func(context.Context, queue.QueuedMessage) error
36-
logger queue.Logger
37-
stopFlag int32
32+
stopOnce sync.Once
33+
stop chan struct{}
34+
runFunc func(context.Context, queue.QueuedMessage) error
35+
logger queue.Logger
36+
stopFlag int32
37+
busyWorkers uint64
3838
}
3939

4040
// WithAddr setup the addr of redis
@@ -143,23 +143,39 @@ func NewWorker(opts ...Option) *Worker {
143143
return w
144144
}
145145

146+
func (w *Worker) incBusyWorker() {
147+
atomic.AddUint64(&w.busyWorkers, 1)
148+
}
149+
150+
func (w *Worker) decBusyWorker() {
151+
atomic.AddUint64(&w.busyWorkers, ^uint64(0))
152+
}
153+
154+
func (w *Worker) BusyWorkers() uint64 {
155+
return atomic.LoadUint64(&w.busyWorkers)
156+
}
157+
146158
// BeforeRun run script before start worker
147-
func (s *Worker) BeforeRun() error {
159+
func (w *Worker) BeforeRun() error {
148160
return nil
149161
}
150162

151163
// AfterRun run script after start worker
152-
func (s *Worker) AfterRun() error {
164+
func (w *Worker) AfterRun() error {
153165
return nil
154166
}
155167

156-
func (s *Worker) handle(job queue.Job) error {
168+
func (w *Worker) handle(job queue.Job) error {
157169
// create channel with buffer size 1 to avoid goroutine leak
158170
done := make(chan error, 1)
159171
panicChan := make(chan interface{}, 1)
160172
startTime := time.Now()
161173
ctx, cancel := context.WithTimeout(context.Background(), job.Timeout)
162-
defer cancel()
174+
w.incBusyWorker()
175+
defer func() {
176+
cancel()
177+
w.decBusyWorker()
178+
}()
163179

164180
// run the job
165181
go func() {
@@ -171,15 +187,15 @@ func (s *Worker) handle(job queue.Job) error {
171187
}()
172188

173189
// run custom process function
174-
done <- s.runFunc(ctx, job)
190+
done <- w.runFunc(ctx, job)
175191
}()
176192

177193
select {
178194
case p := <-panicChan:
179195
panic(p)
180196
case <-ctx.Done(): // timeout reached
181197
return ctx.Err()
182-
case <-s.stop: // shutdown service
198+
case <-w.stop: // shutdown service
183199
// cancel job
184200
cancel()
185201

@@ -199,39 +215,39 @@ func (s *Worker) handle(job queue.Job) error {
199215
}
200216

201217
// Shutdown worker
202-
func (s *Worker) Shutdown() error {
203-
if !atomic.CompareAndSwapInt32(&s.stopFlag, 0, 1) {
218+
func (w *Worker) Shutdown() error {
219+
if !atomic.CompareAndSwapInt32(&w.stopFlag, 0, 1) {
204220
return queue.ErrQueueShutdown
205221
}
206222

207-
s.stopOnce.Do(func() {
208-
s.pubsub.Close()
209-
s.rdb.Close()
210-
close(s.stop)
223+
w.stopOnce.Do(func() {
224+
w.pubsub.Close()
225+
w.rdb.Close()
226+
close(w.stop)
211227
})
212228
return nil
213229
}
214230

215231
// Capacity for channel
216-
func (s *Worker) Capacity() int {
232+
func (w *Worker) Capacity() int {
217233
return 0
218234
}
219235

220236
// Usage for count of channel usage
221-
func (s *Worker) Usage() int {
237+
func (w *Worker) Usage() int {
222238
return 0
223239
}
224240

225241
// Queue send notification to queue
226-
func (s *Worker) Queue(job queue.QueuedMessage) error {
227-
if atomic.LoadInt32(&s.stopFlag) == 1 {
242+
func (w *Worker) Queue(job queue.QueuedMessage) error {
243+
if atomic.LoadInt32(&w.stopFlag) == 1 {
228244
return queue.ErrQueueShutdown
229245
}
230246

231247
ctx := context.Background()
232248

233249
// Publish a message.
234-
err := s.rdb.Publish(ctx, s.channel, job.Bytes()).Err()
250+
err := w.rdb.Publish(ctx, w.channel, job.Bytes()).Err()
235251
if err != nil {
236252
return err
237253
}
@@ -240,57 +256,57 @@ func (s *Worker) Queue(job queue.QueuedMessage) error {
240256
}
241257

242258
// Run start the worker
243-
func (s *Worker) Run() error {
259+
func (w *Worker) Run() error {
244260
// check queue status
245261
select {
246-
case <-s.stop:
262+
case <-w.stop:
247263
return nil
248264
default:
249265
}
250266

251267
var options []redis.ChannelOption
252268
ctx := context.Background()
253269

254-
if s.channelSize > 1 {
255-
options = append(options, redis.WithChannelSize(s.channelSize))
270+
if w.channelSize > 1 {
271+
options = append(options, redis.WithChannelSize(w.channelSize))
256272
}
257273

258-
ch := s.pubsub.Channel(options...)
274+
ch := w.pubsub.Channel(options...)
259275
// make sure the connection is successful
260-
err := s.pubsub.Ping(ctx)
276+
err := w.pubsub.Ping(ctx)
261277
if err != nil {
262278
return err
263279
}
264280

265281
for {
266282
// check queue status
267283
select {
268-
case <-s.stop:
284+
case <-w.stop:
269285
return nil
270286
default:
271287
}
272288

273289
select {
274290
case m, ok := <-ch:
275291
select {
276-
case <-s.stop:
292+
case <-w.stop:
277293
return nil
278294
default:
279295
}
280296

281297
if !ok {
282-
return fmt.Errorf("redis pubsub: channel=%s closed", s.channel)
298+
return fmt.Errorf("redis pubsub: channel=%s closed", w.channel)
283299
}
284300

285301
var data queue.Job
286302
if err := json.Unmarshal([]byte(m.Payload), &data); err != nil {
287-
s.logger.Error("json unmarshal error: ", err)
303+
w.logger.Error("json unmarshal error: ", err)
288304
continue
289305
}
290-
if err := s.handle(data); err != nil {
291-
s.logger.Error("handle job error: ", err)
306+
if err := w.handle(data); err != nil {
307+
w.logger.Error("handle job error: ", err)
292308
}
293-
case <-s.stop:
309+
case <-w.stop:
294310
return nil
295311
}
296312
}

redis_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -362,3 +362,32 @@ func TestJobComplete(t *testing.T) {
362362
assert.Error(t, err)
363363
assert.Equal(t, errors.New("job completed"), err)
364364
}
365+
366+
func TestBusyWorkerCount(t *testing.T) {
367+
job := queue.Job{
368+
Timeout: 500 * time.Millisecond,
369+
Body: []byte("foo"),
370+
}
371+
372+
w := NewWorker(
373+
WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
374+
time.Sleep(200 * time.Millisecond)
375+
return nil
376+
}),
377+
)
378+
379+
assert.Equal(t, uint64(0), w.BusyWorkers())
380+
go func() {
381+
assert.NoError(t, w.handle(job))
382+
}()
383+
go func() {
384+
assert.NoError(t, w.handle(job))
385+
}()
386+
387+
time.Sleep(50 * time.Millisecond)
388+
assert.Equal(t, uint64(2), w.BusyWorkers())
389+
time.Sleep(200 * time.Millisecond)
390+
assert.Equal(t, uint64(0), w.BusyWorkers())
391+
392+
assert.NoError(t, w.Shutdown())
393+
}

0 commit comments

Comments
 (0)