Skip to content

Commit 8e98d91

Browse files
authored
add test for restart worker thread (#328)
1 parent 3c1f577 commit 8e98d91

File tree

2 files changed

+157
-16
lines changed

2 files changed

+157
-16
lines changed

actix-server/src/worker.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,8 @@ impl WorkerAvailability {
125125
///
126126
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
127127
pub(crate) struct ServerWorker {
128+
// UnboundedReceiver<Conn> should always be the first field.
129+
// It must be dropped as soon as ServerWorker dropping.
128130
rx: UnboundedReceiver<Conn>,
129131
rx2: UnboundedReceiver<Stop>,
130132
services: Box<[WorkerService]>,
@@ -370,6 +372,7 @@ impl Default for WorkerState {
370372

371373
impl Drop for ServerWorker {
372374
fn drop(&mut self) {
375+
// Stop the Arbiter ServerWorker runs on on drop.
373376
Arbiter::current().stop();
374377
}
375378
}

actix-server/tests/test_server.rs

Lines changed: 154 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use std::sync::atomic::{AtomicUsize, Ordering};
22
use std::sync::{mpsc, Arc};
3-
use std::{net, thread, time};
3+
use std::{net, thread, time::Duration};
44

5+
use actix_rt::{net::TcpStream, time::sleep};
56
use actix_server::Server;
67
use actix_service::fn_service;
78
use actix_utils::future::ok;
@@ -37,7 +38,7 @@ fn test_bind() {
3738
});
3839
let (_, sys) = rx.recv().unwrap();
3940

40-
thread::sleep(time::Duration::from_millis(500));
41+
thread::sleep(Duration::from_millis(500));
4142
assert!(net::TcpStream::connect(addr).is_ok());
4243
sys.stop();
4344
let _ = h.join();
@@ -64,7 +65,7 @@ fn test_listen() {
6465
});
6566
let sys = rx.recv().unwrap();
6667

67-
thread::sleep(time::Duration::from_millis(500));
68+
thread::sleep(Duration::from_millis(500));
6869
assert!(net::TcpStream::connect(addr).is_ok());
6970
sys.stop();
7071
let _ = h.join();
@@ -73,11 +74,11 @@ fn test_listen() {
7374
#[test]
7475
#[cfg(unix)]
7576
fn test_start() {
77+
use std::io::Read;
78+
7679
use actix_codec::{BytesCodec, Framed};
77-
use actix_rt::net::TcpStream;
7880
use bytes::Bytes;
7981
use futures_util::sink::SinkExt;
80-
use std::io::Read;
8182

8283
let addr = unused_addr();
8384
let (tx, rx) = mpsc::channel();
@@ -112,16 +113,16 @@ fn test_start() {
112113

113114
// pause
114115
let _ = srv.pause();
115-
thread::sleep(time::Duration::from_millis(200));
116+
thread::sleep(Duration::from_millis(200));
116117
let mut conn = net::TcpStream::connect(addr).unwrap();
117-
conn.set_read_timeout(Some(time::Duration::from_millis(100)))
118+
conn.set_read_timeout(Some(Duration::from_millis(100)))
118119
.unwrap();
119120
let res = conn.read_exact(&mut buf);
120121
assert!(res.is_err());
121122

122123
// resume
123124
let _ = srv.resume();
124-
thread::sleep(time::Duration::from_millis(100));
125+
thread::sleep(Duration::from_millis(100));
125126
assert!(net::TcpStream::connect(addr).is_ok());
126127
assert!(net::TcpStream::connect(addr).is_ok());
127128
assert!(net::TcpStream::connect(addr).is_ok());
@@ -133,10 +134,10 @@ fn test_start() {
133134

134135
// stop
135136
let _ = srv.stop(false);
136-
thread::sleep(time::Duration::from_millis(100));
137+
thread::sleep(Duration::from_millis(100));
137138
assert!(net::TcpStream::connect(addr).is_err());
138139

139-
thread::sleep(time::Duration::from_millis(100));
140+
thread::sleep(Duration::from_millis(100));
140141
sys.stop();
141142
let _ = h.join();
142143
}
@@ -182,7 +183,7 @@ fn test_configure() {
182183
let _ = sys.run();
183184
});
184185
let (_, sys) = rx.recv().unwrap();
185-
thread::sleep(time::Duration::from_millis(500));
186+
thread::sleep(Duration::from_millis(500));
186187

187188
assert!(net::TcpStream::connect(addr1).is_ok());
188189
assert!(net::TcpStream::connect(addr2).is_ok());
@@ -200,7 +201,6 @@ async fn test_max_concurrent_connections() {
200201
// The limit test on the other hand is only for concurrent tcp stream limiting a work
201202
// thread accept.
202203

203-
use actix_rt::net::TcpStream;
204204
use tokio::io::AsyncWriteExt;
205205

206206
let addr = unused_addr();
@@ -226,7 +226,7 @@ async fn test_max_concurrent_connections() {
226226
let counter = counter.clone();
227227
async move {
228228
counter.fetch_add(1, Ordering::SeqCst);
229-
actix_rt::time::sleep(time::Duration::from_secs(20)).await;
229+
sleep(Duration::from_secs(20)).await;
230230
counter.fetch_sub(1, Ordering::SeqCst);
231231
Ok::<(), ()>(())
232232
}
@@ -249,7 +249,7 @@ async fn test_max_concurrent_connections() {
249249
conns.push(conn);
250250
}
251251

252-
actix_rt::time::sleep(time::Duration::from_secs(5)).await;
252+
sleep(Duration::from_secs(5)).await;
253253

254254
// counter would remain at 3 even with 12 successful connection.
255255
// and 9 of them remain in backlog.
@@ -268,9 +268,7 @@ async fn test_max_concurrent_connections() {
268268
#[actix_rt::test]
269269
async fn test_service_restart() {
270270
use std::task::{Context, Poll};
271-
use std::time::Duration;
272271

273-
use actix_rt::{net::TcpStream, time::sleep};
274272
use actix_service::{fn_factory, Service};
275273
use futures_core::future::LocalBoxFuture;
276274
use tokio::io::AsyncWriteExt;
@@ -438,3 +436,143 @@ async fn test_service_restart() {
438436
let _ = server.stop(false);
439437
let _ = h.join().unwrap();
440438
}
439+
440+
#[actix_rt::test]
441+
async fn worker_restart() {
442+
use actix_service::{Service, ServiceFactory};
443+
use futures_core::future::LocalBoxFuture;
444+
use tokio::io::{AsyncReadExt, AsyncWriteExt};
445+
446+
struct TestServiceFactory(Arc<AtomicUsize>);
447+
448+
impl ServiceFactory<TcpStream> for TestServiceFactory {
449+
type Response = ();
450+
type Error = ();
451+
type Config = ();
452+
type Service = TestService;
453+
type InitError = ();
454+
type Future = LocalBoxFuture<'static, Result<Self::Service, Self::InitError>>;
455+
456+
fn new_service(&self, _: Self::Config) -> Self::Future {
457+
let counter = self.0.fetch_add(1, Ordering::Relaxed);
458+
459+
Box::pin(async move { Ok(TestService(counter)) })
460+
}
461+
}
462+
463+
struct TestService(usize);
464+
465+
impl Service<TcpStream> for TestService {
466+
type Response = ();
467+
type Error = ();
468+
type Future = LocalBoxFuture<'static, Result<Self::Response, Self::Error>>;
469+
470+
actix_service::always_ready!();
471+
472+
fn call(&self, stream: TcpStream) -> Self::Future {
473+
let counter = self.0;
474+
475+
let mut stream = stream.into_std().unwrap();
476+
use std::io::Write;
477+
let str = counter.to_string();
478+
let buf = str.as_bytes();
479+
480+
let mut written = 0;
481+
482+
while written < buf.len() {
483+
if let Ok(n) = stream.write(&buf[written..]) {
484+
written += n;
485+
}
486+
}
487+
stream.flush().unwrap();
488+
stream.shutdown(net::Shutdown::Write).unwrap();
489+
490+
// force worker 2 to restart service once.
491+
if counter == 2 {
492+
panic!("panic on purpose")
493+
} else {
494+
Box::pin(async { Ok(()) })
495+
}
496+
}
497+
}
498+
499+
let addr = unused_addr();
500+
let (tx, rx) = mpsc::channel();
501+
502+
let counter = Arc::new(AtomicUsize::new(1));
503+
let h = thread::spawn(move || {
504+
let counter = counter.clone();
505+
actix_rt::System::new().block_on(async {
506+
let server = Server::build()
507+
.disable_signals()
508+
.bind("addr", addr, move || TestServiceFactory(counter.clone()))
509+
.unwrap()
510+
.workers(2)
511+
.run();
512+
513+
let _ = tx.send((server.clone(), actix_rt::System::current()));
514+
server.await
515+
})
516+
});
517+
518+
let (server, sys) = rx.recv().unwrap();
519+
520+
sleep(Duration::from_secs(3)).await;
521+
522+
let mut buf = [0; 8];
523+
524+
// worker 1 would not restart and return it's id consistently.
525+
let mut stream = TcpStream::connect(addr).await.unwrap();
526+
let n = stream.read(&mut buf).await.unwrap();
527+
let id = String::from_utf8_lossy(&buf[0..n]);
528+
assert_eq!("1", id);
529+
stream.shutdown().await.unwrap();
530+
531+
// worker 2 dead after return response.
532+
let mut stream = TcpStream::connect(addr).await.unwrap();
533+
let n = stream.read(&mut buf).await.unwrap();
534+
let id = String::from_utf8_lossy(&buf[0..n]);
535+
assert_eq!("2", id);
536+
stream.shutdown().await.unwrap();
537+
538+
// request to worker 1
539+
let mut stream = TcpStream::connect(addr).await.unwrap();
540+
let n = stream.read(&mut buf).await.unwrap();
541+
let id = String::from_utf8_lossy(&buf[0..n]);
542+
assert_eq!("1", id);
543+
stream.shutdown().await.unwrap();
544+
545+
// TODO: Remove sleep if it can pass CI.
546+
sleep(Duration::from_secs(3)).await;
547+
548+
// worker 2 restarting and work goes to worker 1.
549+
let mut stream = TcpStream::connect(addr).await.unwrap();
550+
let n = stream.read(&mut buf).await.unwrap();
551+
let id = String::from_utf8_lossy(&buf[0..n]);
552+
assert_eq!("1", id);
553+
stream.shutdown().await.unwrap();
554+
555+
// TODO: Remove sleep if it can pass CI.
556+
sleep(Duration::from_secs(3)).await;
557+
558+
// worker 2 restarted but worker 1 was still the next to accept connection.
559+
let mut stream = TcpStream::connect(addr).await.unwrap();
560+
let n = stream.read(&mut buf).await.unwrap();
561+
let id = String::from_utf8_lossy(&buf[0..n]);
562+
assert_eq!("1", id);
563+
stream.shutdown().await.unwrap();
564+
565+
// TODO: Remove sleep if it can pass CI.
566+
sleep(Duration::from_secs(3)).await;
567+
568+
// worker 2 accept connection again but it's id is 3.
569+
let mut stream = TcpStream::connect(addr).await.unwrap();
570+
let n = stream.read(&mut buf).await.unwrap();
571+
let id = String::from_utf8_lossy(&buf[0..n]);
572+
assert_eq!("3", id);
573+
stream.shutdown().await.unwrap();
574+
575+
sys.stop();
576+
let _ = server.stop(false);
577+
let _ = h.join().unwrap();
578+
}

0 commit comments

Comments
 (0)