@@ -19,7 +19,7 @@ const AUTO_CONVERGE_MAX: u8 = 99;
1919
2020use std:: collections:: HashMap ;
2121use std:: fs:: File ;
22- use std:: io:: { Read , Write , stdout} ;
22+ use std:: io:: { ErrorKind , Read , Write , stdout} ;
2323use std:: net:: { TcpListener , TcpStream } ;
2424use std:: os:: fd:: { AsFd , BorrowedFd } ;
2525use std:: os:: unix:: io:: { AsRawFd , FromRawFd , RawFd } ;
@@ -289,6 +289,15 @@ impl Write for SocketStream {
289289 }
290290}
291291
292+ impl AsFd for SocketStream {
293+ fn as_fd ( & self ) -> BorrowedFd < ' _ > {
294+ match self {
295+ SocketStream :: Unix ( s) => s. as_fd ( ) ,
296+ SocketStream :: Tcp ( s) => s. as_fd ( ) ,
297+ }
298+ }
299+ }
300+
292301impl AsRawFd for SocketStream {
293302 fn as_raw_fd ( & self ) -> RawFd {
294303 match self {
@@ -891,6 +900,53 @@ impl ReceiveAdditionalConnections {
891900 Ok ( ( event_fd. try_clone ( ) ?, event_fd) )
892901 }
893902
903+ /// Handle read requests. This function returns until the abort_event_fd is
904+ /// triggered or the connection is closed or encountered an error.
905+ fn abortable_read_requests (
906+ socket : & mut SocketStream ,
907+ abort_event_fd : & EventFd ,
908+ guest_memory : & GuestMemoryAtomic < GuestMemoryMmap > ,
909+ ) -> std:: result:: Result < ( ) , MigratableError > {
910+ loop {
911+ // Implementation of abortable_read_request
912+ if !wait_for_readable ( socket, abort_event_fd) . map_err ( |e| {
913+ MigratableError :: MigrateReceive ( anyhow ! ( "Failed to poll descriptors: {e}" ) )
914+ } ) ? {
915+ info ! ( "Got signal to tear down connection." ) ;
916+ return Ok ( ( ) ) ;
917+ }
918+
919+ // TODO We only check whether we should abort when waiting for a new
920+ // request. If the sender just stops sending data mid-request, we
921+ // should still be abortable, but we are not... In this case, we
922+ // will hang forever. But given that the sender is also in charge of
923+ // driving the migration to completion, this is not a major concern.
924+ // In the long run, it would be preferable to move I/O to
925+ // asynchronous tasks to be able to handle aborts more gracefully.
926+
927+ let req = match Request :: read_from ( socket) {
928+ Ok ( req) => req,
929+ Err ( MigratableError :: MigrateSocket ( io_error) )
930+ if io_error. kind ( ) == ErrorKind :: UnexpectedEof =>
931+ {
932+ debug ! ( "Connection closed by peer" ) ;
933+ return Ok ( ( ) ) ;
934+ }
935+ Err ( e) => return Err ( e) ,
936+ } ;
937+
938+ if req. command ( ) != Command :: Memory {
939+ return Err ( MigratableError :: MigrateReceive ( anyhow ! (
940+ "Dropping connection. Only Memory commands are allowed on additional connections, but got {:?}" ,
941+ req. command( )
942+ ) ) ) ;
943+ }
944+
945+ vm_receive_memory ( & req, socket, guest_memory) ?;
946+ Response :: ok ( ) . write_to ( socket) ?;
947+ }
948+ }
949+
894950 /// Starts a thread to accept incoming connections and handle them. These
895951 /// additional connections are used to receive additional memory regions
896952 /// during VM migration.
@@ -906,33 +962,18 @@ impl ReceiveAdditionalConnections {
906962 let mut threads: Vec < std:: thread:: JoinHandle < ( ) > > = Vec :: new ( ) ;
907963 while let Ok ( Some ( mut socket) ) = listener. abortable_accept ( & terminate_fd) {
908964 let guest_memory = guest_memory. clone ( ) ;
965+ let terminate_fd = terminate_fd. try_clone ( ) . unwrap ( ) ;
909966
910967 // We handle errors locally and log them. Passing them along is
911968 // painful with little value.
912969 threads. push ( std:: thread:: spawn ( move || {
913- loop {
914- let req = match Request :: read_from ( & mut socket) {
915- Ok ( req) => req,
916- Err ( e) => {
917- error ! ( "Failed to read request: {}" , e) ;
918- break ;
919- }
920- } ;
921-
922- if req. command ( ) != Command :: Memory {
923- error ! ( "Dropping connection. Only Memory commands are allowed on additional connections" ) ;
924- break ;
925- }
926-
927- if let Err ( e) = vm_receive_memory ( & req, & mut socket, & guest_memory) {
928- error ! ( "Failed to receive memory: {}" , e) ;
929- break ;
930- }
931-
932- if let Err ( e) = Response :: ok ( ) . write_to ( & mut socket) {
933- error ! ( "Failed to send response: {}" , e) ;
934- break ;
935- }
970+ if let Err ( e) =
971+ Self :: abortable_read_requests ( & mut socket, & terminate_fd, & guest_memory)
972+ {
973+ error ! (
974+ "Failed to read more requests on additional receive connection: {}" ,
975+ e
976+ ) ;
936977 }
937978 } ) ) ;
938979 }
0 commit comments