Skip to content

Commit 1a57f52

Browse files
xutongNV and claude committed
Migrate from bidi streaming to unary RPC
Add SendListenerMessage unary RPC to ListenerService, enabling NodeListener to use simple request-reply instead of bidirectional streaming. This improves AWS ALB compatibility (no HTTP/2 PING needed), simplifies the code (2 goroutines instead of 3, no unacked message queue), and enables better load balancing. Changes: - Proto: add SendListenerMessage RPC to ListenerService - Server: add SendListenerMessage handler with Redis push and ACK - BaseListener: add RunUnary (2-goroutine lifecycle), SendUnaryMessage, and gRPC built-in retry config for transient failures - NodeListener: switch from Run/SendMessage to RunUnary/SendUnaryMessage - Tests: add coverage for unary handler, base listener RunUnary, and node listener sendMessages paths Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent a9a750d commit 1a57f52

File tree

15 files changed

+912
-311
lines changed

15 files changed

+912
-311
lines changed

src/operator/event_listener.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ import (
3333
pb "go.corp.nvidia.com/osmo/proto/operator"
3434
)
3535

36-
// EventListener manages the bidirectional gRPC stream connection for Kubernetes events
36+
// EventListener manages unary RPC calls for Kubernetes events
3737
type EventListener struct {
3838
*utils.BaseListener
3939
args utils.ListenerArgs
@@ -51,12 +51,12 @@ func NewEventListener(args utils.ListenerArgs, inst *utils.Instruments) *EventLi
5151
return el
5252
}
5353

