@@ -155,17 +155,17 @@ impl UffdHandler {
155155 panic ! ( "Could not get UFFD and mappings after 5 retries" ) ;
156156 }
157157
158- pub fn from_unix_stream ( stream : & UnixStream , backing_buffer : * const u8 , size : usize ) -> Self {
159- let ( body , file ) = Self :: get_mappings_and_file ( stream ) ;
160- let mappings =
161- serde_json :: from_str :: < Vec < GuestRegionUffdMapping > > ( & body ) . unwrap_or_else ( |_| {
162- panic ! ( "Cannot deserialize memory mappings. Received body: {body}" )
163- } ) ;
158+ pub fn from_mappings (
159+ mappings : Vec < GuestRegionUffdMapping > ,
160+ uffd : File ,
161+ backing_buffer : * const u8 ,
162+ size : usize ,
163+ ) -> Self {
164164 let memsize: usize = mappings. iter ( ) . map ( |r| r. size ) . sum ( ) ;
165165 // Page size is the same for all memory regions, so just grab the first one
166166 let first_mapping = mappings. first ( ) . unwrap_or_else ( || {
167167 panic ! (
168- "Cannot get the first mapping. Mappings size is {}. Received body: {body} " ,
168+ "Cannot get the first mapping. Mappings size is {}." ,
169169 mappings. len( )
170170 )
171171 } ) ;
@@ -175,7 +175,7 @@ impl UffdHandler {
175175 assert_eq ! ( memsize, size) ;
176176 assert ! ( page_size. is_power_of_two( ) ) ;
177177
178- let uffd = unsafe { Uffd :: from_raw_fd ( file . into_raw_fd ( ) ) } ;
178+ let uffd = unsafe { Uffd :: from_raw_fd ( uffd . into_raw_fd ( ) ) } ;
179179
180180 Self {
181181 mem_regions : mappings,
@@ -377,22 +377,110 @@ impl Runtime {
377377 if pollfds[ i] . revents & libc:: POLLIN != 0 {
378378 nready -= 1 ;
379379 if pollfds[ i] . fd == self . stream . as_raw_fd ( ) {
380- // Handle new uffd from stream
381- let handler = UffdHandler :: from_unix_stream (
382- & self . stream ,
383- self . backing_memory ,
384- self . backing_memory_size ,
385- ) ;
386- pollfds. push ( libc:: pollfd {
387- fd : handler. uffd . as_raw_fd ( ) ,
388- events : libc:: POLLIN ,
389- revents : 0 ,
390- } ) ;
391- self . uffds . insert ( handler. uffd . as_raw_fd ( ) , handler) ;
392-
393- // If connection is closed, we can skip the socket from being polled.
394- if pollfds[ i] . revents & ( libc:: POLLRDHUP | libc:: POLLHUP ) != 0 {
395- skip_stream = 1 ;
380+ const BUFFER_SIZE : usize = 4096 ;
381+
382+ let mut buffer = [ 0u8 ; BUFFER_SIZE ] ;
383+ let mut fds = [ 0 ; 1 ] ;
384+ let mut current_pos = 0 ;
385+ let mut exit_loop = false ;
386+
387+ loop {
388+ // Read more data into the buffer if there's space
389+ let mut iov = [ libc:: iovec {
390+ iov_base : ( buffer[ current_pos..] ) . as_mut_ptr ( ) as * mut libc:: c_void ,
391+ iov_len : buffer. len ( ) - current_pos,
392+ } ] ;
393+
394+ if current_pos < BUFFER_SIZE {
395+ let ret = unsafe { self . stream . recv_with_fds ( & mut iov, & mut fds) } ;
396+ match ret {
397+ Ok ( ( 0 , _) ) => break ,
398+ Ok ( ( n, 1 ) ) => current_pos += n,
399+ Ok ( ( n, 0 ) ) | Ok ( ( _, n) ) => panic ! ( "Wrong number of fds: {}" , n) ,
400+ Err ( e) if e. errno ( ) == libc:: EAGAIN => {
401+ if exit_loop {
402+ break ;
403+ }
404+ continue ;
405+ }
406+ Err ( e) => panic ! ( "Read error: {}" , e) ,
407+ }
408+
409+ exit_loop = false ;
410+ }
411+
412+ let mut parser =
413+ serde_json:: Deserializer :: from_slice ( & buffer[ ..current_pos] )
414+ . into_iter :: < UffdMsgFromFirecracker > ( ) ;
415+ let mut total_consumed = 0 ;
416+ let mut needs_more = false ;
417+
418+ while let Some ( result) = parser. next ( ) {
419+ match result {
420+ Ok ( UffdMsgFromFirecracker :: Mappings ( mappings) ) => {
421+ // Handle new uffd from stream
422+ let handler = UffdHandler :: from_mappings (
423+ mappings,
424+ unsafe { File :: from_raw_fd ( fds[ 0 ] ) } ,
425+ self . backing_memory ,
426+ self . backing_memory_size ,
427+ ) ;
428+
429+ let fd = handler. uffd . as_raw_fd ( ) ;
430+
431+ pollfds. push ( libc:: pollfd {
432+ fd,
433+ events : libc:: POLLIN ,
434+ revents : 0 ,
435+ } ) ;
436+ self . uffds . insert ( fd, handler) ;
437+
438+ // If connection is closed, we can skip the socket from
439+ // being polled.
440+ if pollfds[ i] . revents & ( libc:: POLLRDHUP | libc:: POLLHUP )
441+ != 0
442+ {
443+ skip_stream = 1 ;
444+ }
445+
446+ total_consumed = parser. byte_offset ( ) ;
447+ }
448+ Ok ( UffdMsgFromFirecracker :: FaultReq ( ref _fault_request) ) => {
449+ unimplemented ! (
450+ "Received unsupported message from Firecracker: {:?}" ,
451+ result
452+ )
453+ }
454+ Err ( e) if e. is_eof ( ) => {
455+ needs_more = true ;
456+ break ;
457+ }
458+ Err ( e) => {
459+ println ! (
460+ "Buffer content: {:?}" ,
461+ std:: str :: from_utf8( & buffer[ ..current_pos] )
462+ ) ;
463+ panic ! ( "Invalid JSON: {}" , e) ;
464+ }
465+ }
466+ }
467+
468+ if total_consumed > 0 {
469+ buffer. copy_within ( total_consumed..current_pos, 0 ) ;
470+ current_pos -= total_consumed;
471+ }
472+
473+ if needs_more {
474+ continue ;
475+ }
476+
477+ // We consumed all data in the buffer, but the socket may have remaining
478+ // unread data so we attempt to read from it
479+ // and exit the loop only if we confirm that nothing is in
480+ // there.
481+ if current_pos == 0 {
482+ exit_loop = true ;
483+ }
396484 }
397485 } else {
398486 // Handle one of uffd page faults
0 commit comments