Skip to content

Commit 8aade72

Browse files
authored
Refactor WorkerState::Shutdown (#310)
1 parent 8079c50 commit 8aade72

File tree

1 file changed

+85
-66
lines changed

1 file changed

+85
-66
lines changed

actix-server/src/worker.rs

Lines changed: 85 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
1-
use std::future::Future;
2-
use std::pin::Pin;
3-
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4-
use std::sync::Arc;
5-
use std::task::{Context, Poll};
6-
use std::time::Duration;
7-
8-
use actix_rt::time::{sleep, Sleep};
9-
use actix_rt::{spawn, Arbiter};
1+
use std::{
2+
future::Future,
3+
mem,
4+
pin::Pin,
5+
sync::{
6+
atomic::{AtomicBool, AtomicUsize, Ordering},
7+
Arc,
8+
},
9+
task::{Context, Poll},
10+
time::Duration,
11+
};
12+
13+
use actix_rt::{
14+
spawn,
15+
time::{sleep, Instant, Sleep},
16+
Arbiter,
17+
};
1018
use actix_utils::counter::Counter;
1119
use futures_core::{future::LocalBoxFuture, ready};
1220
use log::{error, info, trace};
13-
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
14-
use tokio::sync::oneshot;
21+
use tokio::sync::{
22+
mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender},
23+
oneshot,
24+
};
1525

1626
use crate::service::{BoxedServerService, InternalServiceFactory};
1727
use crate::socket::MioStream;
@@ -132,7 +142,7 @@ pub(crate) struct ServerWorker {
132142
conns: Counter,
133143
factories: Vec<Box<dyn InternalServiceFactory>>,
134144
state: WorkerState,
135-
config: ServerWorkerConfig,
145+
shutdown_timeout: Duration,
136146
}
137147

138148
struct WorkerService {
@@ -211,12 +221,12 @@ impl ServerWorker {
211221
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
212222
rx,
213223
rx2,
224+
services: Default::default(),
214225
availability,
215226
factories,
216-
config,
217-
services: Vec::new(),
227+
state: Default::default(),
228+
shutdown_timeout: config.shutdown_timeout,
218229
conns: conns.clone(),
219-
state: WorkerState::Unavailable,
220230
});
221231

222232
let fut = wrk
@@ -337,61 +347,69 @@ enum WorkerState {
337347
Token,
338348
LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
339349
),
340-
Shutdown(
341-
Pin<Box<Sleep>>,
342-
Pin<Box<Sleep>>,
343-
Option<oneshot::Sender<bool>>,
344-
),
350+
// Shutdown keep states necessary for server shutdown:
351+
// Sleep for interval check the shutdown progress.
352+
// Instant for the start time of shutdown.
353+
// Sender for send back the shutdown outcome(force/grace) to StopCommand caller.
354+
Shutdown(Pin<Box<Sleep>>, Instant, oneshot::Sender<bool>),
355+
}
356+
357+
impl Default for WorkerState {
358+
fn default() -> Self {
359+
Self::Unavailable
360+
}
345361
}
346362

347363
impl Future for ServerWorker {
348364
type Output = ();
349365

350366
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
367+
let this = self.as_mut().get_mut();
368+
351369
// `StopWorker` message handler
352370
if let Poll::Ready(Some(StopCommand { graceful, result })) =
353-
Pin::new(&mut self.rx2).poll_recv(cx)
371+
Pin::new(&mut this.rx2).poll_recv(cx)
354372
{
355-
self.availability.set(false);
373+
this.availability.set(false);
356374
let num = num_connections();
357375
if num == 0 {
358376
info!("Shutting down worker, 0 connections");
359377
let _ = result.send(true);
360378
return Poll::Ready(());
361379
} else if graceful {
362-
self.shutdown(false);
363380
info!("Graceful worker shutdown, {} connections", num);
364-
self.state = WorkerState::Shutdown(
365-
Box::pin(sleep(Duration::from_secs(1))),
366-
Box::pin(sleep(self.config.shutdown_timeout)),
367-
Some(result),
368-
);
381+
this.shutdown(false);
382+
383+
let timer = Box::pin(sleep(Duration::from_secs(1)));
384+
let start_from = Instant::now();
385+
this.state = WorkerState::Shutdown(timer, start_from, result);
369386
} else {
370387
info!("Force shutdown worker, {} connections", num);
371-
self.shutdown(true);
388+
this.shutdown(true);
389+
372390
let _ = result.send(false);
373391
return Poll::Ready(());
374392
}
375393
}
376394

377-
match self.state {
378-
WorkerState::Unavailable => match self.check_readiness(cx) {
395+
match this.state {
396+
WorkerState::Unavailable => match this.check_readiness(cx) {
379397
Ok(true) => {
380-
self.state = WorkerState::Available;
381-
self.availability.set(true);
398+
this.state = WorkerState::Available;
399+
this.availability.set(true);
382400
self.poll(cx)
383401
}
384402
Ok(false) => Poll::Pending,
385403
Err((token, idx)) => {
386-
self.restart_service(token, idx);
404+
this.restart_service(token, idx);
387405
self.poll(cx)
388406
}
389407
},
390408
WorkerState::Restarting(idx, token, ref mut fut) => {
391409
let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| {
392410
panic!(
393411
"Can not restart {:?} service",
394-
self.factories[idx].name(token)
412+
this.factories[idx].name(token)
395413
)
396414
});
397415

@@ -403,60 +421,61 @@ impl Future for ServerWorker {
403421

404422
trace!(
405423
"Service {:?} has been restarted",
406-
self.factories[idx].name(token)
424+
this.factories[idx].name(token)
407425
);
408426

409-
self.services[token.0].created(service);
410-
self.state = WorkerState::Unavailable;
427+
this.services[token.0].created(service);
428+
this.state = WorkerState::Unavailable;
411429

412430
self.poll(cx)
413431
}
414-
WorkerState::Shutdown(ref mut t1, ref mut t2, ref mut tx) => {
415-
let num = num_connections();
416-
if num == 0 {
417-
let _ = tx.take().unwrap().send(true);
432+
WorkerState::Shutdown(ref mut timer, ref start_from, _) => {
433+
// Wait for 1 second.
434+
ready!(timer.as_mut().poll(cx));
435+
436+
if num_connections() == 0 {
437+
// Graceful shutdown.
438+
if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) {
439+
let _ = sender.send(true);
440+
}
418441
Arbiter::current().stop();
419-
return Poll::Ready(());
420-
}
421-
422-
// check graceful timeout
423-
if Pin::new(t2).poll(cx).is_ready() {
424-
let _ = tx.take().unwrap().send(false);
425-
self.shutdown(true);
442+
Poll::Ready(())
443+
} else if start_from.elapsed() >= this.shutdown_timeout {
444+
// Timeout forceful shutdown.
445+
if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) {
446+
let _ = sender.send(false);
447+
}
426448
Arbiter::current().stop();
427-
return Poll::Ready(());
428-
}
429-
430-
// sleep for 1 second and then check again
431-
if t1.as_mut().poll(cx).is_ready() {
432-
*t1 = Box::pin(sleep(Duration::from_secs(1)));
433-
let _ = t1.as_mut().poll(cx);
449+
Poll::Ready(())
450+
} else {
451+
// Reset timer and wait for 1 second.
452+
let time = Instant::now() + Duration::from_secs(1);
453+
timer.as_mut().reset(time);
454+
timer.as_mut().poll(cx)
434455
}
435-
436-
Poll::Pending
437456
}
438457
// actively poll stream and handle worker command
439458
WorkerState::Available => loop {
440-
match self.check_readiness(cx) {
459+
match this.check_readiness(cx) {
441460
Ok(true) => {}
442461
Ok(false) => {
443462
trace!("Worker is unavailable");
444-
self.availability.set(false);
445-
self.state = WorkerState::Unavailable;
463+
this.availability.set(false);
464+
this.state = WorkerState::Unavailable;
446465
return self.poll(cx);
447466
}
448467
Err((token, idx)) => {
449-
self.restart_service(token, idx);
450-
self.availability.set(false);
468+
this.restart_service(token, idx);
469+
this.availability.set(false);
451470
return self.poll(cx);
452471
}
453472
}
454473

455-
match ready!(Pin::new(&mut self.rx).poll_recv(cx)) {
474+
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
456475
// handle incoming io stream
457476
Some(WorkerCommand(msg)) => {
458-
let guard = self.conns.get();
459-
let _ = self.services[msg.token.0].service.call((guard, msg.io));
477+
let guard = this.conns.get();
478+
let _ = this.services[msg.token.0].service.call((guard, msg.io));
460479
}
461480
None => return Poll::Ready(()),
462481
};

0 commit comments

Comments
 (0)