Skip to content

Commit ebca0e5

Browse files
committed
vmm: tear down memory receive threads
... after the VM migration finishes. On-behalf-of: SAP julian.stecklina@sap.com Signed-off-by: Julian Stecklina <julian.stecklina@cyberus-technology.de>
1 parent 220717e commit ebca0e5

File tree

1 file changed

+65
-24
lines changed

1 file changed

+65
-24
lines changed

vmm/src/lib.rs

Lines changed: 65 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ const AUTO_CONVERGE_MAX: u8 = 99;
1919

2020
use std::collections::HashMap;
2121
use std::fs::File;
22-
use std::io::{Read, Write, stdout};
22+
use std::io::{ErrorKind, Read, Write, stdout};
2323
use std::net::{TcpListener, TcpStream};
2424
use std::os::fd::{AsFd, BorrowedFd};
2525
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
@@ -289,6 +289,15 @@ impl Write for SocketStream {
289289
}
290290
}
291291

292+
impl AsFd for SocketStream {
293+
fn as_fd(&self) -> BorrowedFd<'_> {
294+
match self {
295+
SocketStream::Unix(s) => s.as_fd(),
296+
SocketStream::Tcp(s) => s.as_fd(),
297+
}
298+
}
299+
}
300+
292301
impl AsRawFd for SocketStream {
293302
fn as_raw_fd(&self) -> RawFd {
294303
match self {
@@ -891,6 +900,53 @@ impl ReceiveAdditionalConnections {
891900
Ok((event_fd.try_clone()?, event_fd))
892901
}
893902

903+
/// Handle read requests. This function returns until the abort_event_fd is
904+
/// triggered or the connection is closed or encountered an error.
905+
fn abortable_read_requests(
906+
socket: &mut SocketStream,
907+
abort_event_fd: &EventFd,
908+
guest_memory: &GuestMemoryAtomic<GuestMemoryMmap>,
909+
) -> std::result::Result<(), MigratableError> {
910+
loop {
911+
// Implementation of abortable_read_request
912+
if !wait_for_readable(socket, abort_event_fd).map_err(|e| {
913+
MigratableError::MigrateReceive(anyhow!("Failed to poll descriptors: {e}"))
914+
})? {
915+
info!("Got signal to tear down connection.");
916+
return Ok(());
917+
}
918+
919+
// TODO We only check whether we should abort when waiting for a new
920+
// request. If the sender just stops sending data mid-request, we
921+
// should still be abortable, but we are not... In this case, we
922+
// will hang forever. But given that the sender is also in charge of
923+
// driving the migration to completion, this is not a major concern.
924+
// In the long run, it would be preferable to move I/O to
925+
// asynchronous tasks to be able to handle aborts more gracefully.
926+
927+
let req = match Request::read_from(socket) {
928+
Ok(req) => req,
929+
Err(MigratableError::MigrateSocket(io_error))
930+
if io_error.kind() == ErrorKind::UnexpectedEof =>
931+
{
932+
debug!("Connection closed by peer");
933+
return Ok(());
934+
}
935+
Err(e) => return Err(e),
936+
};
937+
938+
if req.command() != Command::Memory {
939+
return Err(MigratableError::MigrateReceive(anyhow!(
940+
"Dropping connection. Only Memory commands are allowed on additional connections, but got {:?}",
941+
req.command()
942+
)));
943+
}
944+
945+
vm_receive_memory(&req, socket, guest_memory)?;
946+
Response::ok().write_to(socket)?;
947+
}
948+
}
949+
894950
/// Starts a thread to accept incoming connections and handle them. These
895951
/// additional connections are used to receive additional memory regions
896952
/// during VM migration.
@@ -906,33 +962,18 @@ impl ReceiveAdditionalConnections {
906962
let mut threads: Vec<std::thread::JoinHandle<()>> = Vec::new();
907963
while let Ok(Some(mut socket)) = listener.abortable_accept(&terminate_fd) {
908964
let guest_memory = guest_memory.clone();
965+
let terminate_fd = terminate_fd.try_clone().unwrap();
909966

910967
// We handle errors locally and log them. Passing them along is
911968
// painful with little value.
912969
threads.push(std::thread::spawn(move || {
913-
loop {
914-
let req = match Request::read_from(&mut socket) {
915-
Ok(req) => req,
916-
Err(e) => {
917-
error!("Failed to read request: {}", e);
918-
break;
919-
}
920-
};
921-
922-
if req.command() != Command::Memory {
923-
error!("Dropping connection. Only Memory commands are allowed on additional connections");
924-
break;
925-
}
926-
927-
if let Err(e) = vm_receive_memory(&req, &mut socket, &guest_memory) {
928-
error!("Failed to receive memory: {}", e);
929-
break;
930-
}
931-
932-
if let Err(e) = Response::ok().write_to(&mut socket) {
933-
error!("Failed to send response: {}", e);
934-
break;
935-
}
970+
if let Err(e) =
971+
Self::abortable_read_requests(&mut socket, &terminate_fd, &guest_memory)
972+
{
973+
error!(
974+
"Failed to read more requests on additional receive connection: {}",
975+
e
976+
);
936977
}
937978
}));
938979
}

0 commit comments

Comments
 (0)