@@ -125,9 +125,6 @@ pub struct RequestsScheduler<Env: Environment> {
     /// Thread-safe map of validator nodes indexed by their public keys.
     /// Each node is wrapped with EMA-based performance tracking information.
     nodes: Arc<tokio::sync::RwLock<BTreeMap<ValidatorPublicKey, NodeInfo<Env>>>>,
-    /// Maximum number of concurrent requests allowed per node.
-    /// Prevents overwhelming individual validators with too many parallel requests.
-    max_requests_per_node: usize,
     /// Default scoring weights applied to new nodes.
     weights: ScoringWeights,
     /// Default EMA smoothing factor for new nodes.
@@ -148,7 +145,6 @@ impl<Env: Environment> RequestsScheduler<Env> {
     ) -> Self {
         Self::with_config(
             nodes,
-            config.max_in_flight_requests,
             ScoringWeights::default(),
             config.alpha,
             config.max_accepted_latency_ms,
@@ -169,10 +165,8 @@ impl<Env: Environment> RequestsScheduler<Env> {
     /// - `cache_ttl`: Time-to-live for cached responses
     /// - `max_cache_size`: Maximum number of entries in the cache
     /// - `max_request_ttl`: Maximum latency for an in-flight request before we stop deduplicating it
-    #[expect(clippy::too_many_arguments)]
     pub fn with_config(
         nodes: impl IntoIterator<Item = RemoteNode<Env::ValidatorNode>>,
-        max_requests_per_node: usize,
         weights: ScoringWeights,
         alpha: f64,
         max_expected_latency_ms: f64,
@@ -188,18 +182,11 @@ impl<Env: Environment> RequestsScheduler<Env> {
                 .map(|node| {
                     (
                         node.public_key,
-                        NodeInfo::with_config(
-                            node,
-                            weights,
-                            alpha,
-                            max_expected_latency_ms,
-                            max_requests_per_node,
-                        ),
+                        NodeInfo::with_config(node, weights, alpha, max_expected_latency_ms),
                     )
                 })
                 .collect(),
             )),
-            max_requests_per_node,
             weights,
             alpha,
             max_expected_latency: max_expected_latency_ms,
@@ -474,19 +461,11 @@ impl<Env: Environment> RequestsScheduler<Env> {
         let start_time = Instant::now();
         let public_key = peer.public_key;
 
-        // Acquire request slot
-        let nodes = self.nodes.read().await;
-        let node = nodes.get(&public_key).expect("Node must exist");
-        let semaphore = node.in_flight_semaphore.clone();
-        let permit = semaphore.acquire().await.unwrap();
-        drop(nodes);
-
         // Execute the operation
         let result = operation(peer).await;
 
         // Update metrics and release slot
         let response_time_ms = start_time.elapsed().as_millis() as u64;
-        drop(permit); // Explicitly drop the permit to release the slot
         let is_success = result.is_ok();
         {
             let mut nodes = self.nodes.write().await;
@@ -752,13 +731,7 @@ impl<Env: Environment> RequestsScheduler<Env> {
         let mut nodes = self.nodes.write().await;
         let public_key = node.public_key;
         nodes.entry(public_key).or_insert_with(|| {
-            NodeInfo::with_config(
-                node,
-                self.weights,
-                self.alpha,
-                self.max_expected_latency,
-                self.max_requests_per_node,
-            )
+            NodeInfo::with_config(node, self.weights, self.alpha, self.max_expected_latency)
         });
     }
 }
@@ -791,7 +764,6 @@ mod tests {
     ) -> Arc<RequestsScheduler<TestEnvironment>> {
         let mut manager = RequestsScheduler::with_config(
             vec![], // No actual nodes needed for these tests
-            10,
             ScoringWeights::default(),
             0.1,
             1000.0,
@@ -1020,160 +992,6 @@ mod tests {
         assert_eq!(execution_count.load(Ordering::SeqCst), 2);
     }
 
-    #[tokio::test]
-    async fn test_slot_limiting_blocks_excess_requests() {
-        // Tests the slot limiting mechanism:
-        // - Creates a RequestsScheduler with max_requests_per_node = 2
-        // - Starts two slow requests that acquire both available slots
-        // - Starts a third request and verifies it's blocked waiting for a slot (execution count stays at 2)
-        // - Completes the first request to release a slot
-        // - Verifies the third request now acquires the freed slot and executes (execution count becomes 3)
-        // - Confirms all requests complete successfully
-        use linera_base::identifiers::BlobType;
-
-        use crate::test_utils::{MemoryStorageBuilder, TestBuilder};
-
-        // Create a test environment with one validator
-        let mut builder = TestBuilder::new(
-            MemoryStorageBuilder::default(),
-            1,
-            0,
-            InMemorySigner::new(None),
-        )
-        .await
-        .unwrap();
-
-        // Get the validator node
-        let validator_node = builder.node(0);
-        let validator_public_key = validator_node.name();
-
-        // Create a RemoteNode wrapper
-        let remote_node = RemoteNode {
-            public_key: validator_public_key,
-            node: validator_node,
-        };
-
-        // Create a RequestsScheduler with max_requests_per_node = 2
-        let max_slots = 2;
-        let mut manager: RequestsScheduler<TestEnvironment> = RequestsScheduler::with_config(
-            vec![remote_node.clone()],
-            max_slots,
-            ScoringWeights::default(),
-            0.1,
-            1000.0,
-            Duration::from_secs(60),
-            100,
-            Duration::from_secs(60),
-        );
-        // Replace the tracker with one using a longer timeout for this test
-        manager.in_flight_tracker = InFlightTracker::new(Duration::from_secs(60));
-        let manager = Arc::new(manager);
-
-        // Track execution state
-        let execution_count = Arc::new(AtomicUsize::new(0));
-        let completion_count = Arc::new(AtomicUsize::new(0));
-
-        // Create channels to control when operations complete
-        let (tx1, rx1) = oneshot::channel();
-        let (tx2, rx2) = oneshot::channel();
-
-        // Start first request using with_peer (will block until signaled)
-        let manager_clone1 = Arc::clone(&manager);
-        let remote_node_clone1 = remote_node.clone();
-        let execution_count_clone1 = execution_count.clone();
-        let completion_count_clone1 = completion_count.clone();
-        let key1 = RequestKey::Blob(BlobId::new(CryptoHash::test_hash("blob1"), BlobType::Data));
-
-        let first_request = tokio::spawn(async move {
-            manager_clone1
-                .with_peer(key1, remote_node_clone1, |_peer| async move {
-                    execution_count_clone1.fetch_add(1, Ordering::SeqCst);
-                    // Simulate work by waiting for signal
-                    let _ = rx1.await;
-                    completion_count_clone1.fetch_add(1, Ordering::SeqCst);
-                    Ok(None) // Return Option<Blob>
-                })
-                .await
-        });
-
-        // Give first request time to start and acquire a slot
-        tokio::time::sleep(Duration::from_millis(50)).await;
-        assert_eq!(execution_count.load(Ordering::SeqCst), 1);
-
-        // Start second request using with_peer (will block until signaled)
-        let manager_clone2 = Arc::clone(&manager);
-        let remote_node_clone2 = remote_node.clone();
-        let execution_count_clone2 = execution_count.clone();
-        let completion_count_clone2 = completion_count.clone();
-        let key2 = RequestKey::Blob(BlobId::new(CryptoHash::test_hash("blob2"), BlobType::Data));
-
-        let second_request = tokio::spawn(async move {
-            manager_clone2
-                .with_peer(key2, remote_node_clone2, |_peer| async move {
-                    execution_count_clone2.fetch_add(1, Ordering::SeqCst);
-                    // Simulate work by waiting for signal
-                    let _ = rx2.await;
-                    completion_count_clone2.fetch_add(1, Ordering::SeqCst);
-                    Ok(None) // Return Option<Blob>
-                })
-                .await
-        });
-
-        // Give second request time to start and acquire the second slot
-        tokio::time::sleep(Duration::from_millis(50)).await;
-        assert_eq!(execution_count.load(Ordering::SeqCst), 2);
-
-        // Start third request - this should be blocked waiting for a slot
-        let remote_node_clone3 = remote_node.clone();
-        let execution_count_clone3 = execution_count.clone();
-        let completion_count_clone3 = completion_count.clone();
-        let key3 = RequestKey::Blob(BlobId::new(CryptoHash::test_hash("blob3"), BlobType::Data));
-
-        let third_request = tokio::spawn(async move {
-            manager
-                .with_peer(key3, remote_node_clone3, |_peer| async move {
-                    execution_count_clone3.fetch_add(1, Ordering::SeqCst);
-                    completion_count_clone3.fetch_add(1, Ordering::SeqCst);
-                    Ok(None) // Return Option<Blob>
-                })
-                .await
-        });
-
-        // Give third request time to try acquiring a slot
-        tokio::time::sleep(Duration::from_millis(100)).await;
-
-        // Third request should still be waiting (not executed yet)
-        assert_eq!(
-            execution_count.load(Ordering::SeqCst),
-            2,
-            "Third request should be waiting for a slot"
-        );
-
-        // Complete the first request to release a slot
-        tx1.send(()).unwrap();
-
-        // Wait for first request to complete and third request to start
-        tokio::time::sleep(Duration::from_millis(100)).await;
-
-        // Now the third request should have acquired the freed slot and started executing
-        assert_eq!(
-            execution_count.load(Ordering::SeqCst),
-            3,
-            "Third request should now be executing"
-        );
-
-        // Complete remaining requests
-        tx2.send(()).unwrap();
-
-        // Wait for all requests to complete
-        let _result1 = first_request.await.unwrap();
-        let _result2 = second_request.await.unwrap();
-        let _result3 = third_request.await.unwrap();
-
-        // Verify all completed
-        assert_eq!(completion_count.load(Ordering::SeqCst), 3);
-    }
-
     #[tokio::test]
     async fn test_alternative_peers_registered_on_deduplication() {
         use linera_base::identifiers::BlobType;
@@ -1203,7 +1021,6 @@ mod tests {
         let manager: Arc<RequestsScheduler<TestEnvironment>> =
             Arc::new(RequestsScheduler::with_config(
                 nodes.clone(),
-                1,
                 ScoringWeights::default(),
                 0.1,
                 1000.0,
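
For reference, the updated `with_config` signature drops the per-node request limit. A minimal sketch of a call after this change, reusing the argument values from the tests above (`remote_node`, the `TestEnvironment` type, and the surrounding imports are assumed to be in scope):

```rust
// Sketch only; the argument order follows the updated signature shown in this diff,
// and the concrete values are borrowed from the tests above.
let scheduler: RequestsScheduler<TestEnvironment> = RequestsScheduler::with_config(
    vec![remote_node.clone()],  // nodes
    ScoringWeights::default(),  // weights
    0.1,                        // alpha (EMA smoothing factor)
    1000.0,                     // max_expected_latency_ms
    Duration::from_secs(60),    // cache_ttl
    100,                        // max_cache_size
    Duration::from_secs(60),    // max_request_ttl
);
```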