@@ -28,6 +28,8 @@ pub struct ZMQNetworker {
2828}
2929
3030const POOL_CON_ACTIVE_TO : i64 = 5 ;
31+ const POOL_ACK_TIMEOUT : i64 = 10 ;
32+ const POOL_REPLY_TIMEOUT : i64 = 50 ;
3133const MAX_REQ_PER_POOL_CON : usize = 5 ;
3234
3335impl Networker for ZMQNetworker {
@@ -214,11 +216,12 @@ impl PoolConnection {
214216
215217 fn get_timeout ( & self ) -> ( ( String , String ) , i64 ) {
216218 if let Some ( ( & ( ref req_id, ref node_alias) , timeout) ) = self . timeouts . borrow ( ) . iter ( )
217- . map ( |( key, value) | ( key, ( Duration :: seconds ( 10 ) - ( time:: now ( ) - * value ) ) . num_milliseconds ( ) ) )
219+ . map ( |( key, value) | ( key, ( * value - time:: now ( ) ) . num_milliseconds ( ) ) )
218220 . min_by ( |& ( _, ref val1) , & ( _, ref val2) | val1. cmp ( & val2) ) {
219221 ( ( req_id. to_string ( ) , node_alias. to_string ( ) ) , timeout)
220222 } else {
221- ( ( "" . to_string ( ) , "" . to_string ( ) ) , POOL_CON_ACTIVE_TO * 1000 )
223+ let time_from_start: Duration = time:: now ( ) - self . time_created ;
224+ ( ( "" . to_string ( ) , "" . to_string ( ) ) , POOL_CON_ACTIVE_TO * 1000 - time_from_start. num_milliseconds ( ) )
222225 }
223226 }
224227
@@ -262,7 +265,9 @@ impl PoolConnection {
262265
263266 fn extend_timeout ( & self , req_id : & str , node_alias : & str ) {
264267 if let Some ( timeout) = self . timeouts . borrow_mut ( ) . get_mut ( & ( req_id. to_string ( ) , node_alias. to_string ( ) ) ) {
265- * timeout = time:: now ( ) ;
268+ * timeout = time:: now ( ) + Duration :: seconds ( POOL_REPLY_TIMEOUT ) ;
269+ } else {
270+ debug ! ( "late REQACK for req_id {}, node {}" , req_id, node_alias) ;
266271 }
267272 }
268273
@@ -293,7 +298,7 @@ impl PoolConnection {
293298 let s = self . _get_socket ( idx) ?;
294299 s. send_str ( & req, zmq:: DONTWAIT ) ?;
295300 }
296- self . timeouts . borrow_mut ( ) . insert ( ( req_id, self . nodes [ idx] . name . clone ( ) ) , time:: now ( ) ) ;
301+ self . timeouts . borrow_mut ( ) . insert ( ( req_id, self . nodes [ idx] . name . clone ( ) ) , time:: now ( ) + Duration :: seconds ( POOL_ACK_TIMEOUT ) ) ;
297302 trace ! ( "_send_msg_to_one_node <<" ) ;
298303 Ok ( ( ) )
299304 }
@@ -679,14 +684,18 @@ pub mod networker_tests {
679684
680685 let mut conn = PoolConnection :: new ( vec ! [ rn] ) ;
681686
682- let timeout = conn. get_timeout ( ) ;
683- assert_eq ! ( ( ( "" . to_string( ) , "" . to_string( ) ) , POOL_CON_ACTIVE_TO * 1000 ) , timeout) ;
687+ let ( ( req_id, node_alias) , timeout) = conn. get_timeout ( ) ;
688+ assert_eq ! ( req_id, "" . to_string( ) ) ;
689+ assert_eq ! ( node_alias, "" . to_string( ) ) ;
690+ assert ! ( POOL_CON_ACTIVE_TO * 1000 - 10 <= timeout) ;
691+ assert ! ( POOL_CON_ACTIVE_TO * 1000 >= timeout) ;
684692
685693 conn. send_request ( Some ( NetworkerEvent :: SendOneRequest ( MESSAGE . to_string ( ) , REQ_ID . to_string ( ) ) ) ) . unwrap ( ) ;
686694
687695 let ( id, timeout) = conn. get_timeout ( ) ;
688696 assert_eq ! ( ( REQ_ID . to_string( ) , NODE_NAME . to_string( ) ) , id) ;
689- assert_ne ! ( POOL_CON_ACTIVE_TO * 1000 , timeout)
697+ assert ! ( POOL_ACK_TIMEOUT * 1000 - 10 <= timeout) ;
698+ assert ! ( POOL_ACK_TIMEOUT * 1000 >= timeout) ;
690699 }
691700
692701 #[ test]
0 commit comments