Skip to content

Commit 532bcd9

Browse files
authored
Merge pull request #130 from drev74/test/events
test: add events test
2 parents bce47ac + 6c72201 commit 532bcd9

File tree

3 files changed

+177
-12
lines changed

3 files changed

+177
-12
lines changed

event_test.go

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
package asyncjobs
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"log"
8+
"time"
9+
10+
"github.com/nats-io/jsm.go"
11+
"github.com/nats-io/nats.go"
12+
. "github.com/onsi/ginkgo/v2"
13+
. "github.com/onsi/gomega"
14+
)
15+
16+
const (
17+
timeout = 1 // sec
18+
topic = "test"
19+
)
20+
21+
var _ = Describe("Events", func() {
22+
BeforeEach(func() {
23+
log.SetOutput(GinkgoWriter)
24+
})
25+
26+
It("Should occur for a compelted task", func() {
27+
withJetStream(func(nc *nats.Conn, mgr *jsm.Manager) {
28+
client, err := NewClient(NatsConn(nc))
29+
Expect(err).ToNot(HaveOccurred())
30+
31+
testCount := 1
32+
firstID := ""
33+
34+
ctx, cancel := context.WithTimeout(context.Background(), timeout*time.Second)
35+
defer cancel()
36+
37+
for range testCount {
38+
task, err := NewTask(topic, map[string]string{"hello": "world"})
39+
Expect(err).ToNot(HaveOccurred())
40+
41+
if firstID == "" {
42+
firstID = task.ID
43+
}
44+
45+
err = client.EnqueueTask(ctx, task)
46+
Expect(err).ToNot(HaveOccurred())
47+
}
48+
49+
router := NewTaskRouter()
50+
router.HandleFunc(topic, func(ctx context.Context, _ Logger, _ *Task) (any, error) {
51+
return "success", nil
52+
})
53+
54+
subscription, err := nc.SubscribeSync(EventsSubjectWildcard)
55+
Expect(err).ToNot(HaveOccurred())
56+
57+
// run client
58+
go func() {
59+
err = client.Run(ctx, router)
60+
Expect(err).ToNot(HaveOccurred())
61+
}()
62+
63+
// run event listener
64+
for {
65+
msg, err := subscription.NextMsg(timeout * time.Second)
66+
if err != nil {
67+
if errors.Is(err, nats.ErrTimeout) {
68+
// it's okay
69+
break
70+
} else {
71+
panic(err)
72+
}
73+
}
74+
75+
event, kind, err := ParseEventJSON(msg.Data)
76+
Expect(err).ToNot(HaveOccurred())
77+
78+
switch e := event.(type) {
79+
case TaskStateChangeEvent:
80+
if e.LastErr == "" {
81+
fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s\n", e.TimeStamp.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State)
82+
} else {
83+
fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s error: %s\n", e.TimeStamp.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State, e.LastErr)
84+
}
85+
86+
case LeaderElectedEvent:
87+
fmt.Printf("[%s] %s: new %s leader\n", e.TimeStamp.Format("15:04:05"), e.Name, e.Component)
88+
89+
default:
90+
fmt.Printf("[%s] Unknown event type %s\n", time.Now().UTC().Format("15:04:05"), kind)
91+
}
92+
}
93+
})
94+
})
95+
96+
It("Should occur for a terminated task", func() {
97+
withJetStream(func(nc *nats.Conn, mgr *jsm.Manager) {
98+
client, err := NewClient(NatsConn(nc))
99+
Expect(err).ToNot(HaveOccurred())
100+
101+
testCount := 1
102+
firstID := ""
103+
104+
ctx, cancel := context.WithTimeout(context.Background(), timeout*time.Second)
105+
defer cancel()
106+
107+
for range testCount {
108+
task, err := NewTask(topic, map[string]string{"hello": "world"})
109+
Expect(err).ToNot(HaveOccurred())
110+
111+
if firstID == "" {
112+
firstID = task.ID
113+
}
114+
115+
err = client.EnqueueTask(ctx, task)
116+
Expect(err).ToNot(HaveOccurred())
117+
}
118+
119+
router := NewTaskRouter()
120+
router.HandleFunc(topic, func(ctx context.Context, _ Logger, _ *Task) (any, error) {
121+
return nil, ErrTerminateTask
122+
})
123+
124+
subscription, err := nc.SubscribeSync(EventsSubjectWildcard)
125+
Expect(err).ToNot(HaveOccurred())
126+
127+
// run client
128+
go func() {
129+
err = client.Run(ctx, router)
130+
Expect(err).ToNot(HaveOccurred())
131+
}()
132+
133+
// run event listener
134+
for {
135+
msg, err := subscription.NextMsg(timeout * time.Second)
136+
if err != nil {
137+
if errors.Is(err, nats.ErrTimeout) {
138+
// it's okay
139+
break
140+
} else {
141+
panic(err)
142+
}
143+
}
144+
145+
event, kind, err := ParseEventJSON(msg.Data)
146+
Expect(err).ToNot(HaveOccurred())
147+
148+
switch e := event.(type) {
149+
case TaskStateChangeEvent:
150+
if e.LastErr == "" {
151+
fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s\n", e.TimeStamp.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State)
152+
} else {
153+
fmt.Printf("[%s] %s: queue: %s type: %s tries: %d state: %s error: %s\n", e.TimeStamp.Format("15:04:05"), e.TaskID, e.Queue, e.TaskType, e.Tries, e.State, e.LastErr)
154+
}
155+
156+
case LeaderElectedEvent:
157+
fmt.Printf("[%s] %s: new %s leader\n", e.TimeStamp.Format("15:04:05"), e.Name, e.Component)
158+
159+
default:
160+
fmt.Printf("[%s] Unknown event type %s\n", time.Now().UTC().Format("15:04:05"), kind)
161+
}
162+
}
163+
})
164+
})
165+
})

