Skip to content

Commit 023b27f

Browse files
committed
test(uffd_utils): buffered reading from UDS socket
Example UFFD handlers are now reading from the UDS socket in a buffered way. This is to make it possible to read messages of different types in future commits to be able to handle fault request messages from Firecracker if Secret Freedom is enabled. Signed-off-by: Nikita Kalyazin <[email protected]>
1 parent 72ab63f commit 023b27f

File tree

1 file changed

+100
-24
lines changed

1 file changed

+100
-24
lines changed

src/firecracker/examples/uffd/uffd_utils.rs

Lines changed: 100 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -153,17 +153,17 @@ impl UffdHandler {
153153
panic!("Could not get UFFD and mappings after 5 retries");
154154
}
155155

156-
pub fn from_unix_stream(stream: &UnixStream, backing_buffer: *const u8, size: usize) -> Self {
157-
let (body, file) = Self::get_mappings_and_file(stream);
158-
let mappings =
159-
serde_json::from_str::<Vec<GuestRegionUffdMapping>>(&body).unwrap_or_else(|_| {
160-
panic!("Cannot deserialize memory mappings. Received body: {body}")
161-
});
156+
pub fn from_mappings(
157+
mappings: Vec<GuestRegionUffdMapping>,
158+
uffd: File,
159+
backing_buffer: *const u8,
160+
size: usize,
161+
) -> Self {
162162
let memsize: usize = mappings.iter().map(|r| r.size).sum();
163163
// Page size is the same for all memory regions, so just grab the first one
164164
let first_mapping = mappings.first().unwrap_or_else(|| {
165165
panic!(
166-
"Cannot get the first mapping. Mappings size is {}. Received body: {body}",
166+
"Cannot get the first mapping. Mappings size is {}.",
167167
mappings.len()
168168
)
169169
});
@@ -173,7 +173,7 @@ impl UffdHandler {
173173
assert_eq!(memsize, size);
174174
assert!(page_size.is_power_of_two());
175175

176-
let uffd = unsafe { Uffd::from_raw_fd(file.into_raw_fd()) };
176+
let uffd = unsafe { Uffd::from_raw_fd(uffd.into_raw_fd()) };
177177

178178
Self {
179179
mem_regions: mappings,
@@ -375,22 +375,98 @@ impl Runtime {
375375
if pollfds[i].revents & libc::POLLIN != 0 {
376376
nready -= 1;
377377
if pollfds[i].fd == self.stream.as_raw_fd() {
378-
// Handle new uffd from stream
379-
let handler = UffdHandler::from_unix_stream(
380-
&self.stream,
381-
self.backing_memory,
382-
self.backing_memory_size,
383-
);
384-
pollfds.push(libc::pollfd {
385-
fd: handler.uffd.as_raw_fd(),
386-
events: libc::POLLIN,
387-
revents: 0,
388-
});
389-
self.uffds.insert(handler.uffd.as_raw_fd(), handler);
390-
391-
// If connection is closed, we can skip the socket from being polled.
392-
if pollfds[i].revents & (libc::POLLRDHUP | libc::POLLHUP) != 0 {
393-
skip_stream = 1;
378+
const BUFFER_SIZE: usize = 4096;
379+
380+
let mut buffer = [0u8; BUFFER_SIZE];
381+
let mut fds = [0; 1];
382+
let mut current_pos = 0;
383+
384+
loop {
385+
// Read more data into the buffer if there's space
386+
let mut iov = [libc::iovec {
387+
iov_base: (buffer[current_pos..]).as_mut_ptr() as *mut libc::c_void,
388+
iov_len: buffer.len(),
389+
}];
390+
391+
if current_pos < BUFFER_SIZE {
392+
let ret = unsafe { self.stream.recv_with_fds(&mut iov, &mut fds) };
393+
match ret {
394+
Ok((0, _)) => break,
395+
Ok((n, 1)) => current_pos += n,
396+
Ok((n, 0)) | Ok((_, n)) => panic!("Wrong number of fds: {}", n),
397+
Err(e) if e.errno() == libc::EAGAIN => continue,
398+
Err(e) => panic!("Read error: {}", e),
399+
}
400+
}
401+
402+
let mut parser =
403+
serde_json::Deserializer::from_slice(&buffer[..current_pos])
404+
.into_iter::<UffdMsgFromFirecracker>();
405+
let mut total_consumed = 0;
406+
let mut needs_more = false;
407+
408+
while let Some(result) = parser.next() {
409+
match result {
410+
Ok(UffdMsgFromFirecracker::Mappings(mappings)) => {
411+
// Handle new uffd from stream
412+
let handler = UffdHandler::from_mappings(
413+
mappings,
414+
unsafe { File::from_raw_fd(fds[0]) },
415+
self.backing_memory,
416+
self.backing_memory_size,
417+
);
418+
419+
let fd = handler.uffd.as_raw_fd();
420+
421+
pollfds.push(libc::pollfd {
422+
fd,
423+
events: libc::POLLIN,
424+
revents: 0,
425+
});
426+
self.uffds.insert(fd, handler);
427+
428+
// If connection is closed, we can skip the socket from
429+
// being polled.
430+
if pollfds[i].revents & (libc::POLLRDHUP | libc::POLLHUP)
431+
!= 0
432+
{
433+
skip_stream = 1;
434+
}
435+
436+
total_consumed = parser.byte_offset();
437+
}
438+
Ok(UffdMsgFromFirecracker::FaultReq(ref _fault_request)) => {
439+
unimplemented!(
440+
"Received unsupported message from Firecracker: {:?}",
441+
result
442+
)
443+
}
444+
Err(e) if e.is_eof() => {
445+
needs_more = true;
446+
break;
447+
}
448+
Err(e) => {
449+
println!(
450+
"Buffer content: {:?}",
451+
std::str::from_utf8(&buffer[..current_pos])
452+
);
453+
panic!("Invalid JSON: {}", e);
454+
}
455+
}
456+
}
457+
458+
if total_consumed > 0 {
459+
buffer.copy_within(total_consumed..current_pos, 0);
460+
current_pos -= total_consumed;
461+
}
462+
463+
if needs_more {
464+
continue;
465+
}
466+
467+
if current_pos == 0 {
468+
break;
469+
}
394470
}
395471
} else {
396472
// Handle one of uffd page faults

0 commit comments

Comments
 (0)