@@ -31,92 +31,92 @@ The first step to create a new job as `QueueMessage` interface:
31
31
32
32
``` go
33
33
type job struct {
34
- Message string
34
+ Message string
35
35
}
36
36
37
37
func (j *job ) Bytes () []byte {
38
- return []byte (j.Message )
38
+ return []byte (j.Message )
39
39
}
40
40
```
41
41
42
42
The second step to create the new worker, use the buffered channel as an example, you can use the ` stop ` channel to terminate the job immediately after shutdown the queue service if need.
43
43
44
44
``` go
45
- // define the worker
46
- w := simple.NewWorker (
47
- simple.WithQueueNum (taskN),
48
- simple.WithRunFunc (func (m queue.QueuedMessage , stop <- chan struct {}) error {
49
- v , ok := m.(*job)
50
- if !ok {
51
- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
52
- return err
53
- }
54
- }
55
-
56
- rets <- v.Message
57
- return nil
58
- }),
59
- )
45
+ // define the worker
46
+ w := simple.NewWorker (
47
+ simple.WithQueueNum (taskN),
48
+ simple.WithRunFunc (func (m queue.QueuedMessage , stop <- chan struct {}) error {
49
+ v , ok := m.(*job)
50
+ if !ok {
51
+ if err := json.Unmarshal (m.Bytes (), &v); err != nil {
52
+ return err
53
+ }
54
+ }
55
+
56
+ rets <- v.Message
57
+ return nil
58
+ }),
59
+ )
60
60
```
61
61
62
62
or use the [ NSQ] ( https://nsq.io/ ) as backend, see the worker example:
63
63
64
64
``` go
65
- // define the worker
66
- w := nsq.NewWorker (
67
- nsq.WithAddr (" 127.0.0.1:4150" ),
68
- nsq.WithTopic (" example" ),
69
- nsq.WithChannel (" foobar" ),
70
- // concurrent job number
71
- nsq.WithMaxInFlight (10 ),
72
- nsq.WithRunFunc (func (m queue.QueuedMessage , stop <- chan struct {}) error {
73
- v , ok := m.(*job)
74
- if !ok {
75
- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
76
- return err
77
- }
78
- }
79
-
80
- rets <- v.Message
81
- return nil
82
- }),
83
- )
65
+ // define the worker
66
+ w := nsq.NewWorker (
67
+ nsq.WithAddr (" 127.0.0.1:4150" ),
68
+ nsq.WithTopic (" example" ),
69
+ nsq.WithChannel (" foobar" ),
70
+ // concurrent job number
71
+ nsq.WithMaxInFlight (10 ),
72
+ nsq.WithRunFunc (func (m queue.QueuedMessage , stop <- chan struct {}) error {
73
+ v , ok := m.(*job)
74
+ if !ok {
75
+ if err := json.Unmarshal (m.Bytes (), &v); err != nil {
76
+ return err
77
+ }
78
+ }
79
+
80
+ rets <- v.Message
81
+ return nil
82
+ }),
83
+ )
84
84
```
85
85
86
86
The third step to create a queue and initialize multiple workers, receive all job messages:
87
87
88
88
``` go
89
- // define the queue
90
- q , err := queue.NewQueue (
91
- queue.WithWorkerCount (5 ),
92
- queue.WithWorker (w),
93
- )
94
- if err != nil {
95
- log.Fatal (err)
96
- }
97
-
98
- // start the five worker
99
- q.Start ()
100
-
101
- // assign tasks in queue
102
- for i := 0 ; i < taskN; i++ {
103
- go func (i int ) {
104
- q.Queue (&job{
105
- Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
106
- })
107
- }(i)
108
- }
109
-
110
- // wait until all tasks done
111
- for i := 0 ; i < taskN; i++ {
112
- fmt.Println (" message:" , <- rets)
113
- time.Sleep (50 * time.Millisecond )
114
- }
115
-
116
- // shutdown the service and notify all the worker
117
- q.Shutdown ()
118
- // wait all jobs are complete.
119
- q.Wait ()
89
+ // define the queue
90
+ q , err := queue.NewQueue (
91
+ queue.WithWorkerCount (5 ),
92
+ queue.WithWorker (w),
93
+ )
94
+ if err != nil {
95
+ log.Fatal (err)
96
+ }
97
+
98
+ // start the five worker
99
+ q.Start ()
100
+
101
+ // assign tasks in queue
102
+ for i := 0 ; i < taskN; i++ {
103
+ go func (i int ) {
104
+ q.Queue (&job{
105
+ Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
106
+ })
107
+ }(i)
108
+ }
109
+
110
+ // wait until all tasks done
111
+ for i := 0 ; i < taskN; i++ {
112
+ fmt.Println (" message:" , <- rets)
113
+ time.Sleep (50 * time.Millisecond )
114
+ }
115
+
116
+ // shutdown the service and notify all the worker
117
+ q.Shutdown ()
118
+ // wait all jobs are complete.
119
+ q.Wait ()
120
120
```
121
121
122
122
Full example code as below or [ try it in playground] ( https://play.golang.org/p/yaTUoYxdcaK ) .
@@ -125,73 +125,73 @@ Full example code as below or [try it in playground](https://play.golang.org/p/y
125
125
package main
126
126
127
127
import (
128
- " encoding/json"
129
- " fmt"
130
- " log"
131
- " time"
128
+ " encoding/json"
129
+ " fmt"
130
+ " log"
131
+ " time"
132
132
133
- " github.com/appleboy/queue"
134
- " github.com/appleboy/queue/simple"
133
+ " github.com/appleboy/queue"
134
+ " github.com/appleboy/queue/simple"
135
135
)
136
136
137
137
type job struct {
138
- Message string
138
+ Message string
139
139
}
140
140
141
141
func (j *job ) Bytes () []byte {
142
- return []byte (j.Message )
142
+ return []byte (j.Message )
143
143
}
144
144
145
145
func main () {
146
- taskN := 100
147
- rets := make (chan string , taskN)
148
-
149
- // define the worker
150
- w := simple.NewWorker (
151
- simple.WithQueueNum (taskN),
152
- simple.WithRunFunc (func (m queue.QueuedMessage , _ <- chan struct {}) error {
153
- v , ok := m.(*job)
154
- if !ok {
155
- if err := json.Unmarshal (m.Bytes (), &v); err != nil {
156
- return err
157
- }
158
- }
159
-
160
- rets <- v.Message
161
- return nil
162
- }),
163
- )
164
-
165
- // define the queue
166
- q , err := queue.NewQueue (
167
- queue.WithWorkerCount (5 ),
168
- queue.WithWorker (w),
169
- )
170
- if err != nil {
171
- log.Fatal (err)
172
- }
173
-
174
- // start the five worker
175
- q.Start ()
176
-
177
- // assign tasks in queue
178
- for i := 0 ; i < taskN; i++ {
179
- go func (i int ) {
180
- q.Queue (&job{
181
- Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
182
- })
183
- }(i)
184
- }
185
-
186
- // wait until all tasks done
187
- for i := 0 ; i < taskN; i++ {
188
- fmt.Println (" message:" , <- rets)
189
- time.Sleep (50 * time.Millisecond )
190
- }
191
-
192
- // shutdown the service and notify all the worker
193
- q.Shutdown ()
194
- // wait all jobs are complete.
195
- q.Wait ()
146
+ taskN := 100
147
+ rets := make (chan string , taskN)
148
+
149
+ // define the worker
150
+ w := simple.NewWorker (
151
+ simple.WithQueueNum (taskN),
152
+ simple.WithRunFunc (func (m queue.QueuedMessage , _ <- chan struct {}) error {
153
+ v , ok := m.(*job)
154
+ if !ok {
155
+ if err := json.Unmarshal (m.Bytes (), &v); err != nil {
156
+ return err
157
+ }
158
+ }
159
+
160
+ rets <- v.Message
161
+ return nil
162
+ }),
163
+ )
164
+
165
+ // define the queue
166
+ q , err := queue.NewQueue (
167
+ queue.WithWorkerCount (5 ),
168
+ queue.WithWorker (w),
169
+ )
170
+ if err != nil {
171
+ log.Fatal (err)
172
+ }
173
+
174
+ // start the five worker
175
+ q.Start ()
176
+
177
+ // assign tasks in queue
178
+ for i := 0 ; i < taskN; i++ {
179
+ go func (i int ) {
180
+ q.Queue (&job{
181
+ Message: fmt.Sprintf (" handle the job: %d " , i+1 ),
182
+ })
183
+ }(i)
184
+ }
185
+
186
+ // wait until all tasks done
187
+ for i := 0 ; i < taskN; i++ {
188
+ fmt.Println (" message:" , <- rets)
189
+ time.Sleep (50 * time.Millisecond )
190
+ }
191
+
192
+ // shutdown the service and notify all the worker
193
+ q.Shutdown ()
194
+ // wait all jobs are complete.
195
+ q.Wait ()
196
196
}
197
197
```
0 commit comments