File tree Expand file tree Collapse file tree 3 files changed +8
-11
lines changed Expand file tree Collapse file tree 3 files changed +8
-11
lines changed File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change @@ -33,13 +33,10 @@ func main() {
33
33
nats .WithSubj ("example" ),
34
34
nats .WithQueue ("foobar" ),
35
35
nats .WithRunFunc (func (ctx context.Context , m queue.QueuedMessage ) error {
36
- v , ok := m .(* job )
37
- if ! ok {
38
- if err := json .Unmarshal (m .Bytes (), & v ); err != nil {
39
- return err
40
- }
36
+ var v * job
37
+ if err := json .Unmarshal (m .Bytes (), & v ); err != nil {
38
+ return err
41
39
}
42
-
43
40
rets <- v .Message
44
41
return nil
45
42
}),
@@ -60,9 +57,11 @@ func main() {
60
57
// assign tasks in queue
61
58
for i := 0 ; i < taskN ; i ++ {
62
59
go func (i int ) {
63
- q .Queue (& job {
60
+ if err := q .Queue (& job {
64
61
Message : fmt .Sprintf ("handle the job: %d" , i + 1 ),
65
- })
62
+ }); err != nil {
63
+ log .Fatal (err )
64
+ }
66
65
}(i )
67
66
}
68
67
@@ -73,7 +72,5 @@ func main() {
73
72
}
74
73
75
74
// shutdown the service and notify all the worker
76
- q .Shutdown ()
77
- // wait all jobs are complete.
78
- q .Wait ()
75
+ q .Release ()
79
76
}
You can’t perform that action at this time.
0 commit comments