Skip to content

Commit b4e8cbd

Browse files
committed
feat: use unbuffered channel to handle new task
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 9f5bcfd commit b4e8cbd

File tree

5 files changed

+43
-19
lines changed

5 files changed

+43
-19
lines changed

_example/server-client/client/main.go

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import (
66
"fmt"
77
"time"
88

9+
"github.com/appleboy/graceful"
910
"github.com/golang-queue/nats"
1011
"github.com/golang-queue/queue"
12+
"github.com/golang-queue/queue/core"
1113
)
1214

1315
type job struct {
@@ -26,33 +28,51 @@ func main() {
2628
taskN := 10000
2729
rets := make(chan string, taskN)
2830

31+
m := graceful.NewManager()
32+
2933
// define the worker
3034
w := nats.NewWorker(
3135
nats.WithAddr("127.0.0.1:4222"),
3236
nats.WithSubj("example"),
3337
nats.WithQueue("foobar"),
34-
nats.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
38+
nats.WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
3539
var v *job
3640
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
3741
return err
3842
}
3943
rets <- v.Message
44+
time.Sleep(2 * time.Second)
4045
return nil
4146
}),
4247
)
43-
4448
// define the queue
4549
q := queue.NewPool(
46-
5,
50+
1,
4751
queue.WithWorker(w),
4852
)
4953

50-
// wait until all tasks done
51-
for i := 0; i < taskN; i++ {
52-
fmt.Println("message:", <-rets)
53-
time.Sleep(50 * time.Millisecond)
54-
}
54+
m.AddRunningJob(func(ctx context.Context) error {
55+
for {
56+
select {
57+
case <-ctx.Done():
58+
select {
59+
case m := <-rets:
60+
fmt.Println("message:", m)
61+
default:
62+
}
63+
return nil
64+
case m := <-rets:
65+
fmt.Println("message:", m)
66+
time.Sleep(50 * time.Millisecond)
67+
}
68+
}
69+
})
70+
71+
m.AddShutdownJob(func() error {
72+
// shutdown the service and notify all the worker
73+
q.Release()
74+
return nil
75+
})
5576

56-
// shutdown the service and notify all the worker
57-
q.Release()
77+
<-m.Done()
5878
}

_example/server-client/go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ module example
33
go 1.18
44

55
require (
6+
github.com/appleboy/graceful v0.0.4
67
github.com/golang-queue/nats v0.0.3-0.20210907015837-3e2e4b448b3d
7-
github.com/golang-queue/queue v0.0.13-0.20220403053548-d431277d570f
8+
github.com/golang-queue/queue v0.0.13-0.20220423025512-c4a8df54c917
89
)
910

1011
require (

_example/server-client/go.sum

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
github.com/appleboy/graceful v0.0.4 h1:Q4LCeq4DFy59qiACLtuH+mSqDERtUzwkQbCWpRaWwvQ=
2+
github.com/appleboy/graceful v0.0.4/go.mod h1:Q2mVx0t+N0lCDZc5MJudbcpTm6cgGM/J2gZCZIqD9dc=
13
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
24
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
3-
github.com/golang-queue/queue v0.0.13-0.20220403053548-d431277d570f h1:Wioq3g97ssizNPQsPwdL61DIjePabPaq+XYo7z2t2Oc=
4-
github.com/golang-queue/queue v0.0.13-0.20220403053548-d431277d570f/go.mod h1:KD9age1s6nk8Evz3tfKHsk8k4LwA0htxQ7MS7rJPJzA=
5+
github.com/golang-queue/queue v0.0.13-0.20220423025512-c4a8df54c917 h1:+khkGHxQPsad/mfmgizoTc3Jh5UVdCp1OfUkDfW+uDQ=
6+
github.com/golang-queue/queue v0.0.13-0.20220423025512-c4a8df54c917/go.mod h1:g1yxxDl8JMo4gUfxt11fjjU3SXU1ah61EvwshmDoSIs=
7+
github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc=
58
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
69
github.com/klauspost/compress v1.15.1 h1:y9FcTHGyrebwfP0ZZqFiaxTaiDnUrGkJkI+f583BL1A=
710
github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=

_example/server-client/server/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ func (j *job) Bytes() []byte {
2323
}
2424

2525
func main() {
26-
taskN := 100
26+
taskN := 10
2727

2828
// define the worker
2929
w := nats.NewWorker(

nats.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func NewWorker(opts ...Option) *Worker {
3434
opts: newOptions(opts...),
3535
stop: make(chan struct{}),
3636
exit: make(chan struct{}),
37-
tasks: make(chan *nats.Msg, 1),
37+
tasks: make(chan *nats.Msg),
3838
}
3939

4040
w.client, err = nats.Connect(w.opts.addr)
@@ -59,13 +59,13 @@ func (w *Worker) startConsumer() error {
5959
case w.tasks <- msg:
6060
case <-w.stop:
6161
if msg != nil {
62-
// re-queue the job if worker has been shutdown.
63-
w.opts.logger.Info("re-queue the old job")
62+
// re-queue the task if worker has been shutdown.
63+
w.opts.logger.Info("re-queue the current task")
6464
if err := w.client.Publish(w.opts.subj, msg.Data); err != nil {
65-
panic(err)
65+
w.opts.logger.Errorf("error to re-queue the current task: %s", err.Error())
6666
}
67-
close(w.exit)
6867
}
68+
close(w.exit)
6969
}
7070
})
7171

0 commit comments

Comments
 (0)