Skip to content

Commit ad50152

Browse files
committed
fix(address): support clustered usage (#40)
* feat: refactor `WithAddr` function to accept multiple addresses - Add `github.com/nats-io/nats.go` as an import - Change the `WithAddr` function signature to accept multiple addresses - Modify the `WithAddr` function to join multiple addresses with commas - Change the default address value to `nats.DefaultURL` Signed-off-by: Bo-Yi Wu <[email protected]>
1 parent 808f40b commit ad50152

File tree

3 files changed

+45
-14
lines changed

3 files changed

+45
-14
lines changed

.github/workflows/go.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,14 @@ jobs:
3333

3434
# Service containers to run with `container-job`
3535
services:
36-
nats:
36+
nats01:
3737
image: nats
3838
ports:
3939
- 4222:4222
40+
nats02:
41+
image: nats
42+
ports:
43+
- 4223:4222
4044

4145
steps:
4246
- name: Set up Go ${{ matrix.go }}

nats_test.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@ import (
1212
"github.com/golang-queue/queue"
1313
"github.com/golang-queue/queue/core"
1414
"github.com/golang-queue/queue/job"
15+
"github.com/nats-io/nats.go"
1516

1617
"github.com/stretchr/testify/assert"
1718
"go.uber.org/goleak"
1819
)
1920

20-
var host = "127.0.0.1"
21+
var host = nats.DefaultURL
2122

2223
func TestMain(m *testing.M) {
2324
goleak.VerifyTestMain(m)
@@ -36,7 +37,28 @@ func TestDefaultFlow(t *testing.T) {
3637
Message: "foo",
3738
}
3839
w := NewWorker(
39-
WithAddr(host+":4222"),
40+
WithAddr(host),
41+
WithSubj("test"),
42+
WithQueue("test"),
43+
)
44+
q, err := queue.NewQueue(
45+
queue.WithWorker(w),
46+
queue.WithWorkerCount(1),
47+
)
48+
assert.NoError(t, err)
49+
assert.NoError(t, q.Queue(m))
50+
assert.NoError(t, q.Queue(m))
51+
q.Start()
52+
time.Sleep(500 * time.Millisecond)
53+
q.Release()
54+
}
55+
56+
func TestClusteredHost(t *testing.T) {
57+
m := &mockMessage{
58+
Message: "foo",
59+
}
60+
w := NewWorker(
61+
WithAddr(host, "nats://localhost:4223"),
4062
WithSubj("test"),
4163
WithQueue("test"),
4264
)
@@ -54,7 +76,7 @@ func TestDefaultFlow(t *testing.T) {
5476

5577
func TestShutdown(t *testing.T) {
5678
w := NewWorker(
57-
WithAddr(host+":4222"),
79+
WithAddr(host),
5880
WithSubj("test"),
5981
WithQueue("test"),
6082
)
@@ -76,7 +98,7 @@ func TestCustomFuncAndWait(t *testing.T) {
7698
Message: "foo",
7799
}
78100
w := NewWorker(
79-
WithAddr(host+":4222"),
101+
WithAddr(host),
80102
WithSubj("test"),
81103
WithQueue("test"),
82104
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
@@ -107,7 +129,7 @@ func TestEnqueueJobAfterShutdown(t *testing.T) {
107129
Message: "foo",
108130
}
109131
w := NewWorker(
110-
WithAddr(host + ":4222"),
132+
WithAddr(host),
111133
)
112134
q, err := queue.NewQueue(
113135
queue.WithWorker(w),
@@ -129,7 +151,7 @@ func TestJobReachTimeout(t *testing.T) {
129151
Message: "foo",
130152
}
131153
w := NewWorker(
132-
WithAddr(host+":4222"),
154+
WithAddr(host),
133155
WithSubj("JobReachTimeout"),
134156
WithQueue("test"),
135157
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
@@ -167,7 +189,7 @@ func TestCancelJobAfterShutdown(t *testing.T) {
167189
Message: "test",
168190
}
169191
w := NewWorker(
170-
WithAddr(host+":4222"),
192+
WithAddr(host),
171193
WithSubj("CancelJob"),
172194
WithQueue("test"),
173195
WithLogger(queue.NewLogger()),
@@ -206,7 +228,7 @@ func TestGoroutineLeak(t *testing.T) {
206228
Message: "foo",
207229
}
208230
w := NewWorker(
209-
WithAddr(host+":4222"),
231+
WithAddr(host),
210232
WithSubj("GoroutineLeak"),
211233
WithQueue("test"),
212234
WithLogger(queue.NewEmptyLogger()),
@@ -252,7 +274,7 @@ func TestGoroutinePanic(t *testing.T) {
252274
Message: "foo",
253275
}
254276
w := NewWorker(
255-
WithAddr(host+":4222"),
277+
WithAddr(host),
256278
WithSubj("GoroutinePanic"),
257279
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
258280
panic("missing something")
@@ -278,7 +300,7 @@ func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) {
278300
Payload: []byte("foo"),
279301
}
280302
w := NewWorker(
281-
WithAddr(host+":4222"),
303+
WithAddr(host),
282304
WithSubj("test02"),
283305
WithQueue("test02"),
284306
)

options.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,12 @@ package nats
22

33
import (
44
"context"
5+
"strings"
56

67
"github.com/golang-queue/queue"
78
"github.com/golang-queue/queue/core"
9+
10+
"github.com/nats-io/nats.go"
811
)
912

1013
// Option for queue system
@@ -19,9 +22,11 @@ type options struct {
1922
}
2023

2124
// WithAddr setup the addr of NATS
22-
func WithAddr(addr string) Option {
25+
func WithAddr(addrs ...string) Option {
2326
return func(w *options) {
24-
w.addr = "nats://" + addr
27+
if len(addrs) > 0 {
28+
w.addr = strings.Join(addrs, ",")
29+
}
2530
}
2631
}
2732

@@ -55,7 +60,7 @@ func WithLogger(l queue.Logger) Option {
5560

5661
func newOptions(opts ...Option) options {
5762
defaultOpts := options{
58-
addr: "127.0.0.1:4222",
63+
addr: nats.DefaultURL,
5964
subj: "foobar",
6065
queue: "foobar",
6166
logger: queue.NewLogger(),

0 commit comments

Comments
 (0)