go.mod

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@ require (
77
github.com/choria-io/fisk v0.7.0
88
github.com/dustin/go-humanize v1.0.1
99
github.com/nats-io/jsm.go v0.2.2
10-
github.com/nats-io/nats-server/v2 v2.11.2
11-
github.com/nats-io/nats.go v1.41.2
10+
github.com/nats-io/nats-server/v2 v2.11.3
11+
github.com/nats-io/nats.go v1.42.0
1212
github.com/onsi/ginkgo/v2 v2.23.4
1313
github.com/onsi/gomega v1.36.3
1414
github.com/prometheus/client_golang v1.22.0
@@ -27,8 +27,8 @@ require (
2727
github.com/go-logr/logr v1.4.2 // indirect
2828
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
2929
github.com/google/go-cmp v0.7.0 // indirect
30-
github.com/google/go-tpm v0.9.3 // indirect
31-
github.com/google/pprof v0.0.0-20250423184734-337e5dd93bb4 // indirect
30+
github.com/google/go-tpm v0.9.4 // indirect
31+
github.com/google/pprof v0.0.0-20250501235452-c0086092b71a // indirect
3232
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
3333
github.com/klauspost/compress v1.18.0 // indirect
3434
github.com/mattn/go-colorable v0.1.14 // indirect

go.sum

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@ github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1v
2525
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
2626
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
2727
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
28-
github.com/google/go-tpm v0.9.3 h1:+yx0/anQuGzi+ssRqeD6WpXjW2L/V0dItUayO0i9sRc=
29-
github.com/google/go-tpm v0.9.3/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
30-
github.com/google/pprof v0.0.0-20250423184734-337e5dd93bb4 h1:gD0vax+4I+mAj+jEChEf25Ia07Jq7kYOFO5PPhAxFl4=
31-
github.com/google/pprof v0.0.0-20250423184734-337e5dd93bb4/go.mod h1:5hDyRhoBCxViHszMt12TnOpEI4VVi+U8Gm9iphldiMA=
28+
github.com/google/go-tpm v0.9.4 h1:awZRf9FwOeTunQmHoDYSHJps3ie6f1UlhS1fOdPEt1I=
29+
github.com/google/go-tpm v0.9.4/go.mod h1:h9jEsEECg7gtLis0upRBQU+GhYVH6jMjrFxI8u6bVUY=
30+
github.com/google/pprof v0.0.0-20250501235452-c0086092b71a h1:rDA3FfmxwXR+BVKKdz55WwMJ1pD2hJQNW31d+l3mPk4=
31+
github.com/google/pprof v0.0.0-20250501235452-c0086092b71a/go.mod h1:5hDyRhoBCxViHszMt12TnOpEI4VVi+U8Gm9iphldiMA=
3232
github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec h1:qv2VnGeEQHchGaZ/u7lxST/RaJw+cv273q79D81Xbog=
3333
github.com/hinshun/vt10x v0.0.0-20220119200601-820417d04eec/go.mod h1:Q48J4R4DvxnHolD5P8pOtXigYlRuPLGl6moFx3ulM68=
3434
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
@@ -58,10 +58,10 @@ github.com/nats-io/jsm.go v0.2.2 h1:mjbAH1AbXFVd1JJqKnC8j7hc4BQw+Tjfayc0+q9nFFc=
5858
github.com/nats-io/jsm.go v0.2.2/go.mod h1:nx5vB1NGm6ilwphW/9lQcGWjcl3wASGQs3xql8UosM0=
5959
github.com/nats-io/jwt/v2 v2.7.4 h1:jXFuDDxs/GQjGDZGhNgH4tXzSUK6WQi2rsj4xmsNOtI=
6060
github.com/nats-io/jwt/v2 v2.7.4/go.mod h1:me11pOkwObtcBNR8AiMrUbtVOUGkqYjMQZ6jnSdVUIA=
61-
github.com/nats-io/nats-server/v2 v2.11.2 h1:k5KBAuRpJW9qAF11Io2txNhR5m1KUmqVkalLAw2yLfk=
62-
github.com/nats-io/nats-server/v2 v2.11.2/go.mod h1:6Z6Fd+JgckqzKig7DYwhgrE7bJ6fypPHnGPND+DqgMY=
63-
github.com/nats-io/nats.go v1.41.2 h1:5UkfLAtu/036s99AhFRlyNDI1Ieylb36qbGjJzHixos=
64-
github.com/nats-io/nats.go v1.41.2/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
61+
github.com/nats-io/nats-server/v2 v2.11.3 h1:AbGtXxuwjo0gBroLGGr/dE0vf24kTKdRnBq/3z/Fdoc=
62+
github.com/nats-io/nats-server/v2 v2.11.3/go.mod h1:6Z6Fd+JgckqzKig7DYwhgrE7bJ6fypPHnGPND+DqgMY=
63+
github.com/nats-io/nats.go v1.42.0 h1:ynIMupIOvf/ZWH/b2qda6WGKGNSjwOUutTpWRvAmhaM=
64+
github.com/nats-io/nats.go v1.42.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
6565
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
6666
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
6767
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=

0 commit comments

Comments
 (0)