@@ -19,10 +19,9 @@ use common::{
1919 identity:: InertIdentity ,
2020 knobs:: {
2121 APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT ,
22+ APPLICATION_MAX_CONCURRENT_ACTIONS ,
2223 APPLICATION_MAX_CONCURRENT_MUTATIONS ,
23- APPLICATION_MAX_CONCURRENT_NODE_ACTIONS ,
2424 APPLICATION_MAX_CONCURRENT_QUERIES ,
25- APPLICATION_MAX_CONCURRENT_V8_ACTIONS ,
2625 ISOLATE_MAX_USER_HEAP_SIZE ,
2726 UDF_EXECUTOR_OCC_INITIAL_BACKOFF ,
2827 UDF_EXECUTOR_OCC_MAX_BACKOFF ,
@@ -226,19 +225,16 @@ impl<RT: Runtime> FunctionRouter<RT> {
226225 database,
227226 system_env_vars,
228227 query_limiter : Arc :: new ( Limiter :: new (
229- ModuleEnvironment :: Isolate ,
230228 UdfType :: Query ,
231229 * APPLICATION_MAX_CONCURRENT_QUERIES ,
232230 ) ) ,
233231 mutation_limiter : Arc :: new ( Limiter :: new (
234- ModuleEnvironment :: Isolate ,
235232 UdfType :: Mutation ,
236233 * APPLICATION_MAX_CONCURRENT_MUTATIONS ,
237234 ) ) ,
238235 action_limiter : Arc :: new ( Limiter :: new (
239- ModuleEnvironment :: Isolate ,
240236 UdfType :: Action ,
241- * APPLICATION_MAX_CONCURRENT_V8_ACTIONS ,
237+ * APPLICATION_MAX_CONCURRENT_ACTIONS ,
242238 ) ) ,
243239 }
244240 }
@@ -259,8 +255,7 @@ impl<RT: Runtime> FunctionRouter<RT> {
259255 context : ExecutionContext ,
260256 ) -> anyhow:: Result < ( Transaction < RT > , FunctionOutcome ) > {
261257 anyhow:: ensure!( udf_type == UdfType :: Query || udf_type == UdfType :: Mutation ) ;
262- // All queries and mutations are run in the isolate environment.
263- let timer = function_total_timer ( ModuleEnvironment :: Isolate , udf_type) ;
258+ let timer = function_total_timer ( udf_type) ;
264259 let ( tx, outcome) = self
265260 . function_runner_execute ( tx, path_and_args, udf_type, journal, context, None )
266261 . await ?;
@@ -276,6 +271,7 @@ impl<RT: Runtime> FunctionRouter<RT> {
276271 log_line_sender : mpsc:: UnboundedSender < LogLine > ,
277272 context : ExecutionContext ,
278273 ) -> anyhow:: Result < ActionOutcome > {
274+ let timer = function_total_timer ( UdfType :: Action ) ;
279275 let ( _, outcome) = self
280276 . function_runner_execute (
281277 tx,
@@ -293,10 +289,11 @@ impl<RT: Runtime> FunctionRouter<RT> {
293289 outcome
294290 )
295291 } ;
292+ timer. finish ( ) ;
296293 Ok ( outcome)
297294 }
298295
299- // Execute using the function runner. Can be used for v8 udfs other than http
296+ // Execute using the function runner. Can be used for all Udf types including
300297 // actions.
301298 async fn function_runner_execute (
302299 & self ,
@@ -319,9 +316,17 @@ impl<RT: Runtime> FunctionRouter<RT> {
319316 UdfType :: Action => & self . action_limiter ,
320317 UdfType :: HttpAction => anyhow:: bail!( "Function runner does not support http actions" ) ,
321318 } ;
322-
323- let request_guard = limiter. acquire_permit_with_timeout ( & self . rt ) . await ?;
324-
319+ let mut request_guard = limiter. start ( ) ;
320+ select_biased ! {
321+ _ = request_guard. acquire_permit( ) . fuse( ) => { } ,
322+ _ = self . rt. wait( * APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT ) => {
323+ log_function_wait_timeout( udf_type) ;
324+ anyhow:: bail!( ErrorMetadata :: overloaded(
325+ "TooManyConcurrentRequests" ,
326+ "Too many concurrent requests, backoff and try again." ,
327+ ) ) ;
328+ } ,
329+ }
325330 let timer = function_run_timer ( udf_type) ;
326331 let ( function_tx, outcome, usage_stats) = self
327332 . function_runner
@@ -379,7 +384,6 @@ impl<RT: Runtime> FunctionRouter<RT> {
379384// and log gauges for the number of waiting and currently running functions.
380385struct Limiter {
381386 udf_type : UdfType ,
382- env : ModuleEnvironment ,
383387
384388 // Used to limit running functions.
385389 semaphore : Semaphore ,
@@ -390,10 +394,9 @@ struct Limiter {
390394}
391395
392396impl Limiter {
393- fn new ( env : ModuleEnvironment , udf_type : UdfType , total_permits : usize ) -> Self {
397+ fn new ( udf_type : UdfType , total_permits : usize ) -> Self {
394398 let limiter = Self {
395399 udf_type,
396- env,
397400 semaphore : Semaphore :: new ( total_permits) ,
398401 total_permits,
399402 total_outstanding : AtomicUsize :: new ( 0 ) ,
@@ -403,24 +406,6 @@ impl Limiter {
403406 limiter
404407 }
405408
406- async fn acquire_permit_with_timeout < ' a , RT : Runtime > (
407- & ' a self ,
408- rt : & ' a RT ,
409- ) -> anyhow:: Result < RequestGuard < ' a > > {
410- let mut request_guard = self . start ( ) ;
411- select_biased ! {
412- _ = request_guard. acquire_permit( ) . fuse( ) => { } ,
413- _ = rt. wait( * APPLICATION_FUNCTION_RUNNER_SEMAPHORE_TIMEOUT ) => {
414- log_function_wait_timeout( self . env, self . udf_type) ;
415- anyhow:: bail!( ErrorMetadata :: overloaded(
416- "TooManyConcurrentRequests" ,
417- "Too many concurrent requests, backoff and try again." ,
418- ) ) ;
419- } ,
420- }
421- Ok ( request_guard)
422- }
423-
424409 fn start ( & self ) -> RequestGuard {
425410 self . total_outstanding . fetch_add ( 1 , Ordering :: SeqCst ) ;
426411 // Update the gauge to account for the newly waiting request.
@@ -438,18 +423,8 @@ impl Limiter {
438423 . total_outstanding
439424 . load ( Ordering :: SeqCst )
440425 . saturating_sub ( running) ;
441- log_outstanding_functions (
442- running,
443- self . env ,
444- self . udf_type ,
445- OutstandingFunctionState :: Running ,
446- ) ;
447- log_outstanding_functions (
448- waiting,
449- self . env ,
450- self . udf_type ,
451- OutstandingFunctionState :: Waiting ,
452- ) ;
426+ log_outstanding_functions ( running, self . udf_type , OutstandingFunctionState :: Running ) ;
427+ log_outstanding_functions ( waiting, self . udf_type , OutstandingFunctionState :: Waiting ) ;
453428 }
454429}
455430
@@ -488,11 +463,6 @@ impl<'a> Drop for RequestGuard<'a> {
488463 }
489464}
490465
491- /// Executes UDFs for backends.
492- ///
493- /// This struct directly executes http and node actions. Queries, Mutations and
494- /// v8 Actions are instead routed through the FunctionRouter and its
495- /// FunctionRunner implementation.
496466pub struct ApplicationFunctionRunner < RT : Runtime > {
497467 runtime : RT ,
498468 pub ( crate ) database : Database < RT > ,
@@ -510,7 +480,6 @@ pub struct ApplicationFunctionRunner<RT: Runtime> {
510480
511481 cache_manager : CacheManager < RT > ,
512482 system_env_vars : BTreeMap < EnvVarName , EnvVarValue > ,
513- node_action_limiter : Limiter ,
514483}
515484
516485impl < RT : Runtime > HeapSize for ApplicationFunctionRunner < RT > {
@@ -560,11 +529,6 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
560529 function_log,
561530 cache_manager,
562531 system_env_vars,
563- node_action_limiter : Limiter :: new (
564- ModuleEnvironment :: Node ,
565- UdfType :: Action ,
566- * APPLICATION_MAX_CONCURRENT_NODE_ACTIONS ,
567- ) ,
568532 }
569533 }
570534
@@ -1108,8 +1072,6 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
11081072 . await ?
11091073 . context ( "Missing a valid module_version" ) ?;
11101074 let ( log_line_sender, log_line_receiver) = mpsc:: unbounded ( ) ;
1111-
1112- let timer = function_total_timer ( module_version. environment , UdfType :: Action ) ;
11131075 match module_version. environment {
11141076 ModuleEnvironment :: Isolate => {
11151077 // TODO: This is the only use case of clone. We should get rid of clone,
@@ -1136,7 +1098,6 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
11361098 let memory_in_mb: u64 = ( * ISOLATE_MAX_USER_HEAP_SIZE / ( 1 << 20 ) )
11371099 . try_into ( )
11381100 . unwrap ( ) ;
1139- timer. finish ( ) ;
11401101 Ok ( ActionCompletion {
11411102 outcome,
11421103 execution_time : start. elapsed ( ) ,
@@ -1149,10 +1110,6 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
11491110 } )
11501111 } ,
11511112 ModuleEnvironment :: Node => {
1152- let _request_guard = self
1153- . node_action_limiter
1154- . acquire_permit_with_timeout ( & self . runtime )
1155- . await ?;
11561113 let mut source_maps = BTreeMap :: new ( ) ;
11571114 if let Some ( source_map) = module_version. source_map . clone ( ) {
11581115 source_maps. insert ( name. module ( ) . clone ( ) , source_map) ;
@@ -1247,7 +1204,6 @@ impl<RT: Runtime> ApplicationFunctionRunner<RT> {
12471204 syscall_trace : node_outcome. syscall_trace ,
12481205 udf_server_version,
12491206 } ;
1250- timer. finish ( ) ;
12511207 let memory_in_mb = node_outcome. memory_used_in_mb ;
12521208 Ok ( ActionCompletion {
12531209 outcome,
0 commit comments