Skip to content

Commit 3c20ffd

Browse files
committed
docs: Add redis, nats and NSQ example
Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent e77a348 commit 3c20ffd

File tree

1 file changed

+253
-0
lines changed

1 file changed

+253
-0
lines changed

README.md

Lines changed: 253 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,256 @@ func main() {
143143
}
144144
}
145145
```
146+
147+
## Using NSQ as Queue
148+
149+
See the [NSQ documentation](https://github.com/golang-queue/nsq).
150+
151+
```go
152+
package main
153+
154+
import (
155+
"context"
156+
"encoding/json"
157+
"fmt"
158+
"log"
159+
"time"
160+
161+
"github.com/golang-queue/nsq"
162+
"github.com/golang-queue/queue"
163+
)
164+
165+
type job struct {
166+
Message string
167+
}
168+
169+
func (j *job) Bytes() []byte {
170+
b, err := json.Marshal(j)
171+
if err != nil {
172+
panic(err)
173+
}
174+
return b
175+
}
176+
177+
func main() {
178+
taskN := 100
179+
rets := make(chan string, taskN)
180+
181+
// define the worker
182+
w := nsq.NewWorker(
183+
nsq.WithAddr("127.0.0.1:4150"),
184+
nsq.WithTopic("example"),
185+
nsq.WithChannel("foobar"),
186+
// concurrent job number
187+
nsq.WithMaxInFlight(10),
188+
nsq.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
189+
v, ok := m.(*job)
190+
if !ok {
191+
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
192+
return err
193+
}
194+
}
195+
196+
rets <- v.Message
197+
return nil
198+
}),
199+
)
200+
201+
// define the queue
202+
q, err := queue.NewQueue(
203+
queue.WithWorkerCount(10),
204+
queue.WithWorker(w),
205+
)
206+
if err != nil {
207+
log.Fatal(err)
208+
}
209+
210+
// start the five worker
211+
q.Start()
212+
213+
// assign tasks in queue
214+
for i := 0; i < taskN; i++ {
215+
go func(i int) {
216+
q.Queue(&job{
217+
Message: fmt.Sprintf("handle the job: %d", i+1),
218+
})
219+
}(i)
220+
}
221+
222+
// wait until all tasks done
223+
for i := 0; i < taskN; i++ {
224+
fmt.Println("message:", <-rets)
225+
time.Sleep(50 * time.Millisecond)
226+
}
227+
228+
// shutdown the service and notify all the worker
229+
q.Release()
230+
}
231+
```
232+
233+
## Using NATs as Queue
234+
235+
See the [NATs documentation](https://github.com/golang-queue/nats)
236+
237+
```go
238+
package main
239+
240+
import (
241+
"context"
242+
"encoding/json"
243+
"fmt"
244+
"log"
245+
"time"
246+
247+
"github.com/golang-queue/nats"
248+
"github.com/golang-queue/queue"
249+
)
250+
251+
type job struct {
252+
Message string
253+
}
254+
255+
func (j *job) Bytes() []byte {
256+
b, err := json.Marshal(j)
257+
if err != nil {
258+
panic(err)
259+
}
260+
return b
261+
}
262+
263+
func main() {
264+
taskN := 100
265+
rets := make(chan string, taskN)
266+
267+
// define the worker
268+
w := nats.NewWorker(
269+
nats.WithAddr("127.0.0.1:4222"),
270+
nats.WithSubj("example"),
271+
nats.WithQueue("foobar"),
272+
nats.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
273+
v, ok := m.(*job)
274+
if !ok {
275+
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
276+
return err
277+
}
278+
}
279+
280+
rets <- v.Message
281+
return nil
282+
}),
283+
)
284+
285+
// define the queue
286+
q, err := queue.NewQueue(
287+
queue.WithWorkerCount(10),
288+
queue.WithWorker(w),
289+
)
290+
if err != nil {
291+
log.Fatal(err)
292+
}
293+
294+
// start the five worker
295+
q.Start()
296+
297+
// assign tasks in queue
298+
for i := 0; i < taskN; i++ {
299+
go func(i int) {
300+
q.Queue(&job{
301+
Message: fmt.Sprintf("handle the job: %d", i+1),
302+
})
303+
}(i)
304+
}
305+
306+
// wait until all tasks done
307+
for i := 0; i < taskN; i++ {
308+
fmt.Println("message:", <-rets)
309+
time.Sleep(50 * time.Millisecond)
310+
}
311+
312+
// shutdown the service and notify all the worker
313+
q.Release()
314+
}
315+
```
316+
317+
## Using Redis(Pub/Sub) as Queue
318+
319+
See the [redis documentation](https://github.com/golang-queue/redisdb)
320+
321+
```go
322+
package main
323+
324+
import (
325+
"context"
326+
"encoding/json"
327+
"fmt"
328+
"log"
329+
"time"
330+
331+
"github.com/golang-queue/queue"
332+
"github.com/golang-queue/redisdb"
333+
)
334+
335+
type job struct {
336+
Message string
337+
}
338+
339+
func (j *job) Bytes() []byte {
340+
b, err := json.Marshal(j)
341+
if err != nil {
342+
panic(err)
343+
}
344+
return b
345+
}
346+
347+
func main() {
348+
taskN := 100
349+
rets := make(chan string, taskN)
350+
351+
// define the worker
352+
w := redisdb.NewWorker(
353+
redisdb.WithAddr("127.0.0.1:6379"),
354+
redisdb.WithChannel("foobar"),
355+
redisdb.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error {
356+
v, ok := m.(*job)
357+
if !ok {
358+
if err := json.Unmarshal(m.Bytes(), &v); err != nil {
359+
return err
360+
}
361+
}
362+
363+
rets <- v.Message
364+
return nil
365+
}),
366+
)
367+
368+
// define the queue
369+
q, err := queue.NewQueue(
370+
queue.WithWorkerCount(10),
371+
queue.WithWorker(w),
372+
)
373+
if err != nil {
374+
log.Fatal(err)
375+
}
376+
377+
// start the five worker
378+
q.Start()
379+
380+
// assign tasks in queue
381+
for i := 0; i < taskN; i++ {
382+
go func(i int) {
383+
q.Queue(&job{
384+
Message: fmt.Sprintf("handle the job: %d", i+1),
385+
})
386+
}(i)
387+
}
388+
389+
// wait until all tasks done
390+
for i := 0; i < taskN; i++ {
391+
fmt.Println("message:", <-rets)
392+
time.Sleep(50 * time.Millisecond)
393+
}
394+
395+
// shutdown the service and notify all the worker
396+
q.Release()
397+
}
398+
```

0 commit comments

Comments
 (0)