Skip to content

Commit 34c29ca

Browse files
authored
Add ZMQ connection retry configuration (#152)
* Add ZMQ connection retry configuration Signed-off-by: zhengkezhou1 <[email protected]> * add test & update readme Signed-off-by: zhengkezhou1 <[email protected]> * add retries test Signed-off-by: zhengkezhou1 <[email protected]> * more tests & rename Command line parameters Signed-off-by: zhengkezhou1 <[email protected]> --------- Signed-off-by: zhengkezhou1 <[email protected]>
1 parent 03050c7 commit 34c29ca

File tree

8 files changed

+104
-26
lines changed

8 files changed

+104
-26
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ For more details see the <a href="https://docs.vllm.ai/en/stable/getting_started
122122
- `tokenizers-cache-dir`: the directory for caching tokenizers
123123
- `hash-seed`: seed for hash generation (if not set, is read from PYTHONHASHSEED environment variable)
124124
- `zmq-endpoint`: ZMQ address to publish events
125+
- `zmq-max-connect-attempts`: the maximum number of times to retry a failed ZMQ connection after the initial attempt, defaults to 0 (no retries), maximum is 10
125126
- `event-batch-size`: the maximum number of kv-cache events to be sent together, defaults to 16
126127
- `fake-metrics`: represents a predefined set of metrics to be sent to Prometheus as a substitute for the actual data. When specified, only these fake metrics will be reported — real metrics and fake metrics will never be reported simultaneously. The set should include values for
127128
- `running-requests`

manifests/invalid-config.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
port: 8001
2+
model: "Qwen/Qwen2-0.5B"
3+
max-num-seqs: 5
4+
mode: "random"
5+
time-to-first-token: 2000
6+
inter-token-latency: 1000
7+
kv-cache-transfer-latency: 100
8+
seed: 100100100
9+
zmq-max-connect-attempts: -111

pkg/common/config.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ type Configuration struct {
126126

127127
// ZMQEndpoint is the ZMQ address to publish events, the default value is tcp://localhost:5557
128128
ZMQEndpoint string `yaml:"zmq-endpoint"`
129+
// ZMQMaxConnectAttempts is the number of times to retry a failed ZMQ connection after the first attempt; it must not exceed 10
130+
ZMQMaxConnectAttempts uint `yaml:"zmq-max-connect-attempts"`
129131
// EventBatchSize is the maximum number of kv-cache events to be sent together, defaults to 16
130132
EventBatchSize int `yaml:"event-batch-size"`
131133

@@ -354,6 +356,9 @@ func (c *Configuration) validate() error {
354356
if c.EventBatchSize < 1 {
355357
return errors.New("event batch size cannot less than 1")
356358
}
359+
if c.ZMQMaxConnectAttempts > 10 {
360+
return errors.New("zmq retries times cannot be more than 10")
361+
}
357362

358363
if c.FakeMetrics != nil {
359364
if c.FakeMetrics.RunningRequests < 0 || c.FakeMetrics.WaitingRequests < 0 {
@@ -415,6 +420,7 @@ func ParseCommandParamsAndLoadConfig() (*Configuration, error) {
415420
f.StringVar(&config.TokenizersCacheDir, "tokenizers-cache-dir", config.TokenizersCacheDir, "Directory for caching tokenizers")
416421
f.StringVar(&config.HashSeed, "hash-seed", config.HashSeed, "Seed for hash generation (if not set, is read from PYTHONHASHSEED environment variable)")
417422
f.StringVar(&config.ZMQEndpoint, "zmq-endpoint", config.ZMQEndpoint, "ZMQ address to publish events")
423+
f.UintVar(&config.ZMQMaxConnectAttempts, "zmq-max-connect-attempts", config.ZMQMaxConnectAttempts, "Maximum number of times to retry ZMQ requests")
418424
f.IntVar(&config.EventBatchSize, "event-batch-size", config.EventBatchSize, "Maximum number of kv-cache events to be sent together")
419425

420426
// These values were manually parsed above in getParamValueFromArgs, we leave this in order to get these flags in --help

pkg/common/config_test.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,12 +103,14 @@ var _ = Describe("Simulator configuration", func() {
103103
"{\"name\":\"lora4\",\"path\":\"/path/to/lora4\"}",
104104
}
105105
c.EventBatchSize = 5
106+
c.ZMQMaxConnectAttempts = 1
106107
test = testCase{
107108
name: "config file with command line args",
108109
args: []string{"cmd", "--model", model, "--config", "../../manifests/config.yaml", "--port", "8002",
109110
"--served-model-name", "alias1", "alias2", "--seed", "100",
110111
"--lora-modules", "{\"name\":\"lora3\",\"path\":\"/path/to/lora3\"}", "{\"name\":\"lora4\",\"path\":\"/path/to/lora4\"}",
111112
"--event-batch-size", "5",
113+
"--zmq-max-connect-attempts", "1",
112114
},
113115
expectedConfig: c,
114116
}
@@ -121,6 +123,7 @@ var _ = Describe("Simulator configuration", func() {
121123
c.LoraModulesString = []string{
122124
"{\"name\":\"lora3\",\"path\":\"/path/to/lora3\"}",
123125
}
126+
c.ZMQMaxConnectAttempts = 0
124127
test = testCase{
125128
name: "config file with command line args with different format",
126129
args: []string{"cmd", "--model", model, "--config", "../../manifests/config.yaml", "--port", "8002",
@@ -377,6 +380,14 @@ var _ = Describe("Simulator configuration", func() {
377380
args: []string{"cmd", "--fake-metrics", "{\"running-requests\":10,\"waiting-requests\":30,\"kv-cache-usage\":40}",
378381
"--config", "../../manifests/config.yaml"},
379382
},
383+
{
384+
name: "invalid (negative) zmq-max-connect-attempts for argument",
385+
args: []string{"cmd", "zmq-max-connect-attempts", "-1", "--config", "../../manifests/config.yaml"},
386+
},
387+
{
388+
name: "invalid (negative) zmq-max-connect-attempts for config file",
389+
args: []string{"cmd", "--config", "../../manifests/invalid-config.yaml"},
390+
},
380391
}
381392

382393
for _, test := range invalidTests {

pkg/common/publisher.go

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"errors"
2424
"fmt"
2525
"sync/atomic"
26+
"time"
2627

2728
zmq "github.com/pebbe/zmq4"
2829
"github.com/vmihailenco/msgpack/v5"
@@ -38,24 +39,34 @@ type Publisher struct {
3839

3940
// NewPublisher creates a new ZMQ publisher.
4041
// endpoint is the ZMQ address to connect to (e.g., "tcp://localhost:5557").
41-
func NewPublisher(endpoint string) (*Publisher, error) {
42+
// retries is the number of extra connection attempts made after the first one fails (retries+1 attempts in total).
43+
func NewPublisher(endpoint string, retries uint) (*Publisher, error) {
4244
socket, err := zmq.NewSocket(zmq.PUB)
4345
if err != nil {
4446
return nil, fmt.Errorf("failed to create ZMQ PUB socket: %w", err)
4547
}
4648

47-
if err := socket.Connect(endpoint); err != nil {
48-
errClose := socket.Close()
49-
return nil, errors.Join(
50-
fmt.Errorf("failed to connect to %s: %w", endpoint, err),
51-
errClose,
52-
)
49+
// Attempt the connection up to retries+1 times, pausing one second between attempts
50+
for i := uint(0); i <= retries; i++ {
51+
err = socket.Connect(endpoint)
52+
if err == nil {
53+
return &Publisher{
54+
socket: socket,
55+
endpoint: endpoint,
56+
}, nil
57+
}
58+
59+
// If not the last attempt, wait before retrying
60+
if i < retries {
61+
time.Sleep(1 * time.Second)
62+
}
5363
}
5464

55-
return &Publisher{
56-
socket: socket,
57-
endpoint: endpoint,
58-
}, nil
65+
errClose := socket.Close()
66+
return nil, errors.Join(
67+
fmt.Errorf("failed to connect to %s after %d retries: %w", endpoint, retries+1, err),
68+
errClose,
69+
)
5970
}
6071

6172
// PublishEvent publishes a KV cache event batch to the ZMQ topic.

pkg/common/publisher_test.go

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const (
3333
subEndpoint = "tcp://*:5557"
3434
pubEndpoint = "tcp://localhost:5557"
3535
data = "Hello"
36+
retries = 0
3637
)
3738

3839
var _ = Describe("Publisher", func() {
@@ -50,7 +51,7 @@ var _ = Describe("Publisher", func() {
5051

5152
time.Sleep(100 * time.Millisecond)
5253

53-
pub, err := NewPublisher(pubEndpoint)
54+
pub, err := NewPublisher(pubEndpoint, retries)
5455
Expect(err).NotTo(HaveOccurred())
5556

5657
ctx, cancel := context.WithCancel(context.Background())
@@ -78,4 +79,40 @@ var _ = Describe("Publisher", func() {
7879
Expect(err).NotTo(HaveOccurred())
7980
Expect(payload).To(Equal(data))
8081
})
82+
It("should fail when connection attempts exceed maximum retries", func() {
83+
// Use invalid address format, which will cause connection to fail
84+
invalidEndpoint := "invalid-address-format"
85+
86+
pub, err := NewPublisher(invalidEndpoint, 2)
87+
Expect(err).To(HaveOccurred())
88+
Expect(err.Error()).To(ContainSubstring("failed to connect"))
89+
Expect(err.Error()).To(ContainSubstring("after 3 retries")) // 2 retries = 3 total attempts
90+
91+
if pub != nil {
92+
//nolint
93+
pub.Close()
94+
}
95+
})
96+
It("should retry connection successfully", func() {
97+
// Step 1: Try to connect to a temporarily non-existent service
98+
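// NOTE(review): ZMQ connect normally returns before the peer is reachable, so this may succeed on the first attempt without exercising the retry path — confirm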
99+
go func() {
100+
// Delay starting the server to simulate service recovery
101+
time.Sleep(2 * time.Second)
102+
103+
// Start subscriber as server
104+
sub, err := zmq.NewSocket(zmq.SUB)
105+
Expect(err).NotTo(HaveOccurred())
106+
//nolint
107+
defer sub.Close()
108+
err = sub.Bind(subEndpoint)
109+
Expect(err).NotTo(HaveOccurred())
110+
}()
111+
112+
// Step 2: Publisher will retry connection and eventually succeed
113+
pub, err := NewPublisher(pubEndpoint, 5) // 5 retries
114+
Expect(err).NotTo(HaveOccurred()) // Should eventually succeed
115+
//nolint
116+
defer pub.Close()
117+
})
81118
})

pkg/kv-cache/block_cache.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCach
4848
// TODO read size of channel from config
4949
eChan := make(chan EventData, 10000)
5050

51-
publisher, err := common.NewPublisher(config.ZMQEndpoint)
51+
publisher, err := common.NewPublisher(config.ZMQEndpoint, config.ZMQMaxConnectAttempts)
5252
if err != nil {
5353
return nil, err
5454
}

pkg/kv-cache/kv_cache_test.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,12 @@ var _ = Describe("KV cache", Ordered, func() {
201201
time.Sleep(300 * time.Millisecond)
202202

203203
config := &common.Configuration{
204-
Port: 1234,
205-
Model: "model",
206-
KVCacheSize: test.cacheSize,
207-
ZMQEndpoint: pubEndpoint,
208-
EventBatchSize: 1,
204+
Port: 1234,
205+
Model: "model",
206+
KVCacheSize: test.cacheSize,
207+
ZMQEndpoint: pubEndpoint,
208+
ZMQMaxConnectAttempts: 3,
209+
EventBatchSize: 1,
209210
}
210211

211212
sub, topic := createSub(config)
@@ -304,10 +305,11 @@ var _ = Describe("KV cache", Ordered, func() {
304305

305306
It("should send events correctly", func() {
306307
config := &common.Configuration{
307-
Port: 1234,
308-
Model: "model",
309-
KVCacheSize: 4,
310-
ZMQEndpoint: pubEndpoint,
308+
Port: 1234,
309+
Model: "model",
310+
KVCacheSize: 4,
311+
ZMQEndpoint: pubEndpoint,
312+
ZMQMaxConnectAttempts: 3,
311313
}
312314

313315
sub, topic := createSub(config)
@@ -413,10 +415,11 @@ var _ = Describe("KV cache", Ordered, func() {
413415
for _, testCase := range testCases {
414416
It(testCase.name, func() {
415417
config := common.Configuration{
416-
Port: 1234,
417-
Model: "model",
418-
KVCacheSize: testCase.cacheSize,
419-
ZMQEndpoint: pubEndpoint,
418+
Port: 1234,
419+
Model: "model",
420+
KVCacheSize: testCase.cacheSize,
421+
ZMQEndpoint: pubEndpoint,
422+
ZMQMaxConnectAttempts: 3,
420423
}
421424
blockCache, err := newBlockCache(&config, GinkgoLogr)
422425
Expect(err).NotTo(HaveOccurred())

0 commit comments

Comments
 (0)