Skip to content

Commit 0a11cf5

Browse files
authored
Separate WorkerHandle to two parts (#323)
1 parent 859f458 commit 0a11cf5

File tree

4 files changed

+74
-55
lines changed

4 files changed

+74
-55
lines changed

actix-server/src/accept.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use slab::Slab;
1212
use crate::server::Server;
1313
use crate::socket::{MioListener, SocketAddr};
1414
use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN};
15-
use crate::worker::{Conn, WorkerHandle};
15+
use crate::worker::{Conn, WorkerHandleAccept};
1616
use crate::Token;
1717

1818
struct ServerSocketInfo {
@@ -66,7 +66,7 @@ impl AcceptLoop {
6666
pub(crate) fn start(
6767
&mut self,
6868
socks: Vec<(Token, MioListener)>,
69-
handles: Vec<WorkerHandle>,
69+
handles: Vec<WorkerHandleAccept>,
7070
) {
7171
let srv = self.srv.take().expect("Can not re-use AcceptInfo");
7272
let poll = self.poll.take().unwrap();
@@ -80,7 +80,7 @@ impl AcceptLoop {
8080
struct Accept {
8181
poll: Poll,
8282
waker: WakerQueue,
83-
handles: Vec<WorkerHandle>,
83+
handles: Vec<WorkerHandleAccept>,
8484
srv: Server,
8585
next: usize,
8686
backpressure: bool,
@@ -105,7 +105,7 @@ impl Accept {
105105
waker: WakerQueue,
106106
socks: Vec<(Token, MioListener)>,
107107
srv: Server,
108-
handles: Vec<WorkerHandle>,
108+
handles: Vec<WorkerHandleAccept>,
109109
) {
110110
// Accept runs in its own thread and would want to spawn additional futures to current
111111
// actix system.
@@ -125,7 +125,7 @@ impl Accept {
125125
poll: Poll,
126126
waker: WakerQueue,
127127
socks: Vec<(Token, MioListener)>,
128-
handles: Vec<WorkerHandle>,
128+
handles: Vec<WorkerHandleAccept>,
129129
srv: Server,
130130
) -> (Accept, Slab<ServerSocketInfo>) {
131131
let mut sockets = Slab::new();

actix-server/src/builder.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,18 @@ use crate::signals::{Signal, Signals};
1919
use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs};
2020
use crate::socket::{MioTcpListener, MioTcpSocket};
2121
use crate::waker_queue::{WakerInterest, WakerQueue};
22-
use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandle};
22+
use crate::worker::{
23+
ServerWorker, ServerWorkerConfig, WorkerAvailability, WorkerHandleAccept,
24+
WorkerHandleServer,
25+
};
2326
use crate::{join_all, Token};
2427

2528
/// Server builder
2629
pub struct ServerBuilder {
2730
threads: usize,
2831
token: Token,
2932
backlog: u32,
30-
handles: Vec<(usize, WorkerHandle)>,
33+
handles: Vec<(usize, WorkerHandleServer)>,
3134
services: Vec<Box<dyn InternalServiceFactory>>,
3235
sockets: Vec<(Token, String, MioListener)>,
3336
accept: AcceptLoop,
@@ -280,10 +283,11 @@ impl ServerBuilder {
280283
// start workers
281284
let handles = (0..self.threads)
282285
.map(|idx| {
283-
let handle = self.start_worker(idx, self.accept.waker_owned());
284-
self.handles.push((idx, handle.clone()));
286+
let (handle_accept, handle_server) =
287+
self.start_worker(idx, self.accept.waker_owned());
288+
self.handles.push((idx, handle_server));
285289

286-
handle
290+
handle_accept
287291
})
288292
.collect();
289293

@@ -311,7 +315,11 @@ impl ServerBuilder {
311315
}
312316
}
313317

314-
fn start_worker(&self, idx: usize, waker: WakerQueue) -> WorkerHandle {
318+
fn start_worker(
319+
&self,
320+
idx: usize,
321+
waker: WakerQueue,
322+
) -> (WorkerHandleAccept, WorkerHandleServer) {
315323
let avail = WorkerAvailability::new(waker);
316324
let services = self.services.iter().map(|v| v.clone_factory()).collect();
317325

@@ -437,9 +445,10 @@ impl ServerBuilder {
437445
break;
438446
}
439447

440-
let handle = self.start_worker(new_idx, self.accept.waker_owned());
441-
self.handles.push((new_idx, handle.clone()));
442-
self.accept.wake(WakerInterest::Worker(handle));
448+
let (handle_accept, handle_server) =
449+
self.start_worker(new_idx, self.accept.waker_owned());
450+
self.handles.push((new_idx, handle_server));
451+
self.accept.wake(WakerInterest::Worker(handle_accept));
443452
}
444453
}
445454
}

actix-server/src/waker_queue.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use std::{
66

77
use mio::{Registry, Token as MioToken, Waker};
88

9-
use crate::worker::WorkerHandle;
9+
use crate::worker::WorkerHandleAccept;
1010

1111
/// Waker token for `mio::Poll` instance.
1212
pub(crate) const WAKER_TOKEN: MioToken = MioToken(usize::MAX);
@@ -84,6 +84,6 @@ pub(crate) enum WakerInterest {
8484
Timer,
8585
/// `Worker` is an interest happen after a worker runs into faulted state(This is determined
8686
/// by if work can be sent to it successfully).`Accept` would be waked up and add the new
87-
/// `WorkerHandle`.
88-
Worker(WorkerHandle),
87+
/// `WorkerHandleAccept`.
88+
Worker(WorkerHandleAccept),
8989
}

actix-server/src/worker.rs

Lines changed: 48 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,9 @@ use crate::socket::MioStream;
2828
use crate::waker_queue::{WakerInterest, WakerQueue};
2929
use crate::{join_all, Token};
3030

31-
pub(crate) struct WorkerCommand(Conn);
32-
33-
/// Stop worker message. Returns `true` on successful shutdown
34-
/// and `false` if some connections still alive.
35-
pub(crate) struct StopCommand {
31+
/// Stop worker message. Returns `true` on successful graceful shutdown.
32+
/// and `false` if some connections still alive when shutdown execute.
33+
pub(crate) struct Stop {
3634
graceful: bool,
3735
tx: oneshot::Sender<bool>,
3836
}
@@ -43,42 +41,55 @@ pub(crate) struct Conn {
4341
pub token: Token,
4442
}
4543

46-
// a handle to worker that can send message to worker and share the availability of worker to other
47-
// thread.
48-
#[derive(Clone)]
49-
pub(crate) struct WorkerHandle {
50-
pub idx: usize,
51-
tx1: UnboundedSender<WorkerCommand>,
52-
tx2: UnboundedSender<StopCommand>,
44+
fn handle_pair(
45+
idx: usize,
46+
tx1: UnboundedSender<Conn>,
47+
tx2: UnboundedSender<Stop>,
5348
avail: WorkerAvailability,
49+
) -> (WorkerHandleAccept, WorkerHandleServer) {
50+
let accept = WorkerHandleAccept {
51+
idx,
52+
tx: tx1,
53+
avail,
54+
};
55+
56+
let server = WorkerHandleServer { idx, tx: tx2 };
57+
58+
(accept, server)
5459
}
5560

56-
impl WorkerHandle {
57-
pub fn new(
58-
idx: usize,
59-
tx1: UnboundedSender<WorkerCommand>,
60-
tx2: UnboundedSender<StopCommand>,
61-
avail: WorkerAvailability,
62-
) -> Self {
63-
WorkerHandle {
64-
idx,
65-
tx1,
66-
tx2,
67-
avail,
68-
}
69-
}
61+
/// Handle to worker that can send connection message to worker and share the
62+
/// availability of worker to other thread.
63+
///
64+
/// Held by [Accept](crate::accept::Accept).
65+
pub(crate) struct WorkerHandleAccept {
66+
pub idx: usize,
67+
tx: UnboundedSender<Conn>,
68+
avail: WorkerAvailability,
69+
}
7070

71-
pub fn send(&self, msg: Conn) -> Result<(), Conn> {
72-
self.tx1.send(WorkerCommand(msg)).map_err(|msg| msg.0 .0)
71+
impl WorkerHandleAccept {
72+
pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> {
73+
self.tx.send(msg).map_err(|msg| msg.0)
7374
}
7475

75-
pub fn available(&self) -> bool {
76+
pub(crate) fn available(&self) -> bool {
7677
self.avail.available()
7778
}
79+
}
80+
81+
/// Handle to worker than can send stop message to worker.
82+
///
83+
/// Held by [ServerBuilder](crate::builder::ServerBuilder).
84+
pub(crate) struct WorkerHandleServer {
85+
pub idx: usize,
86+
tx: UnboundedSender<Stop>,
87+
}
7888

79-
pub fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
89+
impl WorkerHandleServer {
90+
pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
8091
let (tx, rx) = oneshot::channel();
81-
let _ = self.tx2.send(StopCommand { graceful, tx });
92+
let _ = self.tx.send(Stop { graceful, tx });
8293
rx
8394
}
8495
}
@@ -114,8 +125,8 @@ impl WorkerAvailability {
114125
///
115126
/// Worker accepts Socket objects via unbounded channel and starts stream processing.
116127
pub(crate) struct ServerWorker {
117-
rx: UnboundedReceiver<WorkerCommand>,
118-
rx2: UnboundedReceiver<StopCommand>,
128+
rx: UnboundedReceiver<Conn>,
129+
rx2: UnboundedReceiver<Stop>,
119130
services: Vec<WorkerService>,
120131
availability: WorkerAvailability,
121132
conns: Counter,
@@ -187,7 +198,7 @@ impl ServerWorker {
187198
factories: Vec<Box<dyn InternalServiceFactory>>,
188199
availability: WorkerAvailability,
189200
config: ServerWorkerConfig,
190-
) -> WorkerHandle {
201+
) -> (WorkerHandleAccept, WorkerHandleServer) {
191202
let (tx1, rx) = unbounded_channel();
192203
let (tx2, rx2) = unbounded_channel();
193204
let avail = availability.clone();
@@ -254,7 +265,7 @@ impl ServerWorker {
254265
});
255266
});
256267

257-
WorkerHandle::new(idx, tx1, tx2, avail)
268+
handle_pair(idx, tx1, tx2, avail)
258269
}
259270

260271
fn restart_service(&mut self, token: Token, factory_id: usize) {
@@ -360,8 +371,7 @@ impl Future for ServerWorker {
360371
let this = self.as_mut().get_mut();
361372

362373
// `StopWorker` message handler
363-
if let Poll::Ready(Some(StopCommand { graceful, tx })) =
364-
Pin::new(&mut this.rx2).poll_recv(cx)
374+
if let Poll::Ready(Some(Stop { graceful, tx })) = Pin::new(&mut this.rx2).poll_recv(cx)
365375
{
366376
this.availability.set(false);
367377
let num = this.conns.total();
@@ -472,7 +482,7 @@ impl Future for ServerWorker {
472482

473483
match ready!(Pin::new(&mut this.rx).poll_recv(cx)) {
474484
// handle incoming io stream
475-
Some(WorkerCommand(msg)) => {
485+
Some(msg) => {
476486
let guard = this.conns.get();
477487
let _ = this.services[msg.token.0].service.call((guard, msg.io));
478488
}

0 commit comments

Comments
 (0)