Skip to content

Commit cb13b03

Browse files
committed
test: refactor tests to use NATS container and update worker endpoints
- Add additional NATS ports for clustering and monitoring in `setupNatsContainer` - Initialize context and setup NATS container in `TestCustomFuncAndWait` - Update worker address to use endpoint in `TestCustomFuncAndWait` - Initialize context and setup NATS container in `TestEnqueueJobAfterShutdown` - Update worker address to use endpoint in `TestEnqueueJobAfterShutdown` - Initialize context and setup NATS container in `TestJobReachTimeout` - Update worker address to use endpoint in `TestJobReachTimeout` - Initialize context and setup NATS container in `TestCancelJobAfterShutdown` - Update worker address to use endpoint in `TestCancelJobAfterShutdown` - Initialize context and setup NATS container in `TestGoroutineLeak` - Update worker address to use endpoint in `TestGoroutineLeak` - Initialize context and setup NATS container in `TestGoroutinePanic` - Update worker address to use endpoint in `TestGoroutinePanic` - Initialize context and setup NATS container in `TestReQueueTaskInWorkerBeforeShutdown` - Update worker address to use endpoint in `TestReQueueTaskInWorkerBeforeShutdown` Signed-off-by: appleboy <[email protected]>
1 parent 372c48b commit cb13b03

File tree

1 file changed

+35
-10
lines changed

1 file changed

+35
-10
lines changed

nats_test.go

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,13 @@ func (m mockMessage) Bytes() []byte {
3737

3838
func setupNatsContainer(ctx context.Context, t *testing.T) (testcontainers.Container, string) {
3939
req := testcontainers.ContainerRequest{
40-
Image: "nats:2.10",
41-
ExposedPorts: []string{"4222/tcp"},
42-
WaitingFor: wait.ForLog("Server is ready"),
40+
Image: "nats:2.10",
41+
ExposedPorts: []string{
42+
"4222/tcp", // client port
43+
"6222/tcp", // cluster port
44+
"8222/tcp", // monitoring port
45+
},
46+
WaitingFor: wait.ForLog("Server is ready"),
4347
}
4448
redisC, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
4549
ContainerRequest: req,
@@ -119,11 +123,14 @@ func TestShutdown(t *testing.T) {
119123
}
120124

121125
func TestCustomFuncAndWait(t *testing.T) {
126+
ctx := context.Background()
127+
natsC, endpoint := setupNatsContainer(ctx, t)
128+
defer testcontainers.CleanupContainer(t, natsC)
122129
m := &mockMessage{
123130
Message: "foo",
124131
}
125132
w := NewWorker(
126-
WithAddr(host),
133+
WithAddr(endpoint),
127134
WithSubj("test"),
128135
WithQueue("test"),
129136
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
@@ -150,11 +157,14 @@ func TestCustomFuncAndWait(t *testing.T) {
150157
}
151158

152159
func TestEnqueueJobAfterShutdown(t *testing.T) {
160+
ctx := context.Background()
161+
natsC, endpoint := setupNatsContainer(ctx, t)
162+
defer testcontainers.CleanupContainer(t, natsC)
153163
m := mockMessage{
154164
Message: "foo",
155165
}
156166
w := NewWorker(
157-
WithAddr(host),
167+
WithAddr(endpoint),
158168
)
159169
q, err := queue.NewQueue(
160170
queue.WithWorker(w),
@@ -172,11 +182,14 @@ func TestEnqueueJobAfterShutdown(t *testing.T) {
172182
}
173183

174184
func TestJobReachTimeout(t *testing.T) {
185+
ctx := context.Background()
186+
natsC, endpoint := setupNatsContainer(ctx, t)
187+
defer testcontainers.CleanupContainer(t, natsC)
175188
m := mockMessage{
176189
Message: "foo",
177190
}
178191
w := NewWorker(
179-
WithAddr(host),
192+
WithAddr(endpoint),
180193
WithSubj("JobReachTimeout"),
181194
WithQueue("test"),
182195
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
@@ -212,11 +225,14 @@ func TestJobReachTimeout(t *testing.T) {
212225
}
213226

214227
func TestCancelJobAfterShutdown(t *testing.T) {
228+
ctx := context.Background()
229+
natsC, endpoint := setupNatsContainer(ctx, t)
230+
defer testcontainers.CleanupContainer(t, natsC)
215231
m := mockMessage{
216232
Message: "test",
217233
}
218234
w := NewWorker(
219-
WithAddr(host),
235+
WithAddr(endpoint),
220236
WithSubj("CancelJob"),
221237
WithQueue("test"),
222238
WithLogger(queue.NewLogger()),
@@ -253,11 +269,14 @@ func TestCancelJobAfterShutdown(t *testing.T) {
253269
}
254270

255271
func TestGoroutineLeak(t *testing.T) {
272+
ctx := context.Background()
273+
natsC, endpoint := setupNatsContainer(ctx, t)
274+
defer testcontainers.CleanupContainer(t, natsC)
256275
m := mockMessage{
257276
Message: "foo",
258277
}
259278
w := NewWorker(
260-
WithAddr(host),
279+
WithAddr(endpoint),
261280
WithSubj("GoroutineLeak"),
262281
WithQueue("test"),
263282
WithLogger(queue.NewEmptyLogger()),
@@ -299,11 +318,14 @@ func TestGoroutineLeak(t *testing.T) {
299318
}
300319

301320
func TestGoroutinePanic(t *testing.T) {
321+
ctx := context.Background()
322+
natsC, endpoint := setupNatsContainer(ctx, t)
323+
defer testcontainers.CleanupContainer(t, natsC)
302324
m := mockMessage{
303325
Message: "foo",
304326
}
305327
w := NewWorker(
306-
WithAddr(host),
328+
WithAddr(endpoint),
307329
WithSubj("GoroutinePanic"),
308330
WithRunFunc(func(ctx context.Context, m core.QueuedMessage) error {
309331
panic("missing something")
@@ -325,11 +347,14 @@ func TestGoroutinePanic(t *testing.T) {
325347
}
326348

327349
func TestReQueueTaskInWorkerBeforeShutdown(t *testing.T) {
350+
ctx := context.Background()
351+
natsC, endpoint := setupNatsContainer(ctx, t)
352+
defer testcontainers.CleanupContainer(t, natsC)
328353
job := &job.Message{
329354
Payload: []byte("foo"),
330355
}
331356
w := NewWorker(
332-
WithAddr(host),
357+
WithAddr(endpoint),
333358
WithSubj("test02"),
334359
WithQueue("test02"),
335360
)

0 commit comments

Comments
 (0)