diff --git a/.gitignore b/.gitignore index ac6e6006..3906cfb9 100644 --- a/.gitignore +++ b/.gitignore @@ -4,4 +4,5 @@ vendor .vscode .devcontainer # MacOSX -.DS_Store \ No newline at end of file +.DS_Store +*.test \ No newline at end of file diff --git a/pkg/common/publisher_test.go b/pkg/common/publisher_test.go index a9d6582b..b2c96603 100644 --- a/pkg/common/publisher_test.go +++ b/pkg/common/publisher_test.go @@ -29,11 +29,10 @@ import ( ) const ( - topic = "test-topic" - subEndpoint = "tcp://*:5557" - pubEndpoint = "tcp://localhost:5557" - data = "Hello" - retries = 0 + wildcardEndpoint = "tcp://*:*" + topic = "test-topic" + data = "Hello" + retries = 0 ) var _ = Describe("Publisher", func() { @@ -42,7 +41,9 @@ var _ = Describe("Publisher", func() { Expect(err).NotTo(HaveOccurred()) sub, err := zctx.NewSocket(zmq.SUB) Expect(err).NotTo(HaveOccurred()) - err = sub.Bind(subEndpoint) + err = sub.Bind(wildcardEndpoint) + Expect(err).NotTo(HaveOccurred()) + endpoint, err := sub.GetLastEndpoint() Expect(err).NotTo(HaveOccurred()) err = sub.SetSubscribe(topic) Expect(err).NotTo(HaveOccurred()) @@ -51,7 +52,7 @@ var _ = Describe("Publisher", func() { time.Sleep(100 * time.Millisecond) - pub, err := NewPublisher(pubEndpoint, retries) + pub, err := NewPublisher(endpoint, retries) Expect(err).NotTo(HaveOccurred()) ctx, cancel := context.WithCancel(context.Background()) @@ -94,24 +95,37 @@ var _ = Describe("Publisher", func() { } }) It("should retry connection successfully", func() { + // Get ephemeral endpoint + sub, err := zmq.NewSocket(zmq.SUB) + Expect(err).NotTo(HaveOccurred()) + err = sub.Bind(wildcardEndpoint) + Expect(err).NotTo(HaveOccurred()) + endpoint, err := sub.GetLastEndpoint() + Expect(err).NotTo(HaveOccurred()) + // Step 1: Try to connect to a temporarily non-existent service // This will trigger the retry mechanism - go func() { + go func(sub *zmq.Socket, endpoint string) { + // Delay releasing the ephemeral addr + time.Sleep(1950 * time.Millisecond) + err := sub.Close() + Expect(err).NotTo(HaveOccurred()) + // Delay starting the server to simulate service recovery time.Sleep(2 * time.Second) // Start subscriber as server - sub, err := zmq.NewSocket(zmq.SUB) + sub, err = zmq.NewSocket(zmq.SUB) Expect(err).NotTo(HaveOccurred()) //nolint defer sub.Close() - err = sub.Bind(subEndpoint) Expect(err).NotTo(HaveOccurred()) - }() - + err = sub.Bind(endpoint) + Expect(err).NotTo(HaveOccurred()) + }(sub, endpoint) // Step 2: Publisher will retry connection and eventually succeed - pub, err := NewPublisher(pubEndpoint, 5) // 5 retries - Expect(err).NotTo(HaveOccurred()) // Should eventually succeed + pub, err := NewPublisher(endpoint, 5) // 5 retries + Expect(err).NotTo(HaveOccurred()) // Should eventually succeed //nolint defer pub.Close() }) diff --git a/pkg/kv-cache/block_cache.go b/pkg/kv-cache/block_cache.go index 56d2253b..9b358c4c 100644 --- a/pkg/kv-cache/block_cache.go +++ b/pkg/kv-cache/block_cache.go @@ -48,9 +48,13 @@ func newBlockCache(config *common.Configuration, logger logr.Logger) (*blockCach // TODO read size of channel from config eChan := make(chan EventData, 10000) - publisher, err := common.NewPublisher(config.ZMQEndpoint, config.ZMQMaxConnectAttempts) - if err != nil { - return nil, err + var publisher *common.Publisher + var err error + if config.ZMQEndpoint != "" { + publisher, err = common.NewPublisher(config.ZMQEndpoint, config.ZMQMaxConnectAttempts) + if err != nil { + return nil, err + } } return &blockCache{ diff --git a/pkg/kv-cache/kv_cache_sender.go b/pkg/kv-cache/kv_cache_sender.go index 2b7bee14..1c0c970a 100644 --- a/pkg/kv-cache/kv_cache_sender.go +++ b/pkg/kv-cache/kv_cache_sender.go @@ -88,6 +88,10 @@ func (s *KVEventSender) Run(ctx context.Context) error { return nil } + if s.publisher == nil { + continue + } + // Encode eventData's hash value to msgpack.RawMessage var payload []byte var err error @@ -120,6 +124,9 @@ func (s *KVEventSender) Run(ctx context.Context) error { } case <-timer.C: + if s.publisher == nil { + continue + } if err := s.publishHelper(ctx); err != nil { return err } diff --git a/pkg/kv-cache/kv_cache_test.go b/pkg/kv-cache/kv_cache_test.go index 7731196e..fce1b44e 100644 --- a/pkg/kv-cache/kv_cache_test.go +++ b/pkg/kv-cache/kv_cache_test.go @@ -33,11 +33,10 @@ import ( ) const ( - req1ID = "req1" - req2ID = "req2" - req3ID = "req3" - subEndpoint = "tcp://*:5557" - pubEndpoint = "tcp://localhost:5557" + req1ID = "req1" + req2ID = "req2" + req3ID = "req3" + wildcardEndpoint = "tcp://*:*" ) type ActionType int @@ -204,7 +203,6 @@ var _ = Describe("KV cache", Ordered, func() { Port: 1234, Model: "model", KVCacheSize: test.cacheSize, - ZMQEndpoint: pubEndpoint, ZMQMaxConnectAttempts: 3, EventBatchSize: 1, } @@ -308,7 +306,6 @@ var _ = Describe("KV cache", Ordered, func() { Port: 1234, Model: "model", KVCacheSize: 4, - ZMQEndpoint: pubEndpoint, ZMQMaxConnectAttempts: 3, } @@ -418,7 +415,6 @@ var _ = Describe("KV cache", Ordered, func() { Port: 1234, Model: "model", KVCacheSize: testCase.cacheSize, - ZMQEndpoint: pubEndpoint, ZMQMaxConnectAttempts: 3, } blockCache, err := newBlockCache(&config, GinkgoLogr) @@ -535,8 +531,12 @@ func createSub(config *common.Configuration) (*zmq.Socket, string) { Expect(err).NotTo(HaveOccurred()) sub, err := zctx.NewSocket(zmq.SUB) Expect(err).NotTo(HaveOccurred()) - err = sub.Bind(subEndpoint) + err = sub.Bind(wildcardEndpoint) Expect(err).NotTo(HaveOccurred()) + // get the actual port + endpoint, err := sub.GetLastEndpoint() + Expect(err).NotTo(HaveOccurred()) + config.ZMQEndpoint = endpoint topic := createTopic(config) err = sub.SetSubscribe(topic) Expect(err).NotTo(HaveOccurred())