54-
// Run manages the bidirectional streaming lifecycle
54+
// Run manages the unary RPC lifecycle
5555
func (el *EventListener) Run(ctx context.Context) error {
5656
ch := make(chan *pb.ListenerMessage, el.args.EventChanSize)
5757
return el.BaseListener.Run(
5858
ctx,
59-
"Connected to the service, event listener stream established",
59+
"Connected to operator service, unary event listener established",
6060
ch,
6161
el.watchEvents,
6262
el.sendMessages,
@@ -70,6 +70,15 @@ func (el *EventListener) sendMessages(
7070
) error {
7171
el.Logf("Starting message sender for event channel")
7272
defer el.Logf("Stopping event message sender")
73+
defer el.DrainMessageChannel(ch)
74+
75+
// Resend any unacked messages from a previous connection before processing new ones
76+
err := el.GetUnackedMessages().ResendAll(
77+
ctx, el.SendMessage)
78+
if err != nil {
79+
el.Logf("Failed to resend unacked messages: %v", err)
80+
return fmt.Errorf("failed to resend unacked messages: %w", err)
81+
}
7382

7483
progressTicker := time.NewTicker(time.Duration(el.args.ProgressFrequencySec) * time.Second)
7584
defer progressTicker.Stop()
@@ -90,7 +99,8 @@ func (el *EventListener) sendMessages(
9099
el.inst.MessageChannelClosedUnexpectedly.Add(ctx, 1, el.MetricAttrs)
91100
return fmt.Errorf("event watcher stopped")
92101
}
93-
if err := el.BaseListener.SendMessage(ctx, msg); err != nil {
102+
if err := el.SendMessage(ctx, msg); err != nil {
103+
el.GetUnackedMessages().AddMessageDropOldest(msg)
94104
return fmt.Errorf("failed to send message: %w", err)
95105
}
96106
}

src/operator/event_listener_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,15 @@
1717
package main
1818

1919
import (
20+
"context"
21+
"fmt"
2022
"testing"
2123
"time"
2224

2325
corev1 "k8s.io/api/core/v1"
2426
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2527

28+
"go.corp.nvidia.com/osmo/operator/utils"
2629
pb "go.corp.nvidia.com/osmo/proto/operator"
2730
)
2831

@@ -227,6 +230,107 @@ func TestEventKeyComposite(t *testing.T) {
227230
}
228231
}
229232

233+
// TestEventListenerDrainPopulatesUnackedQueue verifies that drainMessageChannel
234+
// moves pending channel messages into the unacked queue using drop-oldest semantics.
235+
func TestEventListenerDrainPopulatesUnackedQueue(t *testing.T) {
236+
args := utils.ListenerArgs{
237+
EventChanSize: 10,
238+
ProgressFrequencySec: 60,
239+
}
240+
inst := utils.NewNoopInstruments()
241+
el := NewEventListener(args, inst)
242+
243+
ch := make(chan *pb.ListenerMessage, 5)
244+
// Pre-fill channel with messages
245+
for i := 0; i < 3; i++ {
246+
ch <- &pb.ListenerMessage{Uuid: fmt.Sprintf("drain-%d", i)}
247+
}
248+
249+
el.DrainMessageChannel(ch)
250+
251+
unacked := el.GetUnackedMessages()
252+
if unacked.Qsize() != 3 {
253+
t.Errorf("Qsize() = %d, expected 3 after drain", unacked.Qsize())
254+
}
255+
256+
messages := unacked.ListMessages()
257+
for i, msg := range messages {
258+
expected := fmt.Sprintf("drain-%d", i)
259+
if msg.Uuid != expected {
260+
t.Errorf("messages[%d].Uuid = %s, expected %s", i, msg.Uuid, expected)
261+
}
262+
}
263+
}
264+
265+
// TestEventListenerDrainRespectsCapacity verifies that drainMessageChannel
266+
// uses drop-oldest semantics when the queue is at capacity.
267+
func TestEventListenerDrainRespectsCapacity(t *testing.T) {
268+
args := utils.ListenerArgs{
269+
EventChanSize: 10,
270+
MaxUnackedMessages: 3,
271+
ProgressFrequencySec: 60,
272+
}
273+
inst := utils.NewNoopInstruments()
274+
el := NewEventListener(args, inst)
275+
276+
ch := make(chan *pb.ListenerMessage, 10)
277+
// Pre-fill channel with 5 messages (exceeds capacity of 3)
278+
for i := 0; i < 5; i++ {
279+
ch <- &pb.ListenerMessage{Uuid: fmt.Sprintf("drain-%d", i)}
280+
}
281+
282+
el.DrainMessageChannel(ch)
283+
284+
unacked := el.GetUnackedMessages()
285+
if unacked.Qsize() != 3 {
286+
t.Errorf("Qsize() = %d, expected 3 (capped at capacity)", unacked.Qsize())
287+
}
288+
289+
// Oldest messages should have been evicted, newest 3 remain
290+
messages := unacked.ListMessages()
291+
for i, msg := range messages {
292+
expected := fmt.Sprintf("drain-%d", i+2)
293+
if msg.Uuid != expected {
294+
t.Errorf("messages[%d].Uuid = %s, expected %s", i, msg.Uuid, expected)
295+
}
296+
}
297+
}
298+
299+
// TestEventListenerResendBeforeNewMessages verifies that ResendAll is invoked
300+
// on the unacked queue at the start of sendMessages, before reading from the channel.
301+
func TestEventListenerResendBeforeNewMessages(t *testing.T) {
302+
// This test verifies the ResendAll integration at the UnackMessages level,
303+
// since sendMessages requires a live gRPC connection for SendMessage.
304+
unacked := utils.NewUnackMessages(10)
305+
unacked.AddMessageDropOldest(&pb.ListenerMessage{Uuid: "old-1"})
306+
unacked.AddMessageDropOldest(&pb.ListenerMessage{Uuid: "old-2"})
307+
308+
var sentOrder []string
309+
sendFunc := func(_ context.Context, msg *pb.ListenerMessage) error {
310+
sentOrder = append(sentOrder, msg.Uuid)
311+
return nil
312+
}
313+
314+
ctx := context.Background()
315+
err := unacked.ResendAll(ctx, sendFunc)
316+
if err != nil {
317+
t.Fatalf("ResendAll failed: %v", err)
318+
}
319+
320+
// Verify old messages were sent in order
321+
if len(sentOrder) != 2 {
322+
t.Fatalf("Expected 2 messages sent, got %d", len(sentOrder))
323+
}
324+
if sentOrder[0] != "old-1" || sentOrder[1] != "old-2" {
325+
t.Errorf("Expected send order [old-1, old-2], got %v", sentOrder)
326+
}
327+
328+
// Queue should be empty after successful resend
329+
if unacked.Qsize() != 0 {
330+
t.Errorf("Qsize() = %d, expected 0 after successful resend", unacked.Qsize())
331+
}
332+
}
333+
230334
// TestListenerMessageTypeAssertion verifies the message is correctly typed
231335
func TestListenerMessageTypeAssertion(t *testing.T) {
232336
event := &corev1.Event{

src/operator/node_listener.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,12 @@ func NewNodeListener(
5959
return nl
6060
}
6161

62-
// Run manages the bidirectional streaming lifecycle
62+
// Run manages the unary RPC lifecycle for node events
6363
func (nl *NodeListener) Run(ctx context.Context) error {
6464
ch := make(chan *pb.ListenerMessage, nl.args.NodeUpdateChanSize)
6565
return nl.BaseListener.Run(
6666
ctx,
67-
"Connected to operator service, node stream established",
67+
"Connected to operator service, unary node listener established",
6868
ch,
6969
nl.watchNodes,
7070
nl.sendMessages,
@@ -96,7 +96,7 @@ func (nl *NodeListener) sendMessages(
9696
nl.inst.MessageChannelClosedUnexpectedly.Add(ctx, 1, nl.MetricAttrs)
9797
return fmt.Errorf("node watcher stopped")
9898
}
99-
if err := nl.BaseListener.SendMessage(ctx, msg); err != nil {
99+
if err := nl.SendMessage(ctx, msg); err != nil {
100100
return fmt.Errorf("failed to send node message: %w", err)
101101
}
102102
}

src/operator/node_listener_test.go

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,12 @@
1717
package main
1818

1919
import (
20+
"context"
2021
"testing"
2122
"time"
2223

2324
"go.corp.nvidia.com/osmo/operator/utils"
25+
pb "go.corp.nvidia.com/osmo/proto/operator"
2426
corev1 "k8s.io/api/core/v1"
2527
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2628
)
@@ -93,3 +95,154 @@ func TestNewNodeListener(t *testing.T) {
9395
t.Error("Expected unackedMessages to be initialized")
9496
}
9597
}
98+
99+
func TestNodeListener_SendMessages_ChannelClosed(t *testing.T) {
100+
args := utils.ListenerArgs{
101+
ServiceURL: "http://localhost:8000",
102+
Backend: "test-backend",
103+
Namespace: "osmo",
104+
NodeUpdateChanSize: 100,
105+
MaxUnackedMessages: 100,
106+
NodeConditionPrefix: "osmo.nvidia.com/",
107+
ProgressDir: "/tmp/osmo/operator/",
108+
ProgressFrequencySec: 15,
109+
}
110+
111+
nodeConditionRules := utils.NewNodeConditionRules()
112+
listener := NewNodeListener(args, nodeConditionRules, utils.NewNoopInstruments())
113+
114+
ch := make(chan *pb.ListenerMessage, 10)
115+
close(ch)
116+
117+
ctx := context.Background()
118+
err := listener.sendMessages(ctx, ch)
119+
if err == nil {
120+
t.Fatal("expected error when channel is closed, got nil")
121+
}
122+
expectedMsg := "node watcher stopped"
123+
if err.Error() != expectedMsg {
124+
t.Errorf("expected error %q, got %q", expectedMsg, err.Error())
125+
}
126+
}
127+
128+
func TestNodeListener_SendMessages_ContextCancelled(t *testing.T) {
129+
args := utils.ListenerArgs{
130+
ServiceURL: "http://localhost:8000",
131+
Backend: "test-backend",
132+
Namespace: "osmo",
133+
NodeUpdateChanSize: 100,
134+
MaxUnackedMessages: 100,
135+
NodeConditionPrefix: "osmo.nvidia.com/",
136+
ProgressDir: "/tmp/osmo/operator/",
137+
ProgressFrequencySec: 15,
138+
}
139+
140+
nodeConditionRules := utils.NewNodeConditionRules()
141+
listener := NewNodeListener(args, nodeConditionRules, utils.NewNoopInstruments())
142+
143+
ch := make(chan *pb.ListenerMessage, 10)
144+
ctx, cancel := context.WithCancel(context.Background())
145+
cancel()
146+
147+
err := listener.sendMessages(ctx, ch)
148+
if err != nil {
149+
t.Fatalf("expected nil error on context cancellation, got: %v", err)
150+
}
151+
}
152+
153+
func TestNodeListener_SendMessages_ProgressReport(t *testing.T) {
154+
args := utils.ListenerArgs{
155+
ServiceURL: "http://localhost:8000",
156+
Backend: "test-backend",
157+
Namespace: "osmo",
158+
NodeUpdateChanSize: 100,
159+
MaxUnackedMessages: 100,
160+
NodeConditionPrefix: "osmo.nvidia.com/",
161+
ProgressDir: "/tmp/osmo/operator/",
162+
ProgressFrequencySec: 1,
163+
}
164+
165+
nodeConditionRules := utils.NewNodeConditionRules()
166+
listener := NewNodeListener(args, nodeConditionRules, utils.NewNoopInstruments())
167+
168+
ch := make(chan *pb.ListenerMessage, 10)
169+
170+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
171+
defer cancel()
172+
173+
errChan := make(chan error, 1)
174+
go func() {
175+
errChan <- listener.sendMessages(ctx, ch)
176+
}()
177+
178+
select {
179+
case err := <-errChan:
180+
if err != nil {
181+
t.Fatalf("expected nil error, got: %v", err)
182+
}
183+
case <-time.After(3 * time.Second):
184+
t.Fatal("test timed out")
185+
}
186+
}
187+
188+
func TestNodeListener_BuildResourceMessage(t *testing.T) {
189+
args := utils.ListenerArgs{
190+
ServiceURL: "http://localhost:8000",
191+
Backend: "test-backend",
192+
Namespace: "osmo",
193+
NodeUpdateChanSize: 100,
194+
MaxUnackedMessages: 100,
195+
NodeConditionPrefix: "osmo.nvidia.com/",
196+
ProgressDir: "/tmp/osmo/operator/",
197+
ProgressFrequencySec: 15,
198+
}
199+
200+
nodeConditionRules := utils.NewNodeConditionRules()
201+
listener := NewNodeListener(args, nodeConditionRules, utils.NewNoopInstruments())
202+
203+
tracker := utils.NewNodeStateTracker(1 * time.Minute)
204+
node := &corev1.Node{
205+
ObjectMeta: metav1.ObjectMeta{
206+
Labels: map[string]string{
207+
"kubernetes.io/hostname": "test-node",
208+
},
209+
},
210+
Status: corev1.NodeStatus{
211+
Conditions: []corev1.NodeCondition{
212+
{
213+
Type: corev1.NodeReady,
214+
Status: corev1.ConditionTrue,
215+
},
216+
},
217+
},
218+
}
219+
220+
msg := listener.buildResourceMessage(node, tracker, false, nil)
221+
if msg == nil {
222+
t.Fatal("expected non-nil message for new node")
223+
}
224+
225+
updateNode := msg.GetUpdateNode()
226+
if updateNode == nil {
227+
t.Fatal("expected UpdateNode body")
228+
}
229+
if updateNode.Hostname != "test-node" {
230+
t.Errorf("expected hostname test-node, got %s", updateNode.Hostname)
231+
}
232+
233+
// Second call should return nil (no change)
234+
msg2 := listener.buildResourceMessage(node, tracker, false, nil)
235+
if msg2 != nil {
236+
t.Error("expected nil message for unchanged node")
237+
}
238+
239+
// Delete should always return a message
240+
msg3 := listener.buildResourceMessage(node, tracker, true, nil)
241+
if msg3 == nil {
242+
t.Fatal("expected non-nil message for delete event")
243+
}
244+
if !msg3.GetUpdateNode().Delete {
245+
t.Error("expected Delete to be true")
246+
}
247+
}
248+

src/operator/node_usage_listener.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ import (
3232
pb "go.corp.nvidia.com/osmo/proto/operator"
3333
)
3434

35-
// NodeUsageListener manages the bidirectional gRPC stream for pod resource usage
35+
// NodeUsageListener manages unary RPC calls for pod resource usage
3636
type NodeUsageListener struct {
3737
*utils.BaseListener
3838
args utils.ListenerArgs
@@ -52,12 +52,12 @@ func NewNodeUsageListener(args utils.ListenerArgs, inst *utils.Instruments) *Nod
5252
return nul
5353
}
5454

55-
// Run manages the bidirectional streaming lifecycle
55+
// Run manages the unary RPC lifecycle
5656
func (nul *NodeUsageListener) Run(ctx context.Context) error {
5757
ch := make(chan *pb.ListenerMessage, nul.args.UsageChanSize)
5858
return nul.BaseListener.Run(
5959
ctx,
60-
"Connected to the service, node usage listener stream established",
60+
"Connected to operator service, unary node usage listener established",
6161
ch,
6262
nul.watchPods,
6363
nul.sendMessages,
@@ -88,7 +88,7 @@ func (nul *NodeUsageListener) sendMessages(
8888
nul.inst.MessageChannelClosedUnexpectedly.Add(ctx, 1, nul.MetricAttrs)
8989
return fmt.Errorf("usage watcher stopped")
9090
}
91-
if err := nul.BaseListener.SendMessage(ctx, msg); err != nil {
91+
if err := nul.SendMessage(ctx, msg); err != nil {
9292
return fmt.Errorf("failed to send UpdateNodeUsageBody message: %w", err)
9393
}
9494
}

0 commit comments

Comments (0)