@@ -42,81 +42,81 @@ func (j *job) Bytes() []byte {
42
42
The second step to create the new worker, use the buffered channel as an example, you can use the ` stop ` channel to terminate the job immediately after shutdown the queue service if need.
43
43
44
44
``` go
45
- // define the worker
46
- w := simple.NewWorker (
47
- simple.WithQueueNum (taskN),
48
- simple.WithRunFunc (func (m queue.QueuedMessage , stop <- chan struct {}) error {
49
- v , ok := m.(*job)
50
- if !ok {
51
- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
52
- return err
53
- }
45
+ // define the worker
46
+ w := simple.NewWorker (
47
+ simple.WithQueueNum (taskN),
48
+ simple.WithRunFunc (func (m queue.QueuedMessage , stop <- chan struct {}) error {
49
+ v , ok := m.(*job)
50
+ if !ok {
51
+ if err := json.Unmarshal (m.Bytes (), &v); err != nil {
52
+ return err
54
53
}
54
+ }
55
55
56
- rets <- v.Message
57
- return nil
58
- }),
59
- )
56
+ rets <- v.Message
57
+ return nil
58
+ }),
59
+ )
60
60
```
61
61
62
62
or use the [ NSQ] ( https://nsq.io/ ) as backend, see the worker example:
63
63
64
64
``` go
65
- // define the worker
66
- w := nsq.NewWorker (
67
- nsq.WithAddr (" 127.0.0.1:4150" ),
68
- nsq.WithTopic (" example" ),
69
- nsq.WithChannel (" foobar" ),
70
- // concurrent job number
71
- nsq.WithMaxInFlight (10 ),
72
- nsq.WithRunFunc (func (m queue.QueuedMessage , stop <- chan struct {}) error {
73
- v , ok := m.(*job)
74
- if !ok {
75
- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
76
- return err
77
- }
65
+ // define the worker
66
+ w := nsq.NewWorker (
67
+ nsq.WithAddr (" 127.0.0.1:4150" ),
68
+ nsq.WithTopic (" example" ),
69
+ nsq.WithChannel (" foobar" ),
70
+ // concurrent job number
71
+ nsq.WithMaxInFlight (10 ),
72
+ nsq.WithRunFunc (func (m queue.QueuedMessage , stop <- chan struct {}) error {
73
+ v , ok := m.(*job)
74
+ if !ok {
75
+ if err := json.Unmarshal (m.Bytes (), &v); err != nil {
76
+ return err
78
77
}
78
+ }
79
79
80
- rets <- v.Message
81
- return nil
82
- }),
83
- )
80
+ rets <- v.Message
81
+ return nil
82
+ }),
83
+ )
84
84
```
85
85
86
86
The third step to create a queue and initialize multiple workers, receive all job messages:
87
87
88
88
``` go
89
- // define the queue
90
- q , err := queue.NewQueue (
91
- queue.WithWorkerCount (5 ),
92
- queue.WithWorker (w),
93
- )
94
- if err != nil {
95
- log.Fatal (err)
96
- }
89
+ // define the queue
90
+ q , err := queue.NewQueue (
91
+ queue.WithWorkerCount (5 ),
92
+ queue.WithWorker (w),
93
+ )
94
+ if err != nil {
95
+ log.Fatal (err)
96
+ }
97
97
98
- // start the five worker
99
- q.Start ()
98
+ // start the five worker
99
+ q.Start ()
100
100
101
- // assign tasks in queue
102
- for i := 0 ; i < taskN; i++ {
103
- go func (i int ) {
104
- q.Queue (&job{
105
- Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
106
- })
107
- }(i)
108
- }
101
+ // assign tasks in queue
102
+ for i := 0 ; i < taskN; i++ {
103
+ go func (i int ) {
104
+ q.Queue (&job{
105
+ Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
106
+ })
107
+ }(i)
108
+ }
109
109
110
- // wait until all tasks done
111
- for i := 0 ; i < taskN; i++ {
112
- fmt.Println (" message:" , <- rets)
113
- time.Sleep (50 * time.Millisecond )
114
- }
110
+ // wait until all tasks done
111
+ for i := 0 ; i < taskN; i++ {
112
+ fmt.Println (" message:" , <- rets)
113
+ time.Sleep (50 * time.Millisecond )
114
+ }
115
115
116
- // shutdown the service and notify all the worker
117
- q.Shutdown ()
118
- // wait all jobs are complete.
119
- q.Wait ()
116
+ // shutdown the service and notify all the worker
117
+ q.Shutdown ()
118
+ // wait all jobs are complete.
119
+ q.Wait ()
120
120
```
121
121
122
122
Full example code as below or [ try it in playground] ( https://play.golang.org/p/yaTUoYxdcaK ) .
0 commit comments