Skip to content

Commit d5360e3

Browse files
Googlermarcushines
authored andcommitted
Project import generated by Copybara.
FolderOrigin-RevId: /usr/local/google/home/hines/copybara/temp/folder-destination3580118442091374722/.
1 parent c1b4525 commit d5360e3

File tree

5 files changed

+200
-91
lines changed

5 files changed

+200
-91
lines changed

manager/manager.go

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ type Config struct {
6969
Sync func(string)
7070
// Timeout defines the optional duration to wait for a gRPC dial.
7171
Timeout time.Duration
72+
// ReceiveTimeout defines the optional duration to wait for receiving from
73+
// a target. 0 means timeout is disabled.
74+
ReceiveTimeout time.Duration
7275
// Update will be invoked in response to gNMI updates.
7376
Update func(string, *gpb.Notification)
7477
// ConnectionManager is used to create gRPC connections.
@@ -99,6 +102,7 @@ type Manager struct {
99102
sync func(string)
100103
testSync func() // exposed for test synchronization
101104
timeout time.Duration
105+
receiveTimeout time.Duration
102106
update func(string, *gpb.Notification)
103107

104108
mu sync.Mutex
@@ -134,6 +138,7 @@ func NewManager(cfg Config) (*Manager, error) {
134138
targets: make(map[string]*target),
135139
testSync: func() {},
136140
timeout: cfg.Timeout,
141+
receiveTimeout: cfg.ReceiveTimeout,
137142
update: cfg.Update,
138143
}, nil
139144
}
@@ -191,11 +196,30 @@ func (m *Manager) createConn(ctx context.Context, name string, t *tpb.Target) (c
191196
}
192197
}
193198

194-
func (m *Manager) handleUpdates(name string, sc gpb.GNMI_SubscribeClient) error {
199+
func (m *Manager) handleUpdates(ctx context.Context, name string, sc gpb.GNMI_SubscribeClient) error {
195200
defer m.testSync()
196201
connected := false
202+
var recvTimer *time.Timer
203+
if m.receiveTimeout.Nanoseconds() > 0 {
204+
recvTimer = time.NewTimer(m.receiveTimeout)
205+
recvTimer.Stop()
206+
go func() {
207+
select {
208+
case <-ctx.Done():
209+
case <-recvTimer.C:
210+
log.Errorf("Timed out waiting to receive from %q after %v", name, m.receiveTimeout)
211+
m.Reconnect(name)
212+
}
213+
}()
214+
}
197215
for {
216+
if recvTimer != nil {
217+
recvTimer.Reset(m.receiveTimeout)
218+
}
198219
resp, err := sc.Recv()
220+
if recvTimer != nil {
221+
recvTimer.Stop()
222+
}
199223
if err != nil {
200224
if m.reset != nil {
201225
m.reset(name)
@@ -238,7 +262,7 @@ func (m *Manager) subscribe(ctx context.Context, name string, conn *grpc.ClientC
238262
if err := sc.Send(cr); err != nil {
239263
return fmt.Errorf("error sending subscription request to target %q: %v", name, err)
240264
}
241-
if err = m.handleUpdates(name, sc); err != nil {
265+
if err = m.handleUpdates(ctx, name, sc); err != nil {
242266
return fmt.Errorf("stream failed for target %q: %v", name, err)
243267
}
244268
return nil

manager/manager_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,87 @@ func TestRetrySubscribe(t *testing.T) {
554554
r.assertLast(t, "no recovery after remove", nil, nil, nil, 0)
555555
}
556556

557+
func TestReceiveTimeout(t *testing.T) {
558+
f := newFakeConnection(t)
559+
addr, stop := newDevice(t, &fakeServer{})
560+
defer stop()
561+
562+
origSubscribeClient := subscribeClient
563+
defer func() { subscribeClient = origSubscribeClient }()
564+
fc := newFakeSubscribeClient()
565+
subscribeClient = func(ctx context.Context, conn *grpc.ClientConn) (gpb.GNMI_SubscribeClient, error) {
566+
go func() {
567+
select {
568+
case <-ctx.Done():
569+
fc.sendRecvErr() // Simulate stream closure.
570+
}
571+
}()
572+
return fc, nil
573+
}
574+
575+
r := record{}
576+
m, err := NewManager(Config{
577+
Credentials: &fakeCreds{},
578+
Timeout: time.Minute,
579+
ReceiveTimeout: 2 * time.Second,
580+
Connect: func(s string) {
581+
r.connects = append(r.connects, s)
582+
},
583+
ConnectionManager: f,
584+
Sync: func(s string) {
585+
r.syncs = append(r.syncs, s)
586+
},
587+
Reset: func(s string) {
588+
r.resets = append(r.resets, s)
589+
},
590+
Update: func(_ string, g *gpb.Notification) {
591+
r.updates = append(r.updates, g)
592+
},
593+
})
594+
if err != nil {
595+
t.Fatal("could not initialize Manager")
596+
}
597+
handleUpdateDoneOnce := make(chan struct{})
598+
handleUpdateCalled := make(chan struct{}, 1)
599+
m.testSync = func() {
600+
select {
601+
case <-handleUpdateDoneOnce:
602+
default:
603+
close(handleUpdateDoneOnce)
604+
}
605+
handleUpdateCalled <- struct{}{}
606+
}
607+
608+
name := "device1"
609+
err = m.Add(name, &tpb.Target{Addresses: []string{addr}}, validSubscribeRequest)
610+
if err != nil {
611+
t.Fatalf("got error adding: %v, want no error", err)
612+
}
613+
defer m.Remove(name)
614+
615+
_, ok := m.targets[name]
616+
if !ok {
617+
t.Fatalf("missing internal target")
618+
}
619+
620+
<-handleUpdateDoneOnce // receive has timed out
621+
<-handleUpdateCalled
622+
r.assertLast(t, "after receive timeout", nil, nil, []string{name}, 0) // reset once
623+
624+
// Verify manager will try reconnecting to target
625+
fc.sendSync()
626+
<-handleUpdateCalled
627+
r.assertLast(t, "sync after reconnect", []string{name}, []string{name}, nil, 0)
628+
629+
updateCount := 10
630+
for i := 0; i < updateCount; i++ {
631+
fc.sendUpdate()
632+
<-handleUpdateCalled
633+
}
634+
r.assertLast(t, "updates sent after reconnect", nil, nil, nil, updateCount)
635+
assertTargets(t, m, 1)
636+
}
637+
557638
func TestRemoveDuringBackoff(t *testing.T) {
558639
origBaseDelay, origMaxDelay := RetryBaseDelay, RetryMaxDelay
559640
defer func() { RetryBaseDelay, RetryMaxDelay = origBaseDelay, origMaxDelay }()

proto/gnmi/gnmi.pb.go

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)