Skip to content

Commit daaea60

Browse files
amphihertrste
authored andcommitted
vmm: api: make HTTP Server (API) multithreaded
This is a pre-requisite to allow multiple connections simultaneously, such as: - start migration (blocking) - query migration stats On-behalf-of: SAP [email protected] Signed-off-by: Sebastian Eydam <[email protected]>
1 parent 8895cd7 commit daaea60

File tree

2 files changed

+225
-23
lines changed

2 files changed

+225
-23
lines changed

vmm/src/api/http/mod.rs

Lines changed: 224 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,25 @@
66
use std::collections::BTreeMap;
77
use std::error::Error;
88
use std::fs::File;
9+
use std::os::fd::AsRawFd;
910
use std::os::unix::io::{IntoRawFd, RawFd};
1011
use std::os::unix::net::UnixListener;
1112
use std::panic::AssertUnwindSafe;
1213
use std::path::PathBuf;
13-
use std::sync::LazyLock;
14-
use std::sync::mpsc::Sender;
14+
use std::sync::mpsc::{Receiver, Sender, channel, sync_channel};
15+
use std::sync::{Arc, LazyLock, Mutex};
1516
use std::thread;
1617

1718
use hypervisor::HypervisorType;
18-
use log::error;
19+
use log::{debug, error};
1920
use micro_http::{
20-
Body, HttpServer, MediaType, Method, Request, Response, ServerError, StatusCode, Version,
21+
Body, HttpServer, MediaType, Method, Request, Response, ServerError, ServerRequest,
22+
ServerResponse, StatusCode, Version,
2123
};
2224
use seccompiler::{SeccompAction, apply_filter};
2325
use serde_json::Error as SerdeError;
2426
use thiserror::Error;
27+
use vmm_sys_util::epoll::{ControlOperation, Epoll, EpollEvent, EventSet};
2528
use vmm_sys_util::eventfd::EventFd;
2629

2730
use self::http_endpoint::{VmActionHandler, VmCreate, VmInfo, VmmPing, VmmShutdown};
@@ -316,10 +319,152 @@ fn handle_http_request(
316319
response
317320
}
318321

