// Copyright (c) Zefchain Labs, Inc.
// SPDX-License-Identifier: Apache-2.0
33
4- use std:: { fmt, future:: Future , iter} ;
4+ use std:: { fmt, future:: Future , iter, sync :: Arc } ;
55
66use futures:: { future, stream, StreamExt } ;
77use linera_base:: {
@@ -29,6 +29,7 @@ use tracing::{debug, info, instrument, warn, Level};
2929
3030use super :: {
3131 api:: { self , validator_node_client:: ValidatorNodeClient , SubscriptionRequest } ,
32+ pool:: GrpcConnectionPool ,
3233 transport, GRPC_MAX_MESSAGE_SIZE ,
3334} ;
3435use crate :: {
@@ -39,57 +40,74 @@ use crate::{
3940#[ derive( Clone ) ]
4041pub struct GrpcClient {
4142 address : String ,
42- client : ValidatorNodeClient < transport :: Channel > ,
43+ pool : Arc < GrpcConnectionPool > ,
4344 retry_delay : Duration ,
4445 max_retries : u32 ,
4546}
4647
4748impl GrpcClient {
4849 pub fn new (
4950 address : String ,
50- channel : transport :: Channel ,
51+ pool : Arc < GrpcConnectionPool > ,
5152 retry_delay : Duration ,
5253 max_retries : u32 ,
53- ) -> Self {
54- let client = ValidatorNodeClient :: new ( channel)
55- . max_encoding_message_size ( GRPC_MAX_MESSAGE_SIZE )
56- . max_decoding_message_size ( GRPC_MAX_MESSAGE_SIZE ) ;
57- Self {
54+ ) -> Result < Self , super :: GrpcError > {
55+ // Just verify we can get a channel to this address
56+ let _ = pool. channel ( address. clone ( ) ) ?;
57+ Ok ( Self {
5858 address,
59- client ,
59+ pool ,
6060 retry_delay,
6161 max_retries,
62- }
62+ } )
6363 }
6464
6565 pub fn address ( & self ) -> & str {
6666 & self . address
6767 }
6868
69+ fn make_client ( & self ) -> Result < ValidatorNodeClient < transport:: Channel > , super :: GrpcError > {
70+ let channel = self . pool . channel ( self . address . clone ( ) ) ?;
71+ Ok ( ValidatorNodeClient :: new ( channel)
72+ . max_encoding_message_size ( GRPC_MAX_MESSAGE_SIZE )
73+ . max_decoding_message_size ( GRPC_MAX_MESSAGE_SIZE ) )
74+ }
75+
6976 /// Returns whether this gRPC status means the server stream should be reconnected to, or not.
7077 /// Logs a warning on unexpected status codes.
71- fn is_retryable ( status : & Status ) -> bool {
78+ fn is_retryable_needs_reconnect ( status : & Status ) -> ( bool , bool ) {
7279 match status. code ( ) {
7380 Code :: DeadlineExceeded | Code :: Aborted | Code :: Unavailable | Code :: Unknown => {
7481 info ! ( "gRPC request interrupted: {}; retrying" , status) ;
75- true
82+ ( true , false )
7683 }
7784 Code :: Ok | Code :: Cancelled | Code :: ResourceExhausted => {
7885 info ! ( "Unexpected gRPC status: {}; retrying" , status) ;
79- true
86+ ( true , false )
87+ }
88+ Code :: NotFound => ( false , false ) , // This code is used if e.g. the validator is missing blobs.
89+ Code :: Internal => {
90+ let error_string = status. to_string ( ) ;
91+ if error_string. contains ( "GoAway" ) && error_string. contains ( "max_age" ) {
92+ info ! (
93+ "gRPC connection hit max_age and got a GoAway: {}; reconnecting then retrying" ,
94+ status
95+ ) ;
96+ return ( true , true ) ;
97+ }
98+ info ! ( "Unexpected gRPC status: {}" , status) ;
99+ ( false , false )
80100 }
81- Code :: NotFound => false , // This code is used if e.g. the validator is missing blobs.
82101 Code :: InvalidArgument
83102 | Code :: AlreadyExists
84103 | Code :: PermissionDenied
85104 | Code :: FailedPrecondition
86105 | Code :: OutOfRange
87106 | Code :: Unimplemented
88- | Code :: Internal
89107 | Code :: DataLoss
90108 | Code :: Unauthenticated => {
91109 info ! ( "Unexpected gRPC status: {}" , status) ;
92- false
110+ ( false , false )
93111 }
94112 }
95113 }
@@ -109,15 +127,36 @@ impl GrpcClient {
109127 let request_inner = request. try_into ( ) . map_err ( |_| NodeError :: GrpcError {
110128 error : "could not convert request to proto" . to_string ( ) ,
111129 } ) ?;
130+
131+ let mut reconnected = false ;
112132 loop {
113- match f ( self . client . clone ( ) , Request :: new ( request_inner. clone ( ) ) ) . await {
114- Err ( s) if Self :: is_retryable ( & s) && retry_count < self . max_retries => {
115- let delay = self . retry_delay . saturating_mul ( retry_count) ;
116- retry_count += 1 ;
117- linera_base:: time:: timer:: sleep ( delay) . await ;
118- continue ;
133+ // Create client on-demand for each attempt
134+ let client = match self . make_client ( ) {
135+ Ok ( client) => client,
136+ Err ( e) => {
137+ return Err ( NodeError :: GrpcError {
138+ error : format ! ( "Failed to create client: {}" , e) ,
139+ } ) ;
119140 }
141+ } ;
142+
143+ match f ( client, Request :: new ( request_inner. clone ( ) ) ) . await {
120144 Err ( s) => {
145+ let ( is_retryable, needs_reconnect) = Self :: is_retryable_needs_reconnect ( & s) ;
146+ if is_retryable && retry_count < self . max_retries {
147+ // If this error indicates we need a connection refresh and we haven't already tried, do it
148+ if needs_reconnect && !reconnected {
149+ info ! ( "Connection error detected, invalidating channel: {}" , s) ;
150+ self . pool . invalidate_channel ( & self . address ) ;
151+ reconnected = true ;
152+ }
153+
154+ let delay = self . retry_delay . saturating_mul ( retry_count) ;
155+ retry_count += 1 ;
156+ linera_base:: time:: timer:: sleep ( delay) . await ;
157+ continue ;
158+ }
159+
121160 return Err ( NodeError :: GrpcError {
122161 error : format ! ( "remote request [{handler}] failed with status: {s:?}" ) ,
123162 } ) ;
@@ -270,32 +309,56 @@ impl ValidatorNode for GrpcClient {
270309 let subscription_request = SubscriptionRequest {
271310 chain_ids : chains. into_iter ( ) . map ( |chain| chain. into ( ) ) . collect ( ) ,
272311 } ;
273- let mut client = self . client . clone ( ) ;
312+ let pool = self . pool . clone ( ) ;
313+ let address = self . address . clone ( ) ;
274314
275315 // Make the first connection attempt before returning from this method.
276- let mut stream = Some (
316+ let mut stream = Some ( {
317+ let mut client = self
318+ . make_client ( )
319+ . map_err ( |e| NodeError :: SubscriptionFailed {
320+ status : format ! ( "Failed to create client: {}" , e) ,
321+ } ) ?;
277322 client
278323 . subscribe ( subscription_request. clone ( ) )
279324 . await
280325 . map_err ( |status| NodeError :: SubscriptionFailed {
281326 status : status. to_string ( ) ,
282327 } ) ?
283- . into_inner ( ) ,
284- ) ;
328+ . into_inner ( )
329+ } ) ;
285330
286331 // A stream of `Result<grpc::Notification, tonic::Status>` that keeps calling
287332 // `client.subscribe(request)` endlessly and without delay.
288333 let endlessly_retrying_notification_stream = stream:: unfold ( ( ) , move |( ) | {
289- let mut client = client. clone ( ) ;
334+ let pool = pool. clone ( ) ;
335+ let address = address. clone ( ) ;
290336 let subscription_request = subscription_request. clone ( ) ;
291337 let mut stream = stream. take ( ) ;
292338 async move {
293339 let stream = if let Some ( stream) = stream. take ( ) {
294340 future:: Either :: Right ( stream)
295341 } else {
296- match client. subscribe ( subscription_request. clone ( ) ) . await {
297- Err ( err) => future:: Either :: Left ( stream:: iter ( iter:: once ( Err ( err) ) ) ) ,
298- Ok ( response) => future:: Either :: Right ( response. into_inner ( ) ) ,
342+ // Create a new client for each reconnection attempt
343+ match pool. channel ( address. clone ( ) ) {
344+ Ok ( channel) => {
345+ let mut client = ValidatorNodeClient :: new ( channel)
346+ . max_encoding_message_size ( GRPC_MAX_MESSAGE_SIZE )
347+ . max_decoding_message_size ( GRPC_MAX_MESSAGE_SIZE ) ;
348+ match client. subscribe ( subscription_request. clone ( ) ) . await {
349+ Err ( err) => {
350+ future:: Either :: Left ( stream:: iter ( iter:: once ( Err ( err) ) ) )
351+ }
352+ Ok ( response) => future:: Either :: Right ( response. into_inner ( ) ) ,
353+ }
354+ }
355+ Err ( e) => {
356+ let status = tonic:: Status :: unavailable ( format ! (
357+ "Failed to create channel: {}" ,
358+ e
359+ ) ) ;
360+ future:: Either :: Left ( stream:: iter ( iter:: once ( Err ( status) ) ) )
361+ }
299362 }
300363 } ;
301364 Some ( ( stream, ( ) ) )
@@ -319,7 +382,9 @@ impl ValidatorNode for GrpcClient {
319382 return future:: Either :: Left ( future:: ready ( true ) ) ;
320383 } ;
321384
322- if !span. in_scope ( || Self :: is_retryable ( status) ) || retry_count >= max_retries {
385+ let ( is_retryable, _) =
386+ span. in_scope ( || Self :: is_retryable_needs_reconnect ( status) ) ;
387+ if !is_retryable || retry_count >= max_retries {
323388 return future:: Either :: Left ( future:: ready ( false ) ) ;
324389 }
325390 let delay = retry_delay. saturating_mul ( retry_count) ;
0 commit comments