Skip to content

Commit 555cf5a

Browse files
committed
perf: partially recovers the regression of request throughput on the per_worker policy
(cherry picked from commit 0aac44b)
1 parent 6567af7 commit 555cf5a

File tree

4 files changed

+80
-76
lines changed

4 files changed

+80
-76
lines changed

crates/base/src/rt_worker/supervisor/strategy_per_worker.rs

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub async fn supervise(args: Arguments) -> ShutdownReason {
2121

2222
let Timing {
2323
status: TimingStatus { demand, is_retired },
24-
req: (mut req_start_rx, mut req_end_rx),
24+
req: (_, mut req_end_rx),
2525
} = timing.unwrap_or_default();
2626

2727
let is_retired = is_retired.unwrap();
@@ -51,12 +51,6 @@ pub async fn supervise(args: Arguments) -> ShutdownReason {
5151
if !cpu_time_soft_limit_reached {
5252
// retire worker
5353
is_retired.raise();
54-
if let Some(tx) = pool_msg_tx.clone() {
55-
if tx.send(UserWorkerMsgs::Retire(key)).is_err() {
56-
error!("failed to send retire msg to pool: {:?}", key);
57-
}
58-
}
59-
6054
error!("CPU time soft limit reached. isolate: {:?}", key);
6155
cpu_time_soft_limit_reached = true;
6256

@@ -82,10 +76,6 @@ pub async fn supervise(args: Arguments) -> ShutdownReason {
8276
}
8377
}
8478

85-
Some(notify) = req_start_rx.recv() => {
86-
notify.notify_one();
87-
}
88-
8979
Some(_) = req_end_rx.recv() => {
9080
req_ack_count += 1;
9181

@@ -119,11 +109,6 @@ pub async fn supervise(args: Arguments) -> ShutdownReason {
119109
} else if wall_clock_alerts == 1 {
120110
// retire worker
121111
is_retired.raise();
122-
if let Some(tx) = pool_msg_tx.clone() {
123-
if tx.send(UserWorkerMsgs::Retire(key)).is_err() {
124-
error!("failed to send retire msg to pool: {:?}", key);
125-
}
126-
}
127112
error!("wall clock duration warning. isolate: {:?}", key);
128113
wall_clock_alerts += 1;
129114
} else {

crates/base/src/rt_worker/worker_ctx.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -431,9 +431,6 @@ pub async fn create_user_worker_pool(
431431
Some(UserWorkerMsgs::SendRequest(key, req, res_tx, conn_watch)) => {
432432
worker_pool.send_request(&key, req, res_tx, conn_watch);
433433
}
434-
Some(UserWorkerMsgs::Retire(key)) => {
435-
worker_pool.retire(&key);
436-
}
437434
Some(UserWorkerMsgs::Idle(key)) => {
438435
worker_pool.idle(&key);
439436
}

crates/base/src/rt_worker/worker_pool.rs

Lines changed: 79 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl ActiveWorkerRegistry {
134134
}
135135
}
136136

137-
fn mark_used_and_try_advance(&mut self) -> Option<&Uuid> {
137+
fn mark_used_and_try_advance(&mut self, policy: SupervisorPolicy) -> Option<&Uuid> {
138138
if self.workers.is_empty() {
139139
let _ = self.next.take();
140140
return None;
@@ -147,25 +147,40 @@ impl ActiveWorkerRegistry {
147147
.unwrap_or(0);
148148

149149
match self.workers.iter().nth(idx).cloned() {
150-
Some(WorkerId(key, true)) => {
151-
let key = self
152-
.workers
153-
.replace(WorkerId(key, false))
154-
.and_then(|WorkerId(ref key, _)| self.workers.get(key).map(|it| &it.0));
155-
156-
self.next = self.workers.iter().position(|it| it.1);
157-
key
158-
}
150+
Some(WorkerId(key, true)) => match policy {
151+
SupervisorPolicy::PerWorker => {
152+
self.next = Some(idx + 1);
153+
self.workers.get(&key).map(|it| &it.0)
154+
}
155+
156+
SupervisorPolicy::PerRequest { .. } => {
157+
let key = self
158+
.workers
159+
.replace(WorkerId(key, false))
160+
.and_then(|WorkerId(ref key, _)| self.workers.get(key).map(|it| &it.0));
161+
162+
self.next = self.workers.iter().position(|it| it.1);
163+
key
164+
}
165+
},
166+
159167
_ => {
160168
let _ = self.next.take();
161169
None
162170
}
163171
}
164172
}
165173

166-
fn mark_idle(&mut self, key: &Uuid) {
167-
if let Some(WorkerId(key, false)) = self.workers.get(key).cloned() {
168-
let _ = self.workers.replace(WorkerId(key, true));
174+
fn mark_idle(&mut self, key: &Uuid, policy: SupervisorPolicy) {
175+
if let Some(WorkerId(key, mark)) = self.workers.get(key).cloned() {
176+
if policy.is_per_request() {
177+
if mark != false {
178+
return;
179+
}
180+
181+
let _ = self.workers.replace(WorkerId(key, true));
182+
}
183+
169184
let (notify_tx, _) = self.notify_pair.clone();
170185
let _ = notify_tx.send(Some(key));
171186
}
@@ -418,7 +433,10 @@ impl WorkerPool {
418433
.entry(profile.service_path.clone())
419434
.or_insert_with(|| ActiveWorkerRegistry::new(self.policy.max_parallelism));
420435

421-
registry.workers.insert(WorkerId(key, false));
436+
registry
437+
.workers
438+
.insert(WorkerId(key, self.policy.supervisor_policy.is_per_worker()));
439+
422440
self.user_workers.insert(key, profile);
423441
}
424442

@@ -431,21 +449,24 @@ impl WorkerPool {
431449
) {
432450
let _: Result<(), Error> = match self.user_workers.get(key) {
433451
Some(worker) => {
452+
let policy = self.policy.supervisor_policy;
434453
let profile = worker.clone();
435454
let cancel = worker.cancel.clone();
436455
let (req_start_tx, req_end_tx) = profile.timing_tx_pair.clone();
437456

438457
// Create a closure to handle the request and send the response
439458
let request_handler = async move {
440-
let fence = Arc::new(Notify::const_new());
459+
if !policy.is_per_worker() {
460+
let fence = Arc::new(Notify::const_new());
441461

442-
if let Err(ex) = req_start_tx.send(fence.clone()) {
443-
error!("failed to notify the fence to the supervisor");
444-
return Err(ex)
445-
.with_context(|| "failed to notify the fence to the supervisor");
446-
}
462+
if let Err(ex) = req_start_tx.send(fence.clone()) {
463+
error!("failed to notify the fence to the supervisor");
464+
return Err(ex)
465+
.with_context(|| "failed to notify the fence to the supervisor");
466+
}
447467

448-
fence.notified().await;
468+
fence.notified().await;
469+
}
449470

450471
let result = send_user_worker_request(
451472
profile.worker_request_msg_tx,
@@ -487,26 +508,13 @@ impl WorkerPool {
487508
};
488509
}
489510

490-
pub fn retire(&mut self, key: &Uuid) {
491-
if let Some(profile) = self.user_workers.get(key) {
492-
let registry = self
493-
.active_workers
494-
.get_mut(&profile.service_path)
495-
.expect("registry must be initialized at this point");
496-
497-
if registry.workers.contains(key) {
498-
registry.workers.remove(key);
499-
}
500-
}
501-
}
502-
503511
pub fn idle(&mut self, key: &Uuid) {
504512
if let Some(registry) = self
505513
.user_workers
506514
.get_mut(key)
507515
.and_then(|it| self.active_workers.get_mut(&it.service_path))
508516
{
509-
registry.mark_idle(key);
517+
registry.mark_idle(key, self.policy.supervisor_policy);
510518
}
511519
}
512520

@@ -525,6 +533,19 @@ impl WorkerPool {
525533
let _ = notify_tx.send(None);
526534
}
527535

536+
fn retire(&mut self, key: &Uuid) {
537+
if let Some(profile) = self.user_workers.get(key) {
538+
let registry = self
539+
.active_workers
540+
.get_mut(&profile.service_path)
541+
.expect("registry must be initialized at this point");
542+
543+
if registry.workers.contains(key) {
544+
registry.workers.remove(key);
545+
}
546+
}
547+
}
548+
528549
fn maybe_active_worker(&mut self, service_path: &String, force_create: bool) -> Option<Uuid> {
529550
if force_create {
530551
return None;
@@ -534,33 +555,35 @@ impl WorkerPool {
534555
return None;
535556
};
536557

537-
let mut advance_fn = move || registry.mark_used_and_try_advance().copied();
558+
let policy = self.policy.supervisor_policy;
559+
let mut advance_fn = move || registry.mark_used_and_try_advance(policy).copied();
538560

539-
if self.policy.supervisor_policy.is_per_request() {
561+
if policy.is_per_request() {
540562
return advance_fn();
541563
}
542564

543-
loop {
544-
let Some(worker_uuid) = advance_fn() else {
545-
return None;
546-
};
565+
let Some(worker_uuid) = advance_fn() else {
566+
return None;
567+
};
547568

548-
match self
549-
.user_workers
550-
.get(&worker_uuid)
551-
.and_then(|it| it.status.is_retired.as_ref())
552-
{
553-
Some(is_retired) if !is_retired.is_raised() => {
554-
self.user_workers
555-
.get(&worker_uuid)
556-
.map(|it| it.status.demand.as_ref())
557-
.unwrap()
558-
.fetch_add(1, Ordering::Release);
559-
560-
return Some(worker_uuid);
561-
}
569+
match self
570+
.user_workers
571+
.get(&worker_uuid)
572+
.and_then(|it| it.status.is_retired.as_ref())
573+
{
574+
Some(is_retired) if !is_retired.is_raised() => {
575+
self.user_workers
576+
.get(&worker_uuid)
577+
.map(|it| it.status.demand.as_ref())
578+
.unwrap()
579+
.fetch_add(1, Ordering::Release);
580+
581+
return Some(worker_uuid);
582+
}
562583

563-
_ => {}
584+
_ => {
585+
self.retire(&worker_uuid);
586+
self.maybe_active_worker(service_path, force_create)
564587
}
565588
}
566589
}

crates/sb_workers/context.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ pub enum UserWorkerMsgs {
141141
oneshot::Sender<Result<SendRequestResult, Error>>,
142142
Option<watch::Receiver<ConnSync>>,
143143
),
144-
Retire(Uuid),
145144
Idle(Uuid),
146145
Shutdown(Uuid),
147146
}

0 commit comments

Comments
 (0)