322+
/// Keeps track of the worker threads, and the resources needed to interact
323+
/// with them.
324+
#[derive(Debug)]
325+
struct HttpWorkerThreads {
326+
// The worker threads themselves.
327+
threads: Vec<thread::JoinHandle<Result<()>>>,
328+
// An MPSC channel to send server requests to the workers. We put it into
329+
// an option so we can easily drop it in the destructor.
330+
request_tx: Option<Sender<ServerRequest>>,
331+
// An MPSC channel that the workers use to send responses to the HTTP
332+
// server thread.
333+
response_rx: Receiver<ServerResponse>,
334+
// Workers signal this eventfd when they have a response for the HTTP
335+
// server thread.
336+
response_event: EventFd,
337+
}
338+
339+
impl HttpWorkerThreads {
340+
fn new(
341+
thread_count: usize,
342+
api_notifier: &EventFd,
343+
api_sender: &Sender<ApiRequest>,
344+
seccomp_action: &SeccompAction,
345+
hypervisor_type: HypervisorType,
346+
landlock_enable: bool,
347+
exit_evt: &EventFd,
348+
) -> Result<Self> {
349+
let response_event = EventFd::new(libc::EFD_NONBLOCK).map_err(VmmError::EventFdCreate)?;
350+
let (response_tx, response_rx) = sync_channel::<ServerResponse>(thread_count);
351+
352+
let mut threads = Vec::new();
353+
let (request_tx, request_rx) = channel::<ServerRequest>();
354+
355+
let request_rx = Arc::new(Mutex::new(request_rx));
356+
357+
// We use the same seccomp filter that we already use for the HTTP server thread.
358+
let api_seccomp_filter =
359+
get_seccomp_filter(seccomp_action, Thread::HttpApi, hypervisor_type)
360+
.map_err(VmmError::CreateSeccompFilter)?;
361+
362+
for n in 0..thread_count {
363+
let response_event = response_event.try_clone().map_err(VmmError::EventFdClone)?;
364+
365+
let response_tx = response_tx.clone();
366+
let request_rx = request_rx.clone();
367+
368+
let api_notifier = api_notifier.try_clone().map_err(VmmError::EventFdClone)?;
369+
let api_sender = api_sender.clone();
370+
371+
let api_seccomp_filter = api_seccomp_filter.clone();
372+
let exit_evt = exit_evt.try_clone().map_err(VmmError::EventFdClone)?;
373+
374+
let thread = thread::Builder::new()
375+
.name(format!("http-worker-{n}").to_string())
376+
.spawn(move || {
377+
debug!("Spawned HTTP worker thread with id {n}",);
378+
if !api_seccomp_filter.is_empty() {
379+
apply_filter(&api_seccomp_filter)
380+
.map_err(VmmError::ApplySeccompFilter)
381+
.map_err(|e| {
382+
error!("Error applying seccomp filter: {e:?}");
383+
exit_evt.write(1).ok();
384+
e
385+
})?;
386+
}
387+
388+
if landlock_enable {
389+
Landlock::new()
390+
.map_err(VmmError::CreateLandlock)?
391+
.restrict_self()
392+
.map_err(VmmError::ApplyLandlock)
393+
.map_err(|e| {
394+
error!("Error applying landlock to http-worker thread: {e:?}");
395+
exit_evt.write(1).ok();
396+
e
397+
})?;
398+
}
399+
400+
std::panic::catch_unwind(AssertUnwindSafe(move || {
401+
let id = n;
402+
loop {
403+
let request = request_rx.lock().unwrap().recv();
404+
match request {
405+
Ok(msg) => {
406+
// Process the server request
407+
let response = msg.process(|request| {
408+
handle_http_request(request, &api_notifier, &api_sender)
409+
});
410+
411+
// Send the response to the HTTP server thread together with this
412+
// threads id.
413+
if let Err(e) = response_tx.send(response) {
414+
error!(
415+
"HTTP worker thread {id}: error sending response {e}"
416+
);
417+
break;
418+
}
419+
420+
// Notify the HTTP server thread.
421+
response_event.write(1).ok();
422+
}
423+
Err(e) => {
424+
error!("HTTP worker thread {id}: error receiving request {e}");
425+
break;
426+
}
427+
}
428+
}
429+
}))
430+
.map_err(|_| {
431+
error!("http-worker thread {n} panicked");
432+
exit_evt.write(1).ok()
433+
})
434+
.ok();
435+
436+
Ok(())
437+
})
438+
.map_err(VmmError::HttpThreadSpawn)?;
439+
440+
threads.push(thread);
441+
}
442+
443+
Ok(Self {
444+
threads,
445+
request_tx: Some(request_tx),
446+
response_rx,
447+
response_event,
448+
})
449+
}
450+
}
451+
452+
impl Drop for HttpWorkerThreads {
453+
fn drop(&mut self) {
454+
// Dropping the Sender side of the request channels to throw the worker
455+
// threads out of their loops.
456+
drop(self.request_tx.take());
457+
// Now we can join each thread.
458+
self.threads
459+
.drain(..)
460+
.for_each(|thread| thread.join().unwrap().unwrap());
461+
}
462+
}
463+
319464
fn start_http_thread(
320465
mut server: HttpServer,
321-
api_notifier: EventFd,
322-
api_sender: Sender<ApiRequest>,
466+
api_notifier: &EventFd,
467+
api_sender: &Sender<ApiRequest>,
323468
seccomp_action: &SeccompAction,
324469
exit_evt: EventFd,
325470
hypervisor_type: HypervisorType,
@@ -336,6 +481,42 @@ fn start_http_thread(
336481
.add_kill_switch(api_shutdown_fd_clone)
337482
.map_err(VmmError::CreateApiServer)?;
338483

484+
// We use the epoll mechanism to parallelize this. The epoll tokens are
485+
// attached when registering the FDs with epoll. That way we can later
486+
// check why we were notified.
487+
const HTTP_EPOLL_TOKEN: u64 = 1;
488+
const WORKER_EPOLL_TOKEN: u64 = 2;
489+
490+
// The epoll instance our HTTP server thread will wait on.
491+
let outer_epoll = Epoll::new().unwrap();
492+
let worker_threads = HttpWorkerThreads::new(
493+
2,
494+
api_notifier,
495+
api_sender,
496+
seccomp_action,
497+
hypervisor_type,
498+
landlock_enable,
499+
&exit_evt,
500+
)?;
501+
502+
// Register the fd that the worker threads will signal.
503+
outer_epoll
504+
.ctl(
505+
ControlOperation::Add,
506+
worker_threads.response_event.as_raw_fd(),
507+
EpollEvent::new(EventSet::IN, WORKER_EPOLL_TOKEN),
508+
)
509+
.unwrap();
510+
511+
// Register the HttpServer's fd.
512+
outer_epoll
513+
.ctl(
514+
ControlOperation::Add,
515+
server.epoll().as_raw_fd(),
516+
EpollEvent::new(EventSet::IN, HTTP_EPOLL_TOKEN),
517+
)
518+
.unwrap();
519+
339520
let thread = thread::Builder::new()
340521
.name("http-server".to_string())
341522
.spawn(move || {
@@ -363,24 +544,42 @@ fn start_http_thread(
363544
}
364545

365546
std::panic::catch_unwind(AssertUnwindSafe(move || {
547+
let mut events = vec![EpollEvent::default(); 32];
366548
server.start_server().unwrap();
549+
367550
loop {
368-
match server.requests() {
369-
Ok(request_vec) => {
370-
for server_request in request_vec {
371-
if let Err(e) = server.respond(server_request.process(|request| {
372-
handle_http_request(request, &api_notifier, &api_sender)
373-
})) {
551+
let n = outer_epoll.wait(-1, &mut events).unwrap();
552+
for ev in events.iter().take(n) {
553+
match ev.data() {
554+
HTTP_EPOLL_TOKEN => {
555+
// The HttpServer got a request, handle that.
556+
match server.requests() {
557+
Ok(request_vec) => {
558+
for server_request in request_vec {
559+
worker_threads.request_tx.as_ref().unwrap().send(server_request).unwrap();
560+
}
561+
}
562+
Err(ServerError::ShutdownEvent) => {
563+
server.flush_outgoing_writes();
564+
return;
565+
}
566+
Err(e) => {
567+
error!(
568+
"HTTP server error on retrieving incoming request. Error: {e}"
569+
);
570+
}
571+
}
572+
}
573+
WORKER_EPOLL_TOKEN => {
574+
// One of the worker threads has a response.
575+
// We clear the eventfd first.
576+
let _ = worker_threads.response_event.read().unwrap();
577+
let response = worker_threads.response_rx.recv().unwrap();
578+
if let Err(e) = server.respond(response){
374579
error!("HTTP server error on response: {e}");
375580
}
376581
}
377-
}
378-
Err(ServerError::ShutdownEvent) => {
379-
server.flush_outgoing_writes();
380-
return;
381-
}
382-
Err(e) => {
383-
error!("HTTP server error on retrieving incoming request. Error: {e}");
582+
_ => { }
384583
}
385584
}
386585
}
@@ -398,6 +597,7 @@ fn start_http_thread(
398597
Ok((thread, api_shutdown_fd))
399598
}
400599

600+
#[allow(clippy::needless_pass_by_value)]
401601
pub fn start_http_path_thread(
402602
path: &str,
403603
api_notifier: EventFd,
@@ -415,15 +615,16 @@ pub fn start_http_path_thread(
415615

416616
start_http_thread(
417617
server,
418-
api_notifier,
419-
api_sender,
618+
&api_notifier,
619+
&api_sender,
420620
seccomp_action,
421621
exit_evt,
422622
hypervisor_type,
423623
landlock_enable,
424624
)
425625
}
426626

627+
#[allow(clippy::needless_pass_by_value)]
427628
pub fn start_http_fd_thread(
428629
fd: RawFd,
429630
api_notifier: EventFd,
@@ -437,8 +638,8 @@ pub fn start_http_fd_thread(
437638
let server = unsafe { HttpServer::new_from_fd(fd) }.map_err(VmmError::CreateApiServer)?;
438639
start_http_thread(
439640
server,
440-
api_notifier,
441-
api_sender,
641+
&api_notifier,
642+
&api_sender,
442643
seccomp_action,
443644
exit_evt,
444645
hypervisor_type,

vmm/src/seccomp_filters.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -866,6 +866,7 @@ fn http_api_thread_rules() -> Result<Vec<(i64, Vec<SeccompRule>)>, BackendError>
866866
(libc::SYS_rt_sigprocmask, vec![]),
867867
(libc::SYS_getcwd, vec![]),
868868
(libc::SYS_clock_nanosleep, vec![]),
869+
(libc::SYS_read, vec![]),
869870
])
870871
}
871872

0 commit comments

Comments
 (0)