|
4 | 4 | [](https://codecov.io/gh/golang-queue/nats)
|
5 | 5 |
|
6 | 6 | NATS as backend with [Queue package](https://github.com/golang-queue/queue) (Connective Technology for Adaptive Edge & Distributed Systems)
|
| 7 | + |
| 8 | +## Testing |
| 9 | + |
| 10 | +```sh |
| 11 | +go test -v ./... |
| 12 | +``` |
| 13 | + |
| 14 | +## Example |
| 15 | + |
| 16 | +```go |
| 17 | +package main |
| 18 | + |
| 19 | +import ( |
| 20 | + "context" |
| 21 | + "encoding/json" |
| 22 | + "fmt" |
| 23 | + "log" |
| 24 | + "time" |
| 25 | + |
| 26 | + "github.com/golang-queue/nats" |
| 27 | + "github.com/golang-queue/queue" |
| 28 | +) |
| 29 | + |
| 30 | +type job struct { |
| 31 | + Message string |
| 32 | +} |
| 33 | + |
| 34 | +func (j *job) Bytes() []byte { |
| 35 | + b, err := json.Marshal(j) |
| 36 | + if err != nil { |
| 37 | + panic(err) |
| 38 | + } |
| 39 | + return b |
| 40 | +} |
| 41 | + |
| 42 | +func main() { |
| 43 | + taskN := 100 |
| 44 | + rets := make(chan string, taskN) |
| 45 | + |
| 46 | + // define the worker |
| 47 | + w := nats.NewWorker( |
| 48 | + nats.WithAddr("127.0.0.1:4222"), |
| 49 | + nats.WithSubj("example"), |
| 50 | + nats.WithQueue("foobar"), |
| 51 | + nats.WithRunFunc(func(ctx context.Context, m queue.QueuedMessage) error { |
| 52 | + var v *job |
| 53 | + if err := json.Unmarshal(m.Bytes(), &v); err != nil { |
| 54 | + return err |
| 55 | + } |
| 56 | + rets <- v.Message |
| 57 | + return nil |
| 58 | + }), |
| 59 | + ) |
| 60 | + |
| 61 | + // define the queue |
| 62 | + q, err := queue.NewQueue( |
| 63 | + queue.WithWorkerCount(10), |
| 64 | + queue.WithWorker(w), |
| 65 | + ) |
| 66 | + if err != nil { |
| 67 | + log.Fatal(err) |
| 68 | + } |
| 69 | + |
| 70 | + // start the five worker |
| 71 | + q.Start() |
| 72 | + |
| 73 | + // assign tasks in queue |
| 74 | + for i := 0; i < taskN; i++ { |
| 75 | + go func(i int) { |
| 76 | + if err := q.Queue(&job{ |
| 77 | + Message: fmt.Sprintf("handle the job: %d", i+1), |
| 78 | + }); err != nil { |
| 79 | + log.Fatal(err) |
| 80 | + } |
| 81 | + }(i) |
| 82 | + } |
| 83 | + |
| 84 | + // wait until all tasks done |
| 85 | + for i := 0; i < taskN; i++ { |
| 86 | + fmt.Println("message:", <-rets) |
| 87 | + time.Sleep(50 * time.Millisecond) |
| 88 | + } |
| 89 | + |
| 90 | + // shutdown the service and notify all the worker |
| 91 | + q.Release() |
| 92 | +} |
| 93 | +``` |
0 commit comments