@@ -28,6 +28,8 @@ pub struct ZMQNetworker {
2828}
2929
3030const POOL_CON_ACTIVE_TO : i64 = 5 ;
31+ const POOL_ACK_TIMEOUT : i64 = 10 ;
32+ const POOL_REPLY_TIMEOUT : i64 = 50 ;
3133const MAX_REQ_PER_POOL_CON : usize = 5 ;
3234
3335impl Networker for ZMQNetworker {
@@ -214,11 +216,11 @@ impl PoolConnection {
214216
215217 fn get_timeout ( & self ) -> ( ( String , String ) , i64 ) {
216218 if let Some ( ( & ( ref req_id, ref node_alias) , timeout) ) = self . timeouts . borrow ( ) . iter ( )
217- . map ( |( key, value) | ( key, ( Duration :: seconds ( 10 ) - ( time:: now ( ) - * value ) ) . num_milliseconds ( ) ) )
219+ . map ( |( key, value) | ( key, ( * value - time:: now ( ) ) . num_milliseconds ( ) ) )
218220 . min_by ( |& ( _, ref val1) , & ( _, ref val2) | val1. cmp ( & val2) ) {
219221 ( ( req_id. to_string ( ) , node_alias. to_string ( ) ) , timeout)
220222 } else {
221- ( ( "" . to_string ( ) , "" . to_string ( ) ) , POOL_CON_ACTIVE_TO * 1000 )
223+ ( ( "" . to_string ( ) , "" . to_string ( ) ) , POOL_ACK_TIMEOUT * 1000 )
222224 }
223225 }
224226
@@ -262,7 +264,9 @@ impl PoolConnection {
262264
263265 fn extend_timeout ( & self , req_id : & str , node_alias : & str ) {
264266 if let Some ( timeout) = self . timeouts . borrow_mut ( ) . get_mut ( & ( req_id. to_string ( ) , node_alias. to_string ( ) ) ) {
265- * timeout = time:: now ( ) ;
267+ * timeout = time:: now ( ) + Duration :: seconds ( POOL_REPLY_TIMEOUT ) ;
268+ } else {
269+ debug ! ( "late REQACK for req_id {}, node {}" , req_id, node_alias) ;
266270 }
267271 }
268272
@@ -293,7 +297,7 @@ impl PoolConnection {
293297 let s = self . _get_socket ( idx) ?;
294298 s. send_str ( & req, zmq:: DONTWAIT ) ?;
295299 }
296- self . timeouts . borrow_mut ( ) . insert ( ( req_id, self . nodes [ idx] . name . clone ( ) ) , time:: now ( ) ) ;
300+ self . timeouts . borrow_mut ( ) . insert ( ( req_id, self . nodes [ idx] . name . clone ( ) ) , time:: now ( ) + Duration :: seconds ( POOL_ACK_TIMEOUT ) ) ;
297301 trace ! ( "_send_msg_to_one_node <<" ) ;
298302 Ok ( ( ) )
299303 }
@@ -680,13 +684,13 @@ pub mod networker_tests {
680684 let mut conn = PoolConnection :: new ( vec ! [ rn] ) ;
681685
682686 let timeout = conn. get_timeout ( ) ;
683- assert_eq ! ( ( ( "" . to_string( ) , "" . to_string( ) ) , POOL_CON_ACTIVE_TO * 1000 ) , timeout) ;
687+ assert_eq ! ( ( ( "" . to_string( ) , "" . to_string( ) ) , POOL_ACK_TIMEOUT * 1000 ) , timeout) ;
684688
685689 conn. send_request ( Some ( NetworkerEvent :: SendOneRequest ( MESSAGE . to_string ( ) , REQ_ID . to_string ( ) ) ) ) . unwrap ( ) ;
686690
687691 let ( id, timeout) = conn. get_timeout ( ) ;
688692 assert_eq ! ( ( REQ_ID . to_string( ) , NODE_NAME . to_string( ) ) , id) ;
689- assert_ne ! ( POOL_CON_ACTIVE_TO * 1000 , timeout)
693+ assert_ne ! ( POOL_ACK_TIMEOUT * 1000 , timeout)
690694 }
691695
692696 #[ test]
0 commit comments