@@ -35,8 +35,11 @@ import (
3535 tpb "github.com/openconfig/gnmi/proto/target"
3636)
3737
38- // AddrSeparator delimits the chain of addresses used to connect to a target.
39- const AddrSeparator = ";"
38+ const (
39+ // AddrSeparator delimits the chain of addresses used to connect to a target.
40+ AddrSeparator = ";"
41+ metaReceiveTimeout = "receive_timeout"
42+ )
4043
4144var (
4245 // ErrPending indicates a pending subscription attempt for the target exists.
@@ -81,13 +84,14 @@ type Config struct {
8184}
8285
8386type target struct {
84- name string
85- t * tpb.Target
86- sr * gpb.SubscribeRequest
87- cancel func ()
88- finished chan struct {}
89- mu sync.Mutex
90- reconnect func ()
87+ name string
88+ t * tpb.Target
89+ sr * gpb.SubscribeRequest
90+ cancel func ()
91+ finished chan struct {}
92+ mu sync.Mutex
93+ reconnect func ()
94+ receiveTimeout time.Duration
9195}
9296
9397// Manager provides functionality for making gNMI subscriptions to targets and
@@ -167,74 +171,78 @@ func (m *Manager) handleGNMIUpdate(name string, resp *gpb.SubscribeResponse) err
167171 return nil
168172}
169173
170- func addrChains (addrs []string ) [][] string {
171- ac := make ([][] string , len ( addrs ))
172- for idx , addrLine := range addrs {
173- ac [ idx ] = strings .Split (addrLine , AddrSeparator )
174+ func uniqueNextHops (addrs []string ) map [ string ] struct {} {
175+ nhs := map [ string ] struct {}{}
176+ for _ , addrLine := range addrs {
177+ nhs [ strings .Split (addrLine , AddrSeparator )[ 0 ]] = struct {}{}
174178 }
175- return ac
179+ return nhs
176180}
177181
178182func (m * Manager ) createConn (ctx context.Context , name string , t * tpb.Target ) (conn * grpc.ClientConn , done func (), err error ) {
179- nhs := addrChains (t .GetAddresses ())
183+ nhs := uniqueNextHops (t .GetAddresses ())
180184 if len (nhs ) == 0 {
181185 return nil , func () {}, errors .New ("target has no addresses for next hop connection" )
182186 }
183- // A single next-hop dial is assumed.
184- nh := nhs [0 ][0 ]
185- select {
186- case <- ctx .Done ():
187- return nil , func () {}, ctx .Err ()
188- default :
189- connCtx := ctx
190- if m .timeout > 0 {
191- c , cancel := context .WithTimeout (ctx , m .timeout )
192- connCtx = c
193- defer cancel ()
187+ for nh := range nhs {
188+ select {
189+ case <- ctx .Done ():
190+ return nil , func () {}, ctx .Err ()
191+ default :
192+ connCtx := ctx
193+ if m .timeout > 0 {
194+ c , cancel := context .WithTimeout (ctx , m .timeout )
195+ connCtx = c
196+ defer cancel ()
197+ }
198+ conn , done , err = m .connectionManager .Connection (connCtx , nh , t .GetDialer ())
199+ if err == nil {
200+ return
201+ }
194202 }
195- return m .connectionManager .Connection (connCtx , nh , t .GetDialer ())
196203 }
204+ return
197205}
198206
199- func (m * Manager ) handleUpdates (ctx context.Context , name string , sc gpb.GNMI_SubscribeClient ) error {
207+ func (m * Manager ) handleUpdates (ctx context.Context , ta * target , sc gpb.GNMI_SubscribeClient ) error {
200208 defer m .testSync ()
201209 connected := false
202210 var recvTimer * time.Timer
203- if m .receiveTimeout .Nanoseconds () > 0 {
204- recvTimer = time .NewTimer (m .receiveTimeout )
211+ if ta .receiveTimeout .Nanoseconds () > 0 {
212+ recvTimer = time .NewTimer (ta .receiveTimeout )
205213 recvTimer .Stop ()
206214 go func () {
207215 select {
208216 case <- ctx .Done ():
209217 case <- recvTimer .C :
210- log .Errorf ("Timed out waiting to receive from %q after %v" , name , m .receiveTimeout )
211- m .Reconnect (name )
218+ log .Errorf ("Timed out waiting to receive from %q after %v" , ta . name , ta .receiveTimeout )
219+ m .Reconnect (ta . name )
212220 }
213221 }()
214222 }
215223 for {
216224 if recvTimer != nil {
217- recvTimer .Reset (m .receiveTimeout )
225+ recvTimer .Reset (ta .receiveTimeout )
218226 }
219227 resp , err := sc .Recv ()
220228 if recvTimer != nil {
221229 recvTimer .Stop ()
222230 }
223231 if err != nil {
224232 if m .reset != nil {
225- m .reset (name )
233+ m .reset (ta . name )
226234 }
227235 return err
228236 }
229237 if ! connected {
230238 if m .connect != nil {
231- m .connect (name )
239+ m .connect (ta . name )
232240 }
233241 connected = true
234- log .Infof ("Target %q successfully subscribed" , name )
242+ log .Infof ("Target %q successfully subscribed" , ta . name )
235243 }
236- if err := m .handleGNMIUpdate (name , resp ); err != nil {
237- log .Errorf ("Error processing request %v for target %q: %v" , resp , name , err )
244+ if err := m .handleGNMIUpdate (ta . name , resp ); err != nil {
245+ log .Errorf ("Error processing request %v for target %q: %v" , resp , ta . name , err )
238246 }
239247 m .testSync ()
240248 }
@@ -245,25 +253,25 @@ var subscribeClient = func(ctx context.Context, conn *grpc.ClientConn) (gpb.GNMI
245253 return gpb .NewGNMIClient (conn ).Subscribe (ctx )
246254}
247255
248- func (m * Manager ) subscribe (ctx context.Context , name string , conn * grpc.ClientConn , sr * gpb. SubscribeRequest ) error {
256+ func (m * Manager ) subscribe (ctx context.Context , ta * target , conn * grpc.ClientConn ) error {
249257 select {
250258 case <- ctx .Done ():
251259 return ctx .Err ()
252260 default :
253261 }
254262
255- log .Infof ("Attempting to open stream to target %q" , name )
263+ log .Infof ("Attempting to open stream to target %q" , ta . name )
256264 sc , err := subscribeClient (ctx , conn )
257265 if err != nil {
258- return fmt .Errorf ("error opening stream to target %q: %v" , name , err )
266+ return fmt .Errorf ("error opening stream to target %q: %v" , ta . name , err )
259267 }
260- cr := customizeRequest (name , sr )
261- log .V (2 ).Infof ("Sending subscription request to target %q: %v" , name , cr )
268+ cr := customizeRequest (ta . name , ta . sr )
269+ log .V (2 ).Infof ("Sending subscription request to target %q: %v" , ta . name , cr )
262270 if err := sc .Send (cr ); err != nil {
263- return fmt .Errorf ("error sending subscription request to target %q: %v" , name , err )
271+ return fmt .Errorf ("error sending subscription request to target %q: %v" , ta . name , err )
264272 }
265- if err = m .handleUpdates (ctx , name , sc ); err != nil {
266- return fmt .Errorf ("stream failed for target %q: %v" , name , err )
273+ if err = m .handleUpdates (ctx , ta , sc ); err != nil {
274+ return fmt .Errorf ("stream failed for target %q: %v" , ta . name , err )
267275 }
268276 return nil
269277}
@@ -337,8 +345,18 @@ func (m *Manager) monitor(ctx context.Context, ta *target) (err error) {
337345 return
338346 }
339347 defer done ()
340- return m .subscribe (sCtx , ta .name , conn , ta .sr )
348+ return m .subscribe (sCtx , ta , conn )
349+ }
341350
351+ func (m * Manager ) targetRecvTimeout (name string , t * tpb.Target ) time.Duration {
352+ if timeout := t .GetMeta ()[metaReceiveTimeout ]; timeout != "" {
353+ recvTimeout , err := time .ParseDuration (timeout )
354+ if err == nil {
355+ return recvTimeout
356+ }
357+ log .Warningf ("Wrong receive_timeout %q specified for %q: %v" , timeout , name , err )
358+ }
359+ return m .receiveTimeout
342360}
343361
344362// Add adds the target to Manager and starts a streaming subscription that
@@ -363,13 +381,15 @@ func (m *Manager) Add(name string, t *tpb.Target, sr *gpb.SubscribeRequest) erro
363381 if len (t .GetAddresses ()) == 0 {
364382 return fmt .Errorf ("no addresses for target %q" , name )
365383 }
384+
366385 ctx , cancel := context .WithCancel (context .Background ())
367386 ta := & target {
368- name : name ,
369- t : t ,
370- sr : sr ,
371- cancel : cancel ,
372- finished : make (chan struct {}),
387+ name : name ,
388+ t : t ,
389+ sr : sr ,
390+ cancel : cancel ,
391+ finished : make (chan struct {}),
392+ receiveTimeout : m .targetRecvTimeout (name , t ),
373393 }
374394 m .targets [name ] = ta
375395 go m .retryMonitor (ctx , ta )
0 commit comments