Skip to content

Commit e0fb67f

Browse files
authored
Reduce ServerWorker size (#321)
1 parent ddce2d6 commit e0fb67f

File tree

1 file changed

+38
-34
lines changed

1 file changed

+38
-34
lines changed

actix-server/src/worker.rs

Lines changed: 38 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,10 @@ impl WorkerAvailability {
127127
pub(crate) struct ServerWorker {
128128
rx: UnboundedReceiver<Conn>,
129129
rx2: UnboundedReceiver<Stop>,
130-
services: Vec<WorkerService>,
130+
services: Box<[WorkerService]>,
131131
availability: WorkerAvailability,
132132
conns: Counter,
133-
factories: Vec<Box<dyn InternalServiceFactory>>,
133+
factories: Box<[Box<dyn InternalServiceFactory>]>,
134134
state: WorkerState,
135135
shutdown_timeout: Duration,
136136
}
@@ -199,6 +199,8 @@ impl ServerWorker {
199199
availability: WorkerAvailability,
200200
config: ServerWorkerConfig,
201201
) -> (WorkerHandleAccept, WorkerHandleServer) {
202+
assert!(!availability.available());
203+
202204
let (tx1, rx) = unbounded_channel();
203205
let (tx2, rx2) = unbounded_channel();
204206
let avail = availability.clone();
@@ -213,20 +215,7 @@ impl ServerWorker {
213215
.unwrap()
214216
})
215217
.spawn(async move {
216-
availability.set(false);
217-
let mut wrk = ServerWorker {
218-
rx,
219-
rx2,
220-
services: Vec::new(),
221-
availability,
222-
conns: Counter::new(config.max_concurrent_connections),
223-
factories,
224-
state: Default::default(),
225-
shutdown_timeout: config.shutdown_timeout,
226-
};
227-
228-
let fut = wrk
229-
.factories
218+
let fut = factories
230219
.iter()
231220
.enumerate()
232221
.map(|(idx, factory)| {
@@ -239,29 +228,44 @@ impl ServerWorker {
239228
})
240229
.collect::<Vec<_>>();
241230

242-
// a second spawn to make sure worker future runs as non boxed future.
243-
// As Arbiter::spawn would box the future before send it to arbiter.
231+
// a second spawn to run !Send future tasks.
244232
spawn(async move {
245-
let res: Result<Vec<_>, _> = join_all(fut).await.into_iter().collect();
246-
match res {
247-
Ok(services) => {
248-
for item in services {
249-
for (factory, token, service) in item {
250-
assert_eq!(token.0, wrk.services.len());
251-
wrk.services.push(WorkerService {
252-
factory,
253-
service,
254-
status: WorkerServiceStatus::Unavailable,
255-
});
256-
}
257-
}
258-
}
233+
let res = join_all(fut)
234+
.await
235+
.into_iter()
236+
.collect::<Result<Vec<_>, _>>();
237+
let services = match res {
238+
Ok(res) => res
239+
.into_iter()
240+
.flatten()
241+
.fold(Vec::new(), |mut services, (factory, token, service)| {
242+
assert_eq!(token.0, services.len());
243+
services.push(WorkerService {
244+
factory,
245+
service,
246+
status: WorkerServiceStatus::Unavailable,
247+
});
248+
services
249+
})
250+
.into_boxed_slice(),
259251
Err(e) => {
260252
error!("Can not start worker: {:?}", e);
261253
Arbiter::current().stop();
254+
return;
262255
}
263-
}
264-
wrk.await
256+
};
257+
258+
// a third spawn to make sure ServerWorker runs as non boxed future.
259+
spawn(ServerWorker {
260+
rx,
261+
rx2,
262+
services,
263+
availability,
264+
conns: Counter::new(config.max_concurrent_connections),
265+
factories: factories.into_boxed_slice(),
266+
state: Default::default(),
267+
shutdown_timeout: config.shutdown_timeout,
268+
});
265269
});
266270
});
267271

0 commit comments

Comments
 (0)