Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ vendor
.vscode
.devcontainer
# MacOSX
.DS_Store
.DS_Store
*.test
42 changes: 28 additions & 14 deletions pkg/common/publisher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,10 @@ import (
)

const (
topic = "test-topic"
subEndpoint = "tcp://*:5557"
pubEndpoint = "tcp://localhost:5557"
data = "Hello"
retries = 0
endpoint = "tcp://*:*"
topic = "test-topic"
data = "Hello"
retries = 0
)

var _ = Describe("Publisher", func() {
Expand All @@ -42,7 +41,9 @@ var _ = Describe("Publisher", func() {
Expect(err).NotTo(HaveOccurred())
sub, err := zctx.NewSocket(zmq.SUB)
Expect(err).NotTo(HaveOccurred())
err = sub.Bind(subEndpoint)
err = sub.Bind(endpoint)
Expect(err).NotTo(HaveOccurred())
endpoint, err := sub.GetLastEndpoint()
Expect(err).NotTo(HaveOccurred())
err = sub.SetSubscribe(topic)
Expect(err).NotTo(HaveOccurred())
Expand All @@ -51,7 +52,7 @@ var _ = Describe("Publisher", func() {

time.Sleep(100 * time.Millisecond)

pub, err := NewPublisher(pubEndpoint, retries)
pub, err := NewPublisher(endpoint, retries)
Expect(err).NotTo(HaveOccurred())

ctx, cancel := context.WithCancel(context.Background())
Expand Down Expand Up @@ -94,24 +95,37 @@ var _ = Describe("Publisher", func() {
}
})
It("should retry connection successfully", func() {
// Get ephemeral endpoint
sub, err := zmq.NewSocket(zmq.SUB)
Expect(err).NotTo(HaveOccurred())
err = sub.Bind(endpoint)
Expect(err).NotTo(HaveOccurred())
endpoint, err := sub.GetLastEndpoint()
Expect(err).NotTo(HaveOccurred())

// Step 1: Try to connect to a temporarily non-existent service
// This will trigger the retry mechanism
go func() {
go func(sub *zmq.Socket, endpoint string) {
// Delay releasing the ephemeral addr
time.Sleep(1950 * time.Millisecond)
err := sub.Close()
Expect(err).NotTo(HaveOccurred())

// Delay starting the server to simulate service recovery
time.Sleep(2 * time.Second)

// Start subscriber as server
sub, err := zmq.NewSocket(zmq.SUB)
sub, err = zmq.NewSocket(zmq.SUB)
Expect(err).NotTo(HaveOccurred())
//nolint
defer sub.Close()
err = sub.Bind(subEndpoint)
Expect(err).NotTo(HaveOccurred())
}()

err = sub.Bind(endpoint)
Expect(err).NotTo(HaveOccurred())
}(sub, endpoint)
// Step 2: Publisher will retry connection and eventually succeed
pub, err := NewPublisher(pubEndpoint, 5) // 5 retries
Expect(err).NotTo(HaveOccurred()) // Should eventually succeed
pub, err := NewPublisher(endpoint, 5) // 5 retries
Expect(err).NotTo(HaveOccurred()) // Should eventually succeed
//nolint
defer pub.Close()
})
Expand Down
10 changes: 7 additions & 3 deletions pkg/kv-cache/block_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,13 @@ func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCach
// TODO read size of channel from config
eChan := make(chan EventData, 10000)

publisher, err := common.NewPublisher(config.ZMQEndpoint, config.ZMQMaxConnectAttempts)
if err != nil {
return nil, err
var publisher *common.Publisher
var err error
if config.ZMQEndpoint != "" {
publisher, err = common.NewPublisher(config.ZMQEndpoint, config.ZMQMaxConnectAttempts)
if err != nil {
return nil, err
}
}

return &blockCache{
Expand Down
5 changes: 5 additions & 0 deletions pkg/kv-cache/kv_cache_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ func (s *KVEventSender) publishHelper(ctx context.Context) error {
DataParallelRank: &dpRank,
}

if s.publisher == nil {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check should be in Run() in two cases:
case eventData, ok := <-s.eventChan:
after if !ok{} please add in line 90
if s.publisher == nil { continue }
and
case <-timer.C:
please add the same check first thing in this case.

I don't think we need a log message here, there can be too many.

s.logger.Info("No publisher configured, skip publishing event batch", "topic", s.topic)
return nil
}

err := s.publisher.PublishEvent(ctx, s.topic, eventBatch)

// reset batch
Expand Down
18 changes: 9 additions & 9 deletions pkg/kv-cache/kv_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,10 @@ import (
)

const (
req1ID = "req1"
req2ID = "req2"
req3ID = "req3"
subEndpoint = "tcp://*:5557"
pubEndpoint = "tcp://localhost:5557"
req1ID = "req1"
req2ID = "req2"
req3ID = "req3"
endpoint = "tcp://*:*"
)

type ActionType int
Expand Down Expand Up @@ -204,7 +203,6 @@ var _ = Describe("KV cache", Ordered, func() {
Port: 1234,
Model: "model",
KVCacheSize: test.cacheSize,
ZMQEndpoint: pubEndpoint,
ZMQMaxConnectAttempts: 3,
EventBatchSize: 1,
}
Expand Down Expand Up @@ -308,7 +306,6 @@ var _ = Describe("KV cache", Ordered, func() {
Port: 1234,
Model: "model",
KVCacheSize: 4,
ZMQEndpoint: pubEndpoint,
ZMQMaxConnectAttempts: 3,
}

Expand Down Expand Up @@ -418,7 +415,6 @@ var _ = Describe("KV cache", Ordered, func() {
Port: 1234,
Model: "model",
KVCacheSize: testCase.cacheSize,
ZMQEndpoint: pubEndpoint,
ZMQMaxConnectAttempts: 3,
}
blockCache, err := newBlockCache(&config, GinkgoLogr)
Expand Down Expand Up @@ -535,8 +531,12 @@ func createSub(config *common.Configuration) (*zmq.Socket, string) {
Expect(err).NotTo(HaveOccurred())
sub, err := zctx.NewSocket(zmq.SUB)
Expect(err).NotTo(HaveOccurred())
err = sub.Bind(subEndpoint)
err = sub.Bind(endpoint)
Expect(err).NotTo(HaveOccurred())
// get the actual port
endpoint, err := sub.GetLastEndpoint()
Expect(err).NotTo(HaveOccurred())
config.ZMQEndpoint = endpoint
topic := createTopic(config)
err = sub.SetSubscribe(topic)
Expect(err).NotTo(HaveOccurred())
Expand Down