Skip to content

Commit 33e7210

Browse files
authored
Use dynamic ports in zmq tests (#170)
* Use dynamic ports in zmq tests Signed-off-by: Qifan Deng <[email protected]> * Use consistent endpoint in zmq test cases Signed-off-by: Qifan Deng <[email protected]> * Make zmq endpoint constant in test cases Signed-off-by: Qifan Deng <[email protected]> * Not publish even if publisher is nil in kveventsender Signed-off-by: Qifan Deng <[email protected]> * Use endpoint in publisher retry test Signed-off-by: Qifan Deng <[email protected]> * Rename dynamic endpoint as wildcardEndpoint Signed-off-by: Qifan Deng <[email protected]> * Validae publisher inside switch cases Signed-off-by: Qifan Deng <[email protected]> --------- Signed-off-by: Qifan Deng <[email protected]>
1 parent b7d5873 commit 33e7210

File tree

5 files changed

+53
-27
lines changed

5 files changed

+53
-27
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ vendor
44
.vscode
55
.devcontainer
66
# MacOSX
7-
.DS_Store
7+
.DS_Store
8+
*.test

pkg/common/publisher_test.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,10 @@ import (
2929
)
3030

3131
const (
32-
topic = "test-topic"
33-
subEndpoint = "tcp://*:5557"
34-
pubEndpoint = "tcp://localhost:5557"
35-
data = "Hello"
36-
retries = 0
32+
wildcardEndpoint = "tcp://*:*"
33+
topic = "test-topic"
34+
data = "Hello"
35+
retries = 0
3736
)
3837

3938
var _ = Describe("Publisher", func() {
@@ -42,7 +41,9 @@ var _ = Describe("Publisher", func() {
4241
Expect(err).NotTo(HaveOccurred())
4342
sub, err := zctx.NewSocket(zmq.SUB)
4443
Expect(err).NotTo(HaveOccurred())
45-
err = sub.Bind(subEndpoint)
44+
err = sub.Bind(wildcardEndpoint)
45+
Expect(err).NotTo(HaveOccurred())
46+
endpoint, err := sub.GetLastEndpoint()
4647
Expect(err).NotTo(HaveOccurred())
4748
err = sub.SetSubscribe(topic)
4849
Expect(err).NotTo(HaveOccurred())
@@ -51,7 +52,7 @@ var _ = Describe("Publisher", func() {
5152

5253
time.Sleep(100 * time.Millisecond)
5354

54-
pub, err := NewPublisher(pubEndpoint, retries)
55+
pub, err := NewPublisher(endpoint, retries)
5556
Expect(err).NotTo(HaveOccurred())
5657

5758
ctx, cancel := context.WithCancel(context.Background())
@@ -94,24 +95,37 @@ var _ = Describe("Publisher", func() {
9495
}
9596
})
9697
It("should retry connection successfully", func() {
98+
// Get ephemeral endpoint
99+
sub, err := zmq.NewSocket(zmq.SUB)
100+
Expect(err).NotTo(HaveOccurred())
101+
err = sub.Bind(wildcardEndpoint)
102+
Expect(err).NotTo(HaveOccurred())
103+
endpoint, err := sub.GetLastEndpoint()
104+
Expect(err).NotTo(HaveOccurred())
105+
97106
// Step 1: Try to connect to a temporarily non-existent service
98107
// This will trigger the retry mechanism
99-
go func() {
108+
go func(sub *zmq.Socket, endpoint string) {
109+
// Delay releasing the ephemeral addr
110+
time.Sleep(1950 * time.Millisecond)
111+
err := sub.Close()
112+
Expect(err).NotTo(HaveOccurred())
113+
100114
// Delay starting the server to simulate service recovery
101115
time.Sleep(2 * time.Second)
102116

103117
// Start subscriber as server
104-
sub, err := zmq.NewSocket(zmq.SUB)
118+
sub, err = zmq.NewSocket(zmq.SUB)
105119
Expect(err).NotTo(HaveOccurred())
106120
//nolint
107121
defer sub.Close()
108-
err = sub.Bind(subEndpoint)
109122
Expect(err).NotTo(HaveOccurred())
110-
}()
111-
123+
err = sub.Bind(endpoint)
124+
Expect(err).NotTo(HaveOccurred())
125+
}(sub, endpoint)
112126
// Step 2: Publisher will retry connection and eventually succeed
113-
pub, err := NewPublisher(pubEndpoint, 5) // 5 retries
114-
Expect(err).NotTo(HaveOccurred()) // Should eventually succeed
127+
pub, err := NewPublisher(endpoint, 5) // 5 retries
128+
Expect(err).NotTo(HaveOccurred()) // Should eventually succeed
115129
//nolint
116130
defer pub.Close()
117131
})

pkg/kv-cache/block_cache.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,9 +48,13 @@ func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCach
4848
// TODO read size of channel from config
4949
eChan := make(chan EventData, 10000)
5050

51-
publisher, err := common.NewPublisher(config.ZMQEndpoint, config.ZMQMaxConnectAttempts)
52-
if err != nil {
53-
return nil, err
51+
var publisher *common.Publisher
52+
var err error
53+
if config.ZMQEndpoint != "" {
54+
publisher, err = common.NewPublisher(config.ZMQEndpoint, config.ZMQMaxConnectAttempts)
55+
if err != nil {
56+
return nil, err
57+
}
5458
}
5559

5660
return &blockCache{

pkg/kv-cache/kv_cache_sender.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ func (s *KVEventSender) Run(ctx context.Context) error {
8888
return nil
8989
}
9090

91+
if s.publisher == nil {
92+
continue
93+
}
94+
9195
// Encode eventData's hash value to msgpack.RawMessage
9296
var payload []byte
9397
var err error
@@ -120,6 +124,9 @@ func (s *KVEventSender) Run(ctx context.Context) error {
120124
}
121125

122126
case <-timer.C:
127+
if s.publisher == nil {
128+
continue
129+
}
123130
if err := s.publishHelper(ctx); err != nil {
124131
return err
125132
}

pkg/kv-cache/kv_cache_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -33,11 +33,10 @@ import (
3333
)
3434

3535
const (
36-
req1ID = "req1"
37-
req2ID = "req2"
38-
req3ID = "req3"
39-
subEndpoint = "tcp://*:5557"
40-
pubEndpoint = "tcp://localhost:5557"
36+
req1ID = "req1"
37+
req2ID = "req2"
38+
req3ID = "req3"
39+
wildcardEndpoint = "tcp://*:*"
4140
)
4241

4342
type ActionType int
@@ -204,7 +203,6 @@ var _ = Describe("KV cache", Ordered, func() {
204203
Port: 1234,
205204
Model: "model",
206205
KVCacheSize: test.cacheSize,
207-
ZMQEndpoint: pubEndpoint,
208206
ZMQMaxConnectAttempts: 3,
209207
EventBatchSize: 1,
210208
}
@@ -308,7 +306,6 @@ var _ = Describe("KV cache", Ordered, func() {
308306
Port: 1234,
309307
Model: "model",
310308
KVCacheSize: 4,
311-
ZMQEndpoint: pubEndpoint,
312309
ZMQMaxConnectAttempts: 3,
313310
}
314311

@@ -418,7 +415,6 @@ var _ = Describe("KV cache", Ordered, func() {
418415
Port: 1234,
419416
Model: "model",
420417
KVCacheSize: testCase.cacheSize,
421-
ZMQEndpoint: pubEndpoint,
422418
ZMQMaxConnectAttempts: 3,
423419
}
424420
blockCache, err := newBlockCache(&config, GinkgoLogr)
@@ -535,8 +531,12 @@ func createSub(config *common.Configuration) (*zmq.Socket, string) {
535531
Expect(err).NotTo(HaveOccurred())
536532
sub, err := zctx.NewSocket(zmq.SUB)
537533
Expect(err).NotTo(HaveOccurred())
538-
err = sub.Bind(subEndpoint)
534+
err = sub.Bind(wildcardEndpoint)
539535
Expect(err).NotTo(HaveOccurred())
536+
// get the actual port
537+
endpoint, err := sub.GetLastEndpoint()
538+
Expect(err).NotTo(HaveOccurred())
539+
config.ZMQEndpoint = endpoint
540540
topic := createTopic(config)
541541
err = sub.SetSubscribe(topic)
542542
Expect(err).NotTo(HaveOccurred())

0 commit comments

Comments
 (0)