Skip to content

Commit 5961eb8

Browse files
authored
Fix bug where worker service restart could skip failing services and not being able to restart multiple services (#318)
1 parent 995efcf commit 5961eb8

File tree

2 files changed

+189
-18
lines changed

2 files changed

+189
-18
lines changed

actix-server/src/worker.rs

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,6 @@ impl ServerWorker {
283283

284284
fn check_readiness(&mut self, cx: &mut Context<'_>) -> Result<bool, (Token, usize)> {
285285
let mut ready = self.conns.available(cx);
286-
let mut failed = None;
287286
for (idx, srv) in self.services.iter_mut().enumerate() {
288287
if srv.status == WorkerServiceStatus::Available
289288
|| srv.status == WorkerServiceStatus::Unavailable
@@ -314,17 +313,14 @@ impl ServerWorker {
314313
"Service {:?} readiness check returned error, restarting",
315314
self.factories[srv.factory].name(Token(idx))
316315
);
317-
failed = Some((Token(idx), srv.factory));
318316
srv.status = WorkerServiceStatus::Failed;
317+
return Err((Token(idx), srv.factory));
319318
}
320319
}
321320
}
322321
}
323-
if let Some(idx) = failed {
324-
Err(idx)
325-
} else {
326-
Ok(ready)
327-
}
322+
323+
Ok(ready)
328324
}
329325
}
330326

