Skip to content

Commit 3859e91

Browse files
authored
Use named type for WorkerState::Restarting and Shutdown (#317)
1 parent 8aade72 commit 3859e91

File tree

1 file changed

+53
-36
lines changed

1 file changed

+53
-36
lines changed

actix-server/src/worker.rs

Lines changed: 53 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub(crate) struct WorkerCommand(Conn);
3434
/// and `false` if some connections still alive.
3535
pub(crate) struct StopCommand {
3636
graceful: bool,
37-
result: oneshot::Sender<bool>,
37+
tx: oneshot::Sender<bool>,
3838
}
3939

4040
#[derive(Debug)]
@@ -98,8 +98,8 @@ impl WorkerHandle {
9898
}
9999

100100
pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
101-
let (result, rx) = oneshot::channel();
102-
let _ = self.tx2.send(StopCommand { graceful, result });
101+
let (tx, rx) = oneshot::channel();
102+
let _ = self.tx2.send(StopCommand { graceful, tx });
103103
rx
104104
}
105105
}
@@ -221,7 +221,7 @@ impl ServerWorker {
221221
let mut wrk = MAX_CONNS_COUNTER.with(move |conns| ServerWorker {
222222
rx,
223223
rx2,
224-
services: Default::default(),
224+
services: Vec::new(),
225225
availability,
226226
factories,
227227
state: Default::default(),
@@ -272,11 +272,15 @@ impl ServerWorker {
272272
WorkerHandle::new(idx, tx1, tx2, avail)
273273
}
274274

275-
fn restart_service(&mut self, token: Token, idx: usize) {
276-
let factory = &self.factories[idx];
275+
fn restart_service(&mut self, token: Token, factory_id: usize) {
276+
let factory = &self.factories[factory_id];
277277
trace!("Service {:?} failed, restarting", factory.name(token));
278278
self.services[token.0].status = WorkerServiceStatus::Restarting;
279-
self.state = WorkerState::Restarting(idx, token, factory.create());
279+
self.state = WorkerState::Restarting(Restart {
280+
factory_id,
281+
token,
282+
fut: factory.create(),
283+
});
280284
}
281285

282286
fn shutdown(&mut self, force: bool) {
@@ -342,16 +346,24 @@ impl ServerWorker {
342346
enum WorkerState {
343347
Available,
344348
Unavailable,
345-
Restarting(
346-
usize,
347-
Token,
348-
LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
349-
),
350-
// Shutdown keep states necessary for server shutdown:
351-
// Sleep for interval check the shutdown progress.
352-
// Instant for the start time of shutdown.
353-
// Sender for send back the shutdown outcome(force/grace) to StopCommand caller.
354-
Shutdown(Pin<Box<Sleep>>, Instant, oneshot::Sender<bool>),
349+
Restarting(Restart),
350+
Shutdown(Shutdown),
351+
}
352+
353+
struct Restart {
354+
factory_id: usize,
355+
token: Token,
356+
fut: LocalBoxFuture<'static, Result<Vec<(Token, BoxedServerService)>, ()>>,
357+
}
358+
359+
// Shutdown keep states necessary for server shutdown:
360+
// Sleep for interval check the shutdown progress.
361+
// Instant for the start time of shutdown.
362+
// Sender for send back the shutdown outcome(force/grace) to StopCommand caller.
363+
struct Shutdown {
364+
timer: Pin<Box<Sleep>>,
365+
start_from: Instant,
366+
tx: oneshot::Sender<bool>,
355367
}
356368

357369
impl Default for WorkerState {
@@ -367,27 +379,29 @@ impl Future for ServerWorker {
367379
let this = self.as_mut().get_mut();
368380

369381
// `StopWorker` message handler
370-
if let Poll::Ready(Some(StopCommand { graceful, result })) =
382+
if let Poll::Ready(Some(StopCommand { graceful, tx })) =
371383
Pin::new(&mut this.rx2).poll_recv(cx)
372384
{
373385
this.availability.set(false);
374386
let num = num_connections();
375387
if num == 0 {
376388
info!("Shutting down worker, 0 connections");
377-
let _ = result.send(true);
389+
let _ = tx.send(true);
378390
return Poll::Ready(());
379391
} else if graceful {
380392
info!("Graceful worker shutdown, {} connections", num);
381393
this.shutdown(false);
382394

383-
let timer = Box::pin(sleep(Duration::from_secs(1)));
384-
let start_from = Instant::now();
385-
this.state = WorkerState::Shutdown(timer, start_from, result);
395+
this.state = WorkerState::Shutdown(Shutdown {
396+
timer: Box::pin(sleep(Duration::from_secs(1))),
397+
start_from: Instant::now(),
398+
tx,
399+
});
386400
} else {
387401
info!("Force shutdown worker, {} connections", num);
388402
this.shutdown(true);
389403

390-
let _ = result.send(false);
404+
let _ = tx.send(false);
391405
return Poll::Ready(());
392406
}
393407
}
@@ -405,11 +419,14 @@ impl Future for ServerWorker {
405419
self.poll(cx)
406420
}
407421
},
408-
WorkerState::Restarting(idx, token, ref mut fut) => {
409-
let item = ready!(fut.as_mut().poll(cx)).unwrap_or_else(|_| {
422+
WorkerState::Restarting(ref mut restart) => {
423+
let factory_id = restart.factory_id;
424+
let token = restart.token;
425+
426+
let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| {
410427
panic!(
411428
"Can not restart {:?} service",
412-
this.factories[idx].name(token)
429+
this.factories[factory_id].name(token)
413430
)
414431
});
415432

@@ -421,37 +438,37 @@ impl Future for ServerWorker {
421438

422439
trace!(
423440
"Service {:?} has been restarted",
424-
this.factories[idx].name(token)
441+
this.factories[factory_id].name(token)
425442
);
426443

427444
this.services[token.0].created(service);
428445
this.state = WorkerState::Unavailable;
429446

430447
self.poll(cx)
431448
}
432-
WorkerState::Shutdown(ref mut timer, ref start_from, _) => {
449+
WorkerState::Shutdown(ref mut shutdown) => {
433450
// Wait for 1 second.
434-
ready!(timer.as_mut().poll(cx));
451+
ready!(shutdown.timer.as_mut().poll(cx));
435452

436453
if num_connections() == 0 {
437454
// Graceful shutdown.
438-
if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) {
439-
let _ = sender.send(true);
455+
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
456+
let _ = shutdown.tx.send(true);
440457
}
441458
Arbiter::current().stop();
442459
Poll::Ready(())
443-
} else if start_from.elapsed() >= this.shutdown_timeout {
460+
} else if shutdown.start_from.elapsed() >= this.shutdown_timeout {
444461
// Timeout forceful shutdown.
445-
if let WorkerState::Shutdown(_, _, sender) = mem::take(&mut this.state) {
446-
let _ = sender.send(false);
462+
if let WorkerState::Shutdown(shutdown) = mem::take(&mut this.state) {
463+
let _ = shutdown.tx.send(false);
447464
}
448465
Arbiter::current().stop();
449466
Poll::Ready(())
450467
} else {
451468
// Reset timer and wait for 1 second.
452469
let time = Instant::now() + Duration::from_secs(1);
453-
timer.as_mut().reset(time);
454-
timer.as_mut().poll(cx)
470+
shutdown.timer.as_mut().reset(time);
471+
shutdown.timer.as_mut().poll(cx)
455472
}
456473
}
457474
// actively poll stream and handle worker command

0 commit comments

Comments
 (0)