@@ -408,18 +404,19 @@ impl Future for ServerWorker {
408404
let factory_id = restart.factory_id;
409405
let token = restart.token;
410406

411-
let item = ready!(restart.fut.as_mut().poll(cx)).unwrap_or_else(|_| {
412-
panic!(
413-
"Can not restart {:?} service",
414-
this.factories[factory_id].name(token)
415-
)
416-
});
417-
418-
// Only interest in the first item?
419-
let (token, service) = item
407+
let service = ready!(restart.fut.as_mut().poll(cx))
408+
.unwrap_or_else(|_| {
409+
panic!(
410+
"Can not restart {:?} service",
411+
this.factories[factory_id].name(token)
412+
)
413+
})
420414
.into_iter()
421-
.next()
422-
.expect("No BoxedServerService. Restarting can not progress");
415+
// Find the same token from vector. There should be only one
416+
// So the first match would be enough.
417+
.find(|(t, _)| *t == token)
418+
.map(|(_, service)| service)
419+
.expect("No BoxedServerService found");
423420

424421
trace!(
425422
"Service {:?} has been restarted",

actix-server/tests/test_server.rs

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -264,3 +264,177 @@ async fn test_max_concurrent_connections() {
264264
sys.stop();
265265
let _ = h.join().unwrap();
266266
}
267+
268+
#[actix_rt::test]
269+
async fn test_service_restart() {
270+
use std::task::{Context, Poll};
271+
use std::time::Duration;
272+
273+
use actix_rt::{net::TcpStream, time::sleep};
274+
use actix_service::{fn_factory, Service};
275+
use futures_core::future::LocalBoxFuture;
276+
use tokio::io::AsyncWriteExt;
277+
278+
struct TestService(Arc<AtomicUsize>);
279+
280+
impl Service<TcpStream> for TestService {
281+
type Response = ();
282+
type Error = ();
283+
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
284+
285+
fn poll_ready(&self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
286+
let TestService(ref counter) = self;
287+
let c = counter.fetch_add(1, Ordering::SeqCst);
288+
// Force the service to restart on first readiness check.
289+
if c > 0 {
290+
Poll::Ready(Ok(()))
291+
} else {
292+
Poll::Ready(Err(()))
293+
}
294+
}
295+
296+
fn call(&self, _: TcpStream) -> Self::Future {
297+
Box::pin(async { Ok(()) })
298+
}
299+
}
300+
301+
let addr1 = unused_addr();
302+
let addr2 = unused_addr();
303+
let (tx, rx) = mpsc::channel();
304+
let num = Arc::new(AtomicUsize::new(0));
305+
let num2 = Arc::new(AtomicUsize::new(0));
306+
307+
let num_clone = num.clone();
308+
let num2_clone = num2.clone();
309+
310+
let h = thread::spawn(move || {
311+
actix_rt::System::new().block_on(async {
312+
let server = Server::build()
313+
.backlog(1)
314+
.disable_signals()
315+
.configure(move |cfg| {
316+
let num = num.clone();
317+
let num2 = num2.clone();
318+
cfg.bind("addr1", addr1)
319+
.unwrap()
320+
.bind("addr2", addr2)
321+
.unwrap()
322+
.apply(move |rt| {
323+
let num = num.clone();
324+
let num2 = num2.clone();
325+
rt.service(
326+
"addr1",
327+
fn_factory(move || {
328+
let num = num.clone();
329+
async move { Ok::<_, ()>(TestService(num)) }
330+
}),
331+
);
332+
rt.service(
333+
"addr2",
334+
fn_factory(move || {
335+
let num2 = num2.clone();
336+
async move { Ok::<_, ()>(TestService(num2)) }
337+
}),
338+
);
339+
})
340+
})
341+
.unwrap()
342+
.workers(1)
343+
.run();
344+
345+
let _ = tx.send((server.clone(), actix_rt::System::current()));
346+
server.await
347+
})
348+
});
349+
350+
let (server, sys) = rx.recv().unwrap();
351+
352+
for _ in 0..5 {
353+
TcpStream::connect(addr1)
354+
.await
355+
.unwrap()
356+
.shutdown()
357+
.await
358+
.unwrap();
359+
TcpStream::connect(addr2)
360+
.await
361+
.unwrap()
362+
.shutdown()
363+
.await
364+
.unwrap();
365+
}
366+
367+
sleep(Duration::from_secs(3)).await;
368+
369+
assert!(num_clone.load(Ordering::SeqCst) > 5);
370+
assert!(num2_clone.load(Ordering::SeqCst) > 5);
371+
372+
sys.stop();
373+
let _ = server.stop(false);
374+
let _ = h.join().unwrap();
375+
376+
let addr1 = unused_addr();
377+
let addr2 = unused_addr();
378+
let (tx, rx) = mpsc::channel();
379+
let num = Arc::new(AtomicUsize::new(0));
380+
let num2 = Arc::new(AtomicUsize::new(0));
381+
382+
let num_clone = num.clone();
383+
let num2_clone = num2.clone();
384+
385+
let h = thread::spawn(move || {
386+
let num = num.clone();
387+
actix_rt::System::new().block_on(async {
388+
let server = Server::build()
389+
.backlog(1)
390+
.disable_signals()
391+
.bind("addr1", addr1, move || {
392+
let num = num.clone();
393+
fn_factory(move || {
394+
let num = num.clone();
395+
async move { Ok::<_, ()>(TestService(num)) }
396+
})
397+
})
398+
.unwrap()
399+
.bind("addr2", addr2, move || {
400+
let num2 = num2.clone();
401+
fn_factory(move || {
402+
let num2 = num2.clone();
403+
async move { Ok::<_, ()>(TestService(num2)) }
404+
})
405+
})
406+
.unwrap()
407+
.workers(1)
408+
.run();
409+
410+
let _ = tx.send((server.clone(), actix_rt::System::current()));
411+
server.await
412+
})
413+
});
414+
415+
let (server, sys) = rx.recv().unwrap();
416+
417+
for _ in 0..5 {
418+
TcpStream::connect(addr1)
419+
.await
420+
.unwrap()
421+
.shutdown()
422+
.await
423+
.unwrap();
424+
TcpStream::connect(addr2)
425+
.await
426+
.unwrap()
427+
.shutdown()
428+
.await
429+
.unwrap();
430+
}
431+
432+
sleep(Duration::from_secs(3)).await;
433+
434+
assert!(num_clone.load(Ordering::SeqCst) > 5);
435+
assert!(num2_clone.load(Ordering::SeqCst) > 5);
436+
437+
sys.stop();
438+
let _ = server.stop(false);
439+
let _ = h.join().unwrap();
440+
}

0 commit comments

Comments
 (0)