From 6ae3384900aedbf24d3dd25ef8605aa6075f82c6 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Thu, 9 Jan 2025 17:14:12 +0000 Subject: [PATCH 01/14] server: switch to task-based serial console connections Instead of pushing all serial console websocket connections onto a single task, create one task per websocket connection and add a serial console backend that brokers traffic between the websocket tasks and the underlying character device. Each connection to the console backend has an independent handle that has its own permissions (read-write or read-only) and rules for what to do if a byte from the guest can't immediately be written to a client task (block or disconnect the client). These tools are sufficient to implement the serial console access policies described in RFD 491. This change doesn't build on its own; additional work is needed to hook the new backend up to the server APIs. --- bin/propolis-server/src/lib/serial/backend.rs | 340 ++++++++ bin/propolis-server/src/lib/serial/mod.rs | 733 ++++++++++-------- 2 files changed, 731 insertions(+), 342 deletions(-) create mode 100644 bin/propolis-server/src/lib/serial/backend.rs diff --git a/bin/propolis-server/src/lib/serial/backend.rs b/bin/propolis-server/src/lib/serial/backend.rs new file mode 100644 index 000000000..19126348f --- /dev/null +++ b/bin/propolis-server/src/lib/serial/backend.rs @@ -0,0 +1,340 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! A backend that provides external clients with access to a Propolis character +//! device and tracks the history of bytes the guest has written to that device. + +use std::{ + collections::BTreeMap, + num::NonZeroUsize, + sync::{Arc, Mutex}, +}; + +use propolis::chardev::{ + pollers::{SinkBuffer, SourceBuffer}, + Sink, Source, +}; +use tokio::{ + select, + sync::{mpsc, oneshot}, +}; + +use super::history_buffer::{HistoryBuffer, SerialHistoryOffset}; + +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +struct ClientId(u64); + +trait ConsoleDevice: Source + Sink {} +impl ConsoleDevice for T {} + +/// A client's rights when accessing a character backend. +#[derive(Clone, Copy)] +pub(super) enum Permissions { + /// The client may both read and write to the backend. + ReadWrite, + + /// The client may only read from the backend. + ReadOnly, +} + +impl Permissions { + pub(super) fn is_writable(&self) -> bool { + matches!(self, Permissions::ReadWrite) + } +} + +/// Determines what happens when the backend is unable to send a guest byte back +/// to a client because the client's channel is full. +#[derive(Clone, Copy)] +pub(super) enum ReadWaitDiscipline { + /// The backend should block until it can send to this client. + Block, + + /// The backend should close this client's connection. + Close, +} + +/// An individual client of a character backend. +struct Client { + /// Bytes read from the character device should be sent to the client on + /// this channel. + tx: mpsc::Sender, + + /// Determines what happens when the backend wants to send a byte and + /// [`Self::tx`] is full. + read_discipline: ReadWaitDiscipline, +} + +/// A handle held by a client that represents its connection to the backend. +pub(super) struct ClientHandle { + /// The client's ID. + id: ClientId, + + /// A reference to the backend to which the client is connected. + backend: Arc, + + /// The client's backend access rights. 
+ permissions: Permissions, +} + +impl ClientHandle { + pub(super) fn is_writable(&self) -> bool { + self.permissions.is_writable() + } +} + +impl Drop for ClientHandle { + fn drop(&mut self) { + let mut inner = self.backend.inner.lock().unwrap(); + inner.clients.remove(&self.id); + } +} + +impl ClientHandle { + /// Attempts to write the bytes in `buf` to the backend. + /// + /// The backend may buffer some of the written bytes before sending them to + /// its device, so when this function returns, it is not guaranteed that all + /// of the written bytes have actually reached the guest. + /// + /// # Return value + /// + /// - `Ok(bytes)` if the write succeeded; `bytes` is the number of bytes + /// that were written. + /// - `Err(ErrorKind::PermissionDenied)` if this handle does not grant write + /// access to the device. + /// - `Err(ErrorKind::ConnectionAborted)` if this handle's client was + /// previously disconnected from the backend, e.g. because it had a full + /// read channel and was using the close-on-full-channel read discipline. + /// + /// # Cancel safety + /// + /// The future returned by this function is cancel-safe. If it is dropped, + /// it is guaranteed that no bytes were written to the backend or its + /// device. See [`SinkBuffer::write`]. + pub(super) async fn write( + &mut self, + buf: &[u8], + ) -> Result { + if buf.is_empty() { + return Ok(0); + } + + if !matches!(self.permissions, Permissions::ReadWrite) { + return Err(std::io::Error::from( + std::io::ErrorKind::PermissionDenied, + )); + } + + // The client handle may have been invalidated if it used the "close on + // full channel" discipline. If this client is no longer active, return + // an error. + // + // The read dispatcher task doesn't synchronize with writes, so it's + // possible for the client to be invalidated after the lock is dropped. + // (It can't be held while writing to the backend because + // `SinkBuffer::write` is async.) This is OK; since reads and writes + // aren't synchronized to begin with, in any situation like this it was + // equally possible for the write to finish before the client was + // invalidated. + { + let inner = self.backend.inner.lock().unwrap(); + if !inner.clients.contains_key(&self.id) { + return Err(std::io::Error::from( + std::io::ErrorKind::ConnectionAborted, + )); + } + } + + Ok(self + .backend + .sink_buffer + .write(buf, self.backend.sink.as_ref()) + .await + .expect("chardev sink writes always succeed")) + } +} + +/// Character backend state that's protected by a lock. +struct Inner { + /// A history of the bytes read from this backend's device. + buffer: HistoryBuffer, + + /// A table mapping client IDs to clients. + clients: BTreeMap, + + /// The ID to assign to the next client to attach to this backend. + next_client_id: u64, +} + +impl Inner { + fn new(history_size: usize) -> Self { + Self { + buffer: HistoryBuffer::new(history_size), + clients: BTreeMap::new(), + next_client_id: 0, + } + } + + fn next_client_id(&mut self) -> ClientId { + let id = self.next_client_id; + self.next_client_id += 1; + ClientId(id) + } +} + +/// A backend for a Propolis serial console that allows multiple clients to +/// access a single serial device. 
+pub struct ConsoleBackend { + inner: Arc>, + dev: Arc, + sink: Arc, + sink_buffer: Arc, + done_tx: oneshot::Sender<()>, +} + +impl ConsoleBackend { + pub fn new( + device: Arc, + history_bytes: usize, + ) -> Arc { + const SINK_BUFFER_BYTES: usize = 64; + + let sink_buffer = + SinkBuffer::new(NonZeroUsize::new(SINK_BUFFER_BYTES).unwrap()); + sink_buffer.attach(device.as_ref()); + + let sink = device.clone(); + let source = device.clone(); + let (done_tx, done_rx) = oneshot::channel(); + + let this = Arc::new(Self { + inner: Arc::new(Mutex::new(Inner::new(history_bytes))), + dev: device, + sink, + sink_buffer, + done_tx, + }); + + let inner = this.inner.clone(); + tokio::spawn(async move { + read_task(inner, source, done_rx).await; + }); + + this + } + + /// Attaches a new client to this backend, yielding a handle that the client + /// can use to issue further operations. + /// + /// # Arguments + /// + /// - `read_tx`: A channel to which the backend should send bytes read from + /// its device. + pub(super) fn attach_client( + self: &Arc, + read_tx: mpsc::Sender, + permissions: Permissions, + wait_discipline: ReadWaitDiscipline, + ) -> ClientHandle { + let mut inner = self.inner.lock().unwrap(); + let id = inner.next_client_id(); + let client = Client { tx: read_tx, read_discipline: wait_discipline }; + + inner.clients.insert(id, client); + ClientHandle { id, backend: self.clone(), permissions } + } + + pub fn history_vec( + &self, + byte_offset: SerialHistoryOffset, + max_bytes: Option, + ) -> Result<(Vec, usize), super::history_buffer::Error> { + let inner = self.inner.lock().unwrap(); + inner.buffer.contents_vec(byte_offset, max_bytes) + } + + pub fn bytes_since_start(&self) -> usize { + self.inner.lock().unwrap().buffer.bytes_from_start() + } +} + +impl Drop for ConsoleBackend { + fn drop(&mut self) { + let (tx, _rx) = oneshot::channel(); + let done_tx = std::mem::replace(&mut self.done_tx, tx); + let _ = done_tx.send(()); + } +} + +async fn read_task( + inner: Arc>, + source: Arc, + mut done_rx: oneshot::Receiver<()>, +) { + const READ_BUFFER_SIZE_BYTES: usize = 512; + let buf = SourceBuffer::new(propolis::chardev::pollers::Params { + poll_interval: std::time::Duration::from_millis(10), + poll_miss_thresh: 5, + buf_size: NonZeroUsize::new(READ_BUFFER_SIZE_BYTES).unwrap(), + }); + buf.attach(source.as_ref()); + + let mut bytes = vec![0; READ_BUFFER_SIZE_BYTES]; + loop { + let bytes_read = select! 
{ + biased; + + _ = &mut done_rx => { + return; + } + + res = buf.read(bytes.as_mut_slice(), source.as_ref()) => { + res.unwrap() + } + }; + + let to_send = &bytes[0..bytes_read]; + + struct CapturedClient { + id: ClientId, + tx: mpsc::Sender, + discipline: ReadWaitDiscipline, + disconnect: bool, + } + + let mut clients = { + let guard = inner.lock().unwrap(); + guard + .clients + .iter() + .map(|(id, client)| CapturedClient { + id: *id, + tx: client.tx.clone(), + discipline: client.read_discipline, + disconnect: false, + }) + .collect::>() + }; + + for byte in to_send { + for client in clients.iter_mut() { + client.disconnect = match client.discipline { + ReadWaitDiscipline::Block => { + client.tx.send(*byte).await.is_err() + } + ReadWaitDiscipline::Close => { + client.tx.try_send(*byte).is_err() + } + } + } + } + + let mut guard = inner.lock().unwrap(); + guard.buffer.consume(to_send); + for client in clients.iter().filter(|c| c.disconnect) { + guard.clients.remove(&client.id); + } + } +} diff --git a/bin/propolis-server/src/lib/serial/mod.rs b/bin/propolis-server/src/lib/serial/mod.rs index 82062af73..05ec3ab15 100644 --- a/bin/propolis-server/src/lib/serial/mod.rs +++ b/bin/propolis-server/src/lib/serial/mod.rs @@ -4,398 +4,447 @@ //! Routines to expose a connection to an instance's serial port. -use crate::migrate::MigrateError; - -use std::collections::HashMap; -use std::net::SocketAddr; -use std::num::NonZeroUsize; -use std::ops::Range; -use std::sync::Arc; -use std::time::Duration; - -use crate::serial::history_buffer::{HistoryBuffer, SerialHistoryOffset}; -use futures::future::Fuse; -use futures::stream::SplitSink; -use futures::{FutureExt, SinkExt, StreamExt}; -use propolis::chardev::{pollers, Sink, Source}; +use std::{ + collections::{BTreeMap, VecDeque}, + sync::{Arc, Mutex}, +}; + +use backend::ConsoleBackend; +use dropshot::WebsocketConnectionRaw; +use futures::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; use propolis_api_types::InstanceSerialConsoleControlMessage; -use slog::{info, warn, Logger}; -use thiserror::Error; -use tokio::sync::{mpsc, oneshot, Mutex, RwLock as AsyncRwLock}; -use tokio::task::JoinHandle; -use tokio_tungstenite::tungstenite::protocol::{ - frame::coding::CloseCode, CloseFrame, +use slog::{info, warn}; +use tokio::{ + select, + sync::{mpsc, oneshot}, + task::JoinHandle, +}; +use tokio_tungstenite::{ + tungstenite::{ + protocol::{frame::coding::CloseCode, CloseFrame}, + Message, + }, + WebSocketStream, }; -use tokio_tungstenite::tungstenite::Message; -use tokio_tungstenite::{tungstenite, WebSocketStream}; +mod backend; pub(crate) mod history_buffer; #[usdt::provider(provider = "propolis")] mod probes { - fn serial_close_recv() {} - fn serial_new_ws() {} - fn serial_uart_write(n: usize) {} - fn serial_uart_out() {} - fn serial_uart_read(n: usize) {} - fn serial_inject_uart() {} - fn serial_ws_recv() {} + fn serial_event_done() {} + fn serial_event_read(b: u8) {} + fn serial_event_console_disconnect() {} + fn serial_event_ws_recv(len: usize) {} + fn serial_event_ws_error() {} + fn serial_event_ws_disconnect() {} fn serial_buffer_size(n: usize) {} } -/// Errors which may occur during the course of a serial connection. 
-#[derive(Error, Debug)] -pub enum SerialTaskError { - #[error("Cannot upgrade HTTP request to WebSockets: {0}")] - Upgrade(#[from] hyper::Error), +#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] +struct ClientId(u64); - #[error("WebSocket Error: {0}")] - WebSocket(#[from] tungstenite::Error), +#[derive(Clone, Copy, PartialEq, Eq)] +pub enum ClientKind { + ReadWrite, + ReadOnly, +} - #[error("IO error: {0}")] - Io(#[from] std::io::Error), +struct ClientTask { + hdl: JoinHandle<()>, + control_tx: mpsc::Sender, + done_tx: oneshot::Sender<()>, +} - #[error("Mismatched websocket streams while closing")] - MismatchedStreams, +#[derive(Default)] +struct ClientTasks { + tasks: BTreeMap, + next_id: u64, + rw_client_id: Option, +} - #[error("Error while waiting for notification: {0}")] - OneshotRecv(#[from] oneshot::error::RecvError), +impl ClientTasks { + fn next_id(&mut self) -> ClientId { + let id = self.next_id; + self.next_id += 1; + ClientId(id) + } - #[error("JSON marshalling error while processing control message: {0}")] - Json(#[from] serde_json::Error), + fn remove_by_id(&mut self, id: ClientId) { + self.tasks.remove(&id); + match self.rw_client_id { + Some(existing_id) if existing_id == id => { + self.rw_client_id = None; + } + _ => {} + } + } + + fn remove_rw_client(&mut self) -> Option { + if let Some(id) = self.rw_client_id.take() { + self.tasks.remove(&id) + } else { + None + } + } } -pub enum SerialTaskControlMessage { - Stopping, - Migration { destination: SocketAddr, from_start: u64 }, +pub struct SerialConsoleManager { + log: slog::Logger, + backend: Arc, + client_tasks: Arc>, } -pub struct SerialTask { - /// Handle to attached serial session - pub task: JoinHandle<()>, - /// Channel used to signal the task to terminate gracefully or notify - /// clients of a migration - pub control_ch: mpsc::Sender, - /// Channel used to send new client connections to the streaming task - pub websocks_ch: - mpsc::Sender>, +impl SerialConsoleManager { + pub fn new(log: slog::Logger, backend: Arc) -> Self { + Self { + log, + backend, + client_tasks: Arc::new(Mutex::new(ClientTasks::default())), + } + } + + pub async fn stop(self) { + let tasks = { + let mut guard = self.client_tasks.lock().unwrap(); + std::mem::take(&mut guard.tasks) + }; + + let mut tasks: Vec<_> = tasks.into_values().collect(); + for task in &mut tasks { + let (tx, _rx) = oneshot::channel(); + let done_tx = std::mem::replace(&mut task.done_tx, tx); + let _ = done_tx.send(()); + } + + futures::future::join_all(tasks.into_iter().map(|task| task.hdl)).await; + } + + pub async fn connect( + &self, + ws: WebSocketStream, + kind: ClientKind, + ) { + const SERIAL_CHANNEL_SIZE: usize = 256; + let (console_tx, console_rx) = mpsc::channel(SERIAL_CHANNEL_SIZE); + let (control_tx, control_rx) = mpsc::channel(1); + let (task_start_tx, task_start_rx) = oneshot::channel(); + let (task_done_tx, task_done_rx) = oneshot::channel(); + let (permissions, discipline) = match kind { + ClientKind::ReadWrite => ( + backend::Permissions::ReadWrite, + backend::ReadWaitDiscipline::Block, + ), + ClientKind::ReadOnly => ( + backend::Permissions::ReadOnly, + backend::ReadWaitDiscipline::Close, + ), + }; + + let mut client_tasks = self.client_tasks.lock().unwrap(); + let client_id = client_tasks.next_id(); + let prev_rw_task = if kind == ClientKind::ReadWrite { + client_tasks.remove_rw_client() + } else { + None + }; + + let backend_hdl = + self.backend.attach_client(console_tx, permissions, discipline); + + let ctx = SerialTaskContext { + log: 
self.log.clone(), + ws, + backend_hdl, + console_rx, + control_rx, + start_rx: task_start_rx, + done_rx: task_done_rx, + client_tasks: self.client_tasks.clone(), + client_id, + }; + + let task = ClientTask { + hdl: tokio::spawn(async move { serial_task(ctx).await }), + control_tx, + done_tx: task_done_tx, + }; + + client_tasks.tasks.insert(client_id, task); + if kind == ClientKind::ReadWrite { + assert!(client_tasks.rw_client_id.is_none()); + client_tasks.rw_client_id = Some(client_id); + } + + drop(client_tasks); + if let Some(task) = prev_rw_task { + task.done_tx.send(()); + task.hdl.await; + } + + task_start_tx.send(()); + } } -pub async fn instance_serial_task( - mut websocks_recv: mpsc::Receiver< - WebSocketStream, - >, - mut control_recv: mpsc::Receiver, - serial: Arc>, - log: Logger, -) -> Result<(), SerialTaskError> { - info!(log, "Entered serial task"); - let mut output = [0u8; 1024]; - let mut cur_output: Option> = None; - let mut cur_input: Option<(Vec, usize)> = None; - - let mut ws_sinks: HashMap< - usize, - SplitSink, Message>, - > = HashMap::new(); - let mut ws_streams: HashMap< - usize, - futures::stream::SplitStream< - WebSocketStream, - >, - > = HashMap::new(); - - let (send_ch, mut recv_ch) = mpsc::channel(4); - - let mut next_stream_id = 0usize; +/// Context passed to every serial console task. +struct SerialTaskContext { + /// The logger this task should use. + log: slog::Logger, + + /// The websocket connection this task uses to communicate with its client. + ws: WebSocketStream, + + /// A handle representing this task's connection to the console backend. + backend_hdl: backend::ClientHandle, + + /// Receives output bytes from the guest. + console_rx: mpsc::Receiver, + + /// Receives control commands that should be relayed to websocket clients + /// (e.g. impending migrations). + control_rx: mpsc::Receiver, + + start_rx: oneshot::Receiver<()>, + /// Signaled when the task is + done_rx: oneshot::Receiver<()>, + + /// A reference to the manager's client task map, used to deregister this + /// task on disconnection. + client_tasks: Arc>, + + /// This task's ID, used to deregister this task on disconnection. 
+ client_id: ClientId, +} + +async fn serial_task( + SerialTaskContext { + log, + ws, + mut backend_hdl, + mut console_rx, + mut control_rx, + start_rx, + mut done_rx, + client_tasks, + client_id, + }: SerialTaskContext, +) { + enum Event { + Done, + ConsoleRead(u8), + ConsoleDisconnected, + WroteToBackend(Result), + ControlMessage(InstanceSerialConsoleControlMessage), + WebsocketMessage(Message), + WebsocketError(tokio_tungstenite::tungstenite::Error), + WebsocketDisconnected, + } + + async fn close( + log: &slog::Logger, + client_id: ClientId, + sink: SplitSink, Message>, + stream: SplitStream>, + reason: &str, + ) { + let mut ws = + sink.reunite(stream).expect("sink and stream should match"); + if let Err(e) = ws + .close(Some(CloseFrame { + code: CloseCode::Away, + reason: reason.into(), + })) + .await + { + warn!( + log, "error sending close frame to client"; + "client_id" => client_id.0, + "error" => ?e, + ); + } + } + + let _ = start_rx.await; + + info!( + log, + "serial console task started"; + "client_id" => client_id.0, + ); + + let mut remaining_to_send: VecDeque = VecDeque::new(); + let (mut sink, mut stream) = ws.split(); + let mut close_reason: Option<&'static str> = None; loop { - let (uart_read, ws_send) = - if ws_sinks.is_empty() || cur_output.is_none() { - (serial.read_source(&mut output).fuse(), Fuse::terminated()) - } else { - let range = cur_output.clone().unwrap(); + use futures::future::Either; + + let (will_send, send_fut) = + if backend_hdl.is_writable() && !remaining_to_send.is_empty() { + remaining_to_send.make_contiguous(); ( - Fuse::terminated(), - if !ws_sinks.is_empty() { - futures::stream::iter( - ws_sinks.iter_mut().zip(std::iter::repeat( - Vec::from(&output[range]), - )), - ) - .for_each_concurrent(4, |((_i, ws), bin)| { - ws.send(Message::binary(bin)).map(|_| ()) - }) - .fuse() - } else { - Fuse::terminated() - }, + true, + Either::Left( + backend_hdl.write(remaining_to_send.as_slices().0), + ), ) + } else { + (false, Either::Right(futures::future::pending())) }; - let (ws_recv, uart_write) = match &cur_input { - None => ( - if !ws_streams.is_empty() { - futures::stream::iter(ws_streams.iter_mut()) - .for_each_concurrent(4, |(i, ws)| { - ws.next() - .then(|msg| send_ch.send((*i, msg))) - .map(|_| ()) - }) - .fuse() - } else { - Fuse::terminated() - }, - Fuse::terminated(), - ), - Some((data, consumed)) => ( - Fuse::terminated(), - serial.write_sink(&data[*consumed..]).fuse(), - ), + // If there are no bytes to be sent to the guest, accept another message + // from the websocket. + let ws_fut = if !will_send { + Either::Left(stream.next()) + } else { + Either::Right(futures::future::pending()) }; - let input_recv_ch_fut = recv_ch.recv().fuse(); - let new_ws_recv = websocks_recv.recv().fuse(); - let control_recv_fut = control_recv.recv().fuse(); - - tokio::select! { - // Poll in the order written + let event = select! { + // The priority of these branches is important: + // + // 1. Requests to stop the client take precedence over everything + // else. + // 2. New bytes written by the guest need to be processed before any + // other requests: if a guest outputs a byte while a read-write + // client is attached, the relevant vCPU will be blocked until + // the client processes the byte. 
biased; - // It's important we always poll the close channel first - // so that a constant stream of incoming/outgoing messages - // don't cause us to ignore it - message = control_recv_fut => { - probes::serial_close_recv!(|| {}); - match message { - Some(SerialTaskControlMessage::Stopping) | None => { - // Gracefully close the connections to any clients - for (i, ws0) in ws_sinks.into_iter() { - let ws1 = ws_streams.remove(&i).ok_or(SerialTaskError::MismatchedStreams)?; - let mut ws = ws0.reunite(ws1).map_err(|_| SerialTaskError::MismatchedStreams)?; - let _ = ws.close(Some(CloseFrame { - code: CloseCode::Away, - reason: "VM stopped".into(), - })).await; - } - } - Some(SerialTaskControlMessage::Migration { destination, from_start }) => { - let mut failures = 0; - for sink in ws_sinks.values_mut() { - if sink.send(Message::Text(serde_json::to_string( - &InstanceSerialConsoleControlMessage::Migrating { - destination, - from_start, - } - )?)).await.is_err() { - failures += 1; - } - } - if failures > 0 { - warn!(log, "Failed to send migration info to {} connected clients.", failures); - } - } - } - info!(log, "Terminating serial task"); - break; + _ = &mut done_rx => { + Event::Done } - new_ws = new_ws_recv => { - probes::serial_new_ws!(|| {}); - if let Some(ws) = new_ws { - let (ws_sink, ws_stream) = ws.split(); - ws_sinks.insert(next_stream_id, ws_sink); - ws_streams.insert(next_stream_id, ws_stream); - next_stream_id += 1; + res = console_rx.recv() => { + match res { + Some(b) => Event::ConsoleRead(b), + None => Event::ConsoleDisconnected, } } - // Write bytes into the UART from the WS - written = uart_write => { - probes::serial_uart_write!(|| { written.unwrap_or(0) }); - match written { - Some(0) | None => { - break; - } - Some(n) => { - let (data, consumed) = cur_input.as_mut().unwrap(); - *consumed += n; - if *consumed == data.len() { - cur_input = None; - } - } - } + control = control_rx.recv() => { + Event::ControlMessage(control.expect( + "serial control channel should outlive its task" + )) } - // Transmit bytes from the UART through the WS - _ = ws_send => { - probes::serial_uart_out!(|| {}); - cur_output = None; + res = send_fut => { + Event::WroteToBackend(res) } - // Read bytes from the UART to be transmitted out the WS - nread = uart_read => { - // N.B. Putting this probe inside the match arms below causes - // the `break` arm to be taken unexpectedly. See - // propolis#292 for details. - probes::serial_uart_read!(|| { nread.unwrap_or(0) }); - match nread { - Some(0) | None => { - break; - } - Some(n) => { - cur_output = Some(0..n) - } + ws = ws_fut => { + match ws { + None => Event::WebsocketDisconnected, + Some(Ok(msg)) => Event::WebsocketMessage(msg), + Some(Err(err)) => Event::WebsocketError(err), } } + }; + + match event { + Event::Done => { + probes::serial_event_done!(|| ()); + close_reason = Some("VM stopped"); + break; + } + Event::ConsoleRead(b) => { + probes::serial_event_read!(|| (b)); + + // Waiting outside the `select!` is OK here: + // + // - If the client is a read-write client, it is allowed to + // block the guest to ensure that every byte of guest output + // is transmitted to the client. + // - If the client is a read-only client, and it is slow to + // acknowledge this message, its channel to the backend will + // eventually fill up. If this happens and the backend thus + // becomes unable to send new bytes, it will drop the channel + // to allow the guest to make progress. 
+ let _ = sink.send(Message::binary(vec![b])).await; + } + Event::ConsoleDisconnected => { + probes::serial_event_console_disconnect!(|| ()); + info!( + log, "console backend dropped its client channel"; + "client_id" => client_id.0 + ); + break; + } + Event::ControlMessage(control) => { + let _ = sink + .send(Message::Text( + serde_json::to_string(&control).expect( + "control messages can always serialize into JSON", + ), + )) + .await; + } + Event::WroteToBackend(result) => { + let written = match result { + Ok(n) => n, + Err(e) => { + let reason = if e.kind() + == std::io::ErrorKind::ConnectionAborted + { + "read-write console connection overtaken" + } else { + "error writing to console backend" + }; + + warn!( + log, + "dropping read-write console client"; + "client_id" => client_id.0, + "error" => ?e, + "reason" => reason + ); + + close_reason = Some(reason); + break; + } + }; - // Receive bytes from the intermediate channel to be injected into - // the UART. This needs to be checked before `ws_recv` so that - // "close" messages can be processed and their indicated - // sinks/streams removed before they are polled again. - pair = input_recv_ch_fut => { - probes::serial_inject_uart!(|| {}); - if let Some((i, msg)) = pair { - match msg { - Some(Ok(Message::Binary(input))) => { - cur_input = Some((input, 0)); - } - Some(Ok(Message::Close(..))) | None => { - info!(log, "Removing closed serial connection {}.", i); - let sink = ws_sinks.remove(&i).ok_or(SerialTaskError::MismatchedStreams)?; - let stream = ws_streams.remove(&i).ok_or(SerialTaskError::MismatchedStreams)?; - if let Err(e) = sink.reunite(stream).map_err(|_| SerialTaskError::MismatchedStreams)?.close(None).await { - warn!(log, "Failed while closing stream {}: {}", i, e); - } - }, - _ => continue, + drop(remaining_to_send.drain(..written)); + } + Event::WebsocketMessage(msg) => { + match (backend_hdl.is_writable(), msg) { + (true, Message::Binary(bytes)) => { + probes::serial_event_ws_recv!(|| (bytes.len())); + remaining_to_send.extend(bytes.as_slice()); + } + (false, Message::Binary(_)) => { + continue; } + (_, _) => continue, } } - - // Receive bytes from connected WS clients to feed to the - // intermediate recv_ch - _ = ws_recv => { - probes::serial_ws_recv!(|| {}); + Event::WebsocketError(e) => { + probes::serial_event_ws_error!(|| ()); + warn!( + log, "serial console websocket error"; + "client_id" => client_id.0, + "error" => ?e + ); + break; + } + Event::WebsocketDisconnected => { + probes::serial_event_ws_disconnect!(|| ()); + info!( + log, "serial console client disconnected"; + "client_id" => client_id.0 + ); + break; } } } - info!(log, "Returning from serial task"); - Ok(()) -} - -/// Represents a serial connection into the VM. -pub struct Serial { - uart: Arc, - - task_control_ch: Mutex>>, - - sink_poller: Arc, - source_poller: Arc, - history: AsyncRwLock, -} - -impl Serial { - /// Creates a new buffered serial connection on top of `uart.` - /// - /// Creation of this object disables "autodiscard", and destruction - /// of the object re-enables "autodiscard" mode. - /// - /// # Arguments - /// - /// * `uart` - The device which data will be read from / written to. - /// * `sink_size` - A lower bound on the size of the writeback buffer. - /// * `source_size` - A lower bound on the size of the read buffer. 
- pub fn new( - uart: Arc, - sink_size: NonZeroUsize, - source_size: NonZeroUsize, - ) -> Serial { - let sink_poller = pollers::SinkBuffer::new(sink_size); - let source_poller = pollers::SourceBuffer::new(pollers::Params { - buf_size: source_size, - poll_interval: Duration::from_millis(10), - poll_miss_thresh: 5, - }); - let history = Default::default(); - sink_poller.attach(uart.as_ref()); - source_poller.attach(uart.as_ref()); - uart.set_autodiscard(false); - - let task_control_ch = Default::default(); - - Serial { uart, task_control_ch, sink_poller, source_poller, history } - } - - pub async fn read_source(&self, buf: &mut [u8]) -> Option { - let uart = self.uart.clone(); - let bytes_read = self.source_poller.read(buf, uart.as_ref()).await?; - self.history.write().await.consume(&buf[..bytes_read]); - Some(bytes_read) - } - - pub async fn write_sink(&self, buf: &[u8]) -> Option { - let uart = self.uart.clone(); - self.sink_poller.write(buf, uart.as_ref()).await - } - - pub(crate) async fn history_vec( - &self, - byte_offset: SerialHistoryOffset, - max_bytes: Option, - ) -> Result<(Vec, usize), history_buffer::Error> { - self.history.read().await.contents_vec(byte_offset, max_bytes) - } - - // provide the channel through which we inform connected websocket clients - // that a migration has occurred, and where to reconnect. - // (the server's serial-to-websocket task -- and thus the receiving end of - // this channel -- are spawned in `instance_ensure_common`, after the - // construction of `Serial`) - pub(crate) async fn set_task_control_sender( - &self, - control_ch: mpsc::Sender, - ) { - self.task_control_ch.lock().await.replace(control_ch); - } - - pub(crate) async fn export_history( - &self, - destination: SocketAddr, - ) -> Result { - let read_hist = self.history.read().await; - let from_start = read_hist.bytes_from_start() as u64; - let encoded = ron::to_string(&*read_hist) - .map_err(|e| MigrateError::Codec(e.to_string()))?; - drop(read_hist); - if let Some(ch) = self.task_control_ch.lock().await.as_ref() { - ch.send(SerialTaskControlMessage::Migration { - destination, - from_start, - }) - .await - .map_err(|_| MigrateError::InvalidInstanceState)?; - } - Ok(encoded) - } - pub(crate) async fn import( - &self, - serialized_hist: &str, - ) -> Result<(), MigrateError> { - self.sink_poller.attach(self.uart.as_ref()); - self.source_poller.attach(self.uart.as_ref()); - self.uart.set_autodiscard(false); - let decoded = ron::from_str(serialized_hist) - .map_err(|e| MigrateError::Codec(e.to_string()))?; - let mut write_hist = self.history.write().await; - *write_hist = decoded; - Ok(()) + info!(log, "serial console task exiting"; "client_id" => client_id.0); + if let Some(close_reason) = close_reason { + close(&log, client_id, sink, stream, close_reason).await; } -} -impl Drop for Serial { - fn drop(&mut self) { - self.uart.set_autodiscard(true); - } + client_tasks.lock().unwrap().tasks.remove(&client_id); } From deb1642de1ba73c0db049c5f44c485af430297d9 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Thu, 9 Jan 2025 20:56:31 +0000 Subject: [PATCH 02/14] server: hook up com1 to new backend This plumbs the backend everywhere it needs to be, but import and export aren't quite set up yet. 
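For illustration only (this sketch is not part of the change; `uart`,
`log`, and `ws_stream` stand in for values the server already holds,
and `ClientKind` is assumed to be in scope), the intended wiring
between the pieces added here looks roughly like:

    // Build the shared console backend over COM1, keeping 1 MiB of
    // guest output history (COM1_HISTORY_BYTES in the initializer).
    let backend = ConsoleBackend::new(uart, 1024 * 1024);

    // The manager owns one task per websocket client of this backend.
    let mgr = SerialConsoleManager::new(log, backend);

    // Each accepted websocket becomes its own client task; attaching a
    // read-write client displaces any existing read-write client.
    mgr.connect(ws_stream, ClientKind::ReadWrite).await;

    // When the VM stops, tear down all remaining client tasks.
    mgr.stop().await;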
--- bin/propolis-server/src/lib/initializer.rs | 9 ++-- bin/propolis-server/src/lib/migrate/source.rs | 5 ++ bin/propolis-server/src/lib/serial/mod.rs | 24 +++++++++- bin/propolis-server/src/lib/server.rs | 16 +++---- bin/propolis-server/src/lib/vm/active.rs | 2 +- bin/propolis-server/src/lib/vm/ensure.rs | 24 ++++++---- bin/propolis-server/src/lib/vm/mod.rs | 6 +-- bin/propolis-server/src/lib/vm/objects.rs | 12 +++-- bin/propolis-server/src/lib/vm/services.rs | 48 ++++--------------- .../src/lib/vm/state_driver.rs | 14 +++++- 10 files changed, 86 insertions(+), 74 deletions(-) diff --git a/bin/propolis-server/src/lib/initializer.rs b/bin/propolis-server/src/lib/initializer.rs index 094e7cc3c..1a502b7f5 100644 --- a/bin/propolis-server/src/lib/initializer.rs +++ b/bin/propolis-server/src/lib/initializer.rs @@ -9,7 +9,7 @@ use std::os::unix::fs::FileTypeExt; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; -use crate::serial::Serial; +use crate::serial::backend::ConsoleBackend; use crate::spec::{self, Spec, StorageBackend, StorageDevice}; use crate::stats::{ track_network_interface_kstats, track_vcpu_kstats, VirtualDiskProducer, @@ -377,7 +377,7 @@ impl MachineInitializer<'_> { pub fn initialize_uart( &mut self, chipset: &RegisteredChipset, - ) -> Serial { + ) -> Arc { let mut com1 = None; for (name, desc) in self.spec.serial.iter() { if desc.device != spec::SerialPortDevice::Uart { @@ -401,9 +401,8 @@ impl MachineInitializer<'_> { } } - let sink_size = NonZeroUsize::new(64).unwrap(); - let source_size = NonZeroUsize::new(1024).unwrap(); - Serial::new(com1.unwrap(), sink_size, source_size) + const COM1_HISTORY_BYTES: usize = 1024 * 1024; + ConsoleBackend::new(com1.unwrap().clone(), COM1_HISTORY_BYTES) } pub fn initialize_ps2( diff --git a/bin/propolis-server/src/lib/migrate/source.rs b/bin/propolis-server/src/lib/migrate/source.rs index 49328c763..747879862 100644 --- a/bin/propolis-server/src/lib/migrate/source.rs +++ b/bin/propolis-server/src/lib/migrate/source.rs @@ -32,6 +32,7 @@ use crate::migrate::{ }; use crate::vm::objects::VmObjects; +use crate::vm::services::VmServices; use crate::vm::state_publisher::{ ExternalStateUpdate, MigrationStateUpdate, StatePublisher, }; @@ -133,6 +134,7 @@ pub(crate) trait SourceProtocol { async fn run( self, vm_objects: &VmObjects, + vm_services: &VmServices, publisher: &mut StatePublisher, persistent_state: &mut PersistentState, ) -> Result<(), MigrateError>; @@ -318,6 +320,7 @@ impl SourceProtocol for RonV0 { async fn run( self, vm_objects: &VmObjects, + vm_services: &VmServices, publisher: &mut StatePublisher, persistent_state: &mut PersistentState, ) -> Result<(), MigrateError> { @@ -327,6 +330,7 @@ impl SourceProtocol for RonV0 { conn: self.conn, dirt: self.dirt, vm: vm_objects, + vm_services, state_publisher: publisher, persistent_state, paused: false, @@ -342,6 +346,7 @@ struct RonV0Runner<'vm, T: MigrateConn> { conn: WebSocketStream, dirt: Option>, vm: &'vm VmObjects, + vm_services: &'vm VmServices, state_publisher: &'vm mut StatePublisher, persistent_state: &'vm mut PersistentState, paused: bool, diff --git a/bin/propolis-server/src/lib/serial/mod.rs b/bin/propolis-server/src/lib/serial/mod.rs index 05ec3ab15..048822a10 100644 --- a/bin/propolis-server/src/lib/serial/mod.rs +++ b/bin/propolis-server/src/lib/serial/mod.rs @@ -6,6 +6,7 @@ use std::{ collections::{BTreeMap, VecDeque}, + net::SocketAddr, sync::{Arc, Mutex}, }; @@ -30,7 +31,7 @@ use tokio_tungstenite::{ WebSocketStream, }; -mod backend; +pub(crate) mod backend; 
pub(crate) mod history_buffer; #[usdt::provider(provider = "propolis")] @@ -187,6 +188,27 @@ impl SerialConsoleManager { task_start_tx.send(()); } + + pub async fn notify_migration(&self, destination: SocketAddr) { + let from_start = self.backend.bytes_since_start() as u64; + let clients: Vec<_> = { + let client_tasks = self.client_tasks.lock().unwrap(); + client_tasks + .tasks + .values() + .map(|client| client.control_tx.clone()) + .collect() + }; + + for client in clients { + let _ = client + .send(InstanceSerialConsoleControlMessage::Migrating { + destination, + from_start, + }) + .await; + } + } } /// Context passed to every serial console task. diff --git a/bin/propolis-server/src/lib/server.rs b/bin/propolis-server/src/lib/server.rs index ea1c41849..7a3f34dbb 100644 --- a/bin/propolis-server/src/lib/server.rs +++ b/bin/propolis-server/src/lib/server.rs @@ -405,7 +405,6 @@ async fn instance_serial_history_get( let max_bytes = query_params.max_bytes.map(|x| x as usize); let (data, end) = serial .history_vec(byte_offset, max_bytes) - .await .map_err(|e| HttpError::for_bad_request(None, e.to_string()))?; Ok(HttpResponseOk(api::InstanceSerialConsoleHistoryResponse { @@ -444,7 +443,7 @@ async fn instance_serial( let byte_offset = SerialHistoryOffset::try_from(&query.into_inner()).ok(); if let Some(mut byte_offset) = byte_offset { loop { - let (data, offset) = serial.history_vec(byte_offset, None).await?; + let (data, offset) = serial.history_vec(byte_offset, None)?; if data.is_empty() { break; } @@ -456,14 +455,13 @@ async fn instance_serial( } // Get serial task's handle and send it the websocket stream - let serial_task = vm.services().serial_task.lock().await; - serial_task + let serial_mgr = vm.services().serial_mgr.lock().await; + serial_mgr .as_ref() - .ok_or("Instance has no serial task")? - .websocks_ch - .send(ws_stream) - .await - .map_err(|e| format!("Serial socket hand-off failed: {}", e).into()) + .ok_or("Instance has no serial console manager")? + .connect(ws_stream, crate::serial::ClientKind::ReadWrite); + + Ok(()) } #[channel { diff --git a/bin/propolis-server/src/lib/vm/active.rs b/bin/propolis-server/src/lib/vm/active.rs index 8a812bf27..3114a47d6 100644 --- a/bin/propolis-server/src/lib/vm/active.rs +++ b/bin/propolis-server/src/lib/vm/active.rs @@ -40,7 +40,7 @@ pub(crate) struct ActiveVm { /// Services that interact with VM users or the control plane outside the /// Propolis API (e.g. the serial console, VNC, and metrics reporting). - pub(super) services: VmServices, + pub(super) services: Arc, /// The runtime on which this VM's state driver and any tasks spawned by /// the VM's components will run. diff --git a/bin/propolis-server/src/lib/vm/ensure.rs b/bin/propolis-server/src/lib/vm/ensure.rs index 1097344fa..c9d805aa0 100644 --- a/bin/propolis-server/src/lib/vm/ensure.rs +++ b/bin/propolis-server/src/lib/vm/ensure.rs @@ -275,13 +275,15 @@ impl<'a> VmEnsureObjectsCreated<'a> { /// installs an active VM into the parent VM state machine and notifies the /// ensure requester that its request is complete. 
pub(crate) async fn ensure_active(self) -> VmEnsureActive<'a> { - let vm_services = VmServices::new( - self.log, - &self.vm_objects, - &self.ensure_request.properties, - self.ensure_options, - ) - .await; + let vm_services = Arc::new( + VmServices::new( + self.log, + &self.vm_objects, + &self.ensure_request.properties, + self.ensure_options, + ) + .await, + ); let vmm_rt_hdl = self.vmm_rt.handle().clone(); self.vm @@ -289,7 +291,7 @@ impl<'a> VmEnsureObjectsCreated<'a> { self.log, self.input_queue.clone(), &self.vm_objects, - vm_services, + &vm_services, self.vmm_rt, ) .await; @@ -312,6 +314,7 @@ impl<'a> VmEnsureObjectsCreated<'a> { vmm_rt_hdl, state_publisher: self.state_publisher, vm_objects: self.vm_objects, + vm_services, input_queue: self.input_queue, kernel_vm_paused: self.kernel_vm_paused, } @@ -326,12 +329,14 @@ pub(crate) struct VmEnsureActive<'a> { vmm_rt_hdl: tokio::runtime::Handle, state_publisher: &'a mut StatePublisher, vm_objects: Arc, + vm_services: Arc, input_queue: Arc, kernel_vm_paused: bool, } pub(super) struct VmEnsureActiveOutput { pub vm_objects: Arc, + pub vm_services: Arc, pub input_queue: Arc, pub vmm_rt_hdl: tokio::runtime::Handle, } @@ -368,6 +373,7 @@ impl VmEnsureActive<'_> { pub(super) fn into_inner(self) -> VmEnsureActiveOutput { VmEnsureActiveOutput { vm_objects: self.vm_objects, + vm_services: self.vm_services, input_queue: self.input_queue, vmm_rt_hdl: self.vmm_rt_hdl, } @@ -425,7 +431,7 @@ async fn initialize_vm_objects( init.initialize_rtc(&chipset)?; init.initialize_hpet(); - let com1 = Arc::new(init.initialize_uart(&chipset)); + let com1 = init.initialize_uart(&chipset); let ps2ctrl = init.initialize_ps2(&chipset); init.initialize_qemu_debug_port()?; init.initialize_qemu_pvpanic(VirtualMachine::new( diff --git a/bin/propolis-server/src/lib/vm/mod.rs b/bin/propolis-server/src/lib/vm/mod.rs index debb9af78..2b1bda452 100644 --- a/bin/propolis-server/src/lib/vm/mod.rs +++ b/bin/propolis-server/src/lib/vm/mod.rs @@ -103,7 +103,7 @@ pub(crate) mod ensure; pub(crate) mod guest_event; pub(crate) mod objects; mod request_queue; -mod services; +pub(crate) mod services; mod state_driver; pub(crate) mod state_publisher; @@ -409,7 +409,7 @@ impl Vm { log: &slog::Logger, state_driver_queue: Arc, objects: &Arc, - services: services::VmServices, + services: &Arc, vmm_rt: tokio::runtime::Runtime, ) { info!(self.log, "installing active VM"); @@ -423,7 +423,7 @@ impl Vm { external_state_rx: vm.external_state_rx, properties: vm.properties, objects: objects.clone(), - services, + services: services.clone(), tokio_rt: vmm_rt, }); } diff --git a/bin/propolis-server/src/lib/vm/objects.rs b/bin/propolis-server/src/lib/vm/objects.rs index 2ade7dab1..22d203f54 100644 --- a/bin/propolis-server/src/lib/vm/objects.rs +++ b/bin/propolis-server/src/lib/vm/objects.rs @@ -13,7 +13,7 @@ use std::{ use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use propolis::{ - hw::{ps2::ctrl::PS2Ctrl, qemu::ramfb::RamFb, uart::LpcUart}, + hw::{ps2::ctrl::PS2Ctrl, qemu::ramfb::RamFb}, vmm::VmmHdl, Machine, }; @@ -21,7 +21,9 @@ use propolis_api_types::instance_spec::SpecKey; use slog::{error, info}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; -use crate::{serial::Serial, spec::Spec, vcpu_tasks::VcpuTaskController}; +use crate::{ + serial::backend::ConsoleBackend, spec::Spec, vcpu_tasks::VcpuTaskController, +}; use super::{ state_driver::VmStartReason, BlockBackendMap, CrucibleBackendMap, DeviceMap, @@ -50,7 +52,7 @@ pub(super) struct InputVmObjects { pub 
devices: DeviceMap, pub block_backends: BlockBackendMap, pub crucible_backends: CrucibleBackendMap, - pub com1: Arc>, + pub com1: Arc, pub framebuffer: Option>, pub ps2ctrl: Arc, } @@ -81,7 +83,7 @@ pub(crate) struct VmObjectsLocked { crucible_backends: CrucibleBackendMap, /// A handle to the serial console connection to the VM's first COM port. - com1: Arc>, + com1: Arc, /// A handle to the VM's framebuffer. framebuffer: Option>, @@ -175,7 +177,7 @@ impl VmObjectsLocked { /// Yields a clonable reference to the serial console for this VM's first /// COM port. - pub(crate) fn com1(&self) -> &Arc> { + pub(crate) fn com1(&self) -> &Arc { &self.com1 } diff --git a/bin/propolis-server/src/lib/vm/services.rs b/bin/propolis-server/src/lib/vm/services.rs index da73fa702..920248e1d 100644 --- a/bin/propolis-server/src/lib/vm/services.rs +++ b/bin/propolis-server/src/lib/vm/services.rs @@ -12,14 +12,14 @@ use propolis_api_types::InstanceProperties; use slog::{error, info, Logger}; use crate::{ - serial::SerialTaskControlMessage, + serial::SerialConsoleManager, server::MetricsEndpointConfig, spec::Spec, stats::{ServerStats, VirtualMachine}, vnc::VncServer, }; -use super::objects::{VmObjects, VmObjectsShared}; +use super::objects::VmObjects; /// Information used to serve Oximeter metrics. #[derive(Default)] @@ -35,8 +35,8 @@ pub(crate) struct OximeterState { /// A collection of services visible to consumers outside this Propolis that /// depend on the functionality supplied by an extant VM. pub(crate) struct VmServices { - /// A VM's serial console handler task. - pub serial_task: tokio::sync::Mutex>, + /// A VM's serial console manager. + pub serial_mgr: tokio::sync::Mutex>, /// A VM's Oximeter state. /// @@ -79,10 +79,11 @@ impl VmServices { vnc_server.attach(vm_objects.ps2ctrl().clone(), ramfb.clone()); } - let serial_task = start_serial_task(log, &vm_objects).await; + let serial_mgr = + SerialConsoleManager::new(log.clone(), vm_objects.com1().clone()); Self { - serial_task: tokio::sync::Mutex::new(Some(serial_task)), + serial_mgr: tokio::sync::Mutex::new(Some(serial_mgr)), oximeter: tokio::sync::Mutex::new(oximeter_state), vnc_server, } @@ -92,12 +93,8 @@ impl VmServices { pub(super) async fn stop(&self, log: &Logger) { self.vnc_server.stop().await; - if let Some(serial_task) = self.serial_task.lock().await.take() { - let _ = serial_task - .control_ch - .send(SerialTaskControlMessage::Stopping) - .await; - let _ = serial_task.task.await; + if let Some(serial_mgr) = self.serial_mgr.lock().await.take() { + serial_mgr.stop().await; } let mut oximeter_state = self.oximeter.lock().await; @@ -165,30 +162,3 @@ async fn register_oximeter_producer( oximeter_state } - -/// Launches a serial console handler task. 
-async fn start_serial_task( - log: &slog::Logger, - vm_objects: &VmObjectsShared<'_>, -) -> crate::serial::SerialTask { - let (websocks_ch, websocks_recv) = tokio::sync::mpsc::channel(1); - let (control_ch, control_recv) = tokio::sync::mpsc::channel(1); - - let serial = vm_objects.com1().clone(); - serial.set_task_control_sender(control_ch.clone()).await; - let err_log = log.new(slog::o!("component" => "serial task")); - let task = tokio::spawn(async move { - if let Err(e) = crate::serial::instance_serial_task( - websocks_recv, - control_recv, - serial, - err_log.clone(), - ) - .await - { - error!(err_log, "Failure in serial task: {}", e); - } - }); - - crate::serial::SerialTask { task, control_ch, websocks_ch } -} diff --git a/bin/propolis-server/src/lib/vm/state_driver.rs b/bin/propolis-server/src/lib/vm/state_driver.rs index 0987699e9..9f746c24e 100644 --- a/bin/propolis-server/src/lib/vm/state_driver.rs +++ b/bin/propolis-server/src/lib/vm/state_driver.rs @@ -35,6 +35,7 @@ use super::{ guest_event::{self, GuestEvent}, objects::VmObjects, request_queue::{self, ExternalRequest, InstanceAutoStart}, + services::VmServices, state_publisher::{MigrationStateUpdate, StatePublisher}, InstanceEnsureResponseTx, }; @@ -234,6 +235,9 @@ struct StateDriver { /// The VM objects this driver is managing. objects: Arc, + /// The services associated with this driver's VM. + services: Arc, + /// The input queue this driver gets events from. input_queue: Arc, @@ -289,12 +293,17 @@ pub(super) async fn run_state_driver( } }; - let VmEnsureActiveOutput { vm_objects, input_queue, vmm_rt_hdl } = - activated_vm.into_inner(); + let VmEnsureActiveOutput { + vm_objects, + vm_services, + input_queue, + vmm_rt_hdl, + } = activated_vm.into_inner(); let state_driver = StateDriver { log, objects: vm_objects, + services: vm_services, input_queue, external_state: state_publisher, paused: false, @@ -631,6 +640,7 @@ impl StateDriver { match migration .run( &self.objects, + &self.services, &mut self.external_state, &mut self.migration_src_state, ) From 453d093e7b57ed1e9a84579b92e753608588ef8c Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Thu, 9 Jan 2025 22:32:17 +0000 Subject: [PATCH 03/14] server: import/export history --- .../src/lib/migrate/destination.rs | 50 ++++++++----- bin/propolis-server/src/lib/migrate/source.rs | 36 ++++++--- bin/propolis-server/src/lib/serial/backend.rs | 35 +++++++-- .../src/lib/serial/history_buffer.rs | 45 ++++++++++- bin/propolis-server/src/lib/serial/mod.rs | 74 ++++++++++--------- bin/propolis-server/src/lib/server.rs | 3 +- 6 files changed, 171 insertions(+), 72 deletions(-) diff --git a/bin/propolis-server/src/lib/migrate/destination.rs b/bin/propolis-server/src/lib/migrate/destination.rs index 49bb96b1f..7a9eefb49 100644 --- a/bin/propolis-server/src/lib/migrate/destination.rs +++ b/bin/propolis-server/src/lib/migrate/destination.rs @@ -6,7 +6,8 @@ use bitvec::prelude as bv; use futures::{SinkExt, StreamExt}; use propolis::common::{GuestAddr, Lifecycle, PAGE_SIZE}; use propolis::migrate::{ - MigrateCtx, MigrateStateError, Migrator, PayloadOffer, PayloadOffers, + MigrateCtx, MigrateSingle, MigrateStateError, Migrator, PayloadOffer, + PayloadOffers, }; use propolis::vmm; use propolis_api_types::instance_spec::SpecKey; @@ -22,10 +23,10 @@ use tokio_tungstenite::tungstenite::protocol::CloseFrame; use tokio_tungstenite::{tungstenite, WebSocketStream}; use uuid::Uuid; -use crate::migrate::codec; use crate::migrate::memx; use crate::migrate::preamble::Preamble; use crate::migrate::probes; 
+use crate::migrate::{codec, DevicePayload}; use crate::migrate::{ Device, MigrateError, MigratePhase, MigrateRole, MigrationState, PageIter, }; @@ -511,6 +512,21 @@ impl RonV0 { return Err(MigrateError::UnexpectedMessage); } }; + + let com1_payload: DevicePayload = match self.read_msg().await? { + codec::Message::Serialized(encoded) => { + ron::de::from_reader(encoded.as_bytes()) + .map_err(codec::ProtocolError::from)? + } + msg => { + error!( + self.log(), + "device_state: unexpected COM1 history message: {msg:?}" + ); + return Err(MigrateError::UnexpectedMessage); + } + }; + self.read_ok().await?; info!(self.log(), "Devices: {devices:#?}"); @@ -529,6 +545,18 @@ impl RonV0 { })?; self.import_device(&target, &device, &migrate_ctx)?; } + + let com1_data = + &mut ron::Deserializer::from_str(&com1_payload.data) + .map_err(codec::ProtocolError::from)?; + let com1_offer = PayloadOffer { + kind: &com1_payload.kind, + version: com1_payload.version, + payload: Box::new(::erase( + com1_data, + )), + }; + vm_objects.com1().import(com1_offer, &migrate_ctx)?; } self.send_msg(codec::Message::Okay).await @@ -762,24 +790,8 @@ impl RonV0 { .map_err(codec::ProtocolError::from)?, )) .await?; - let com1_history = match self.read_msg().await? { - codec::Message::Serialized(encoded) => encoded, - msg => { - error!(self.log(), "server_state: unexpected message: {msg:?}"); - return Err(MigrateError::UnexpectedMessage); - } - }; - - ensure_ctx - .vm_objects() - .lock_shared() - .await - .com1() - .import(&com1_history) - .await - .map_err(|e| MigrateError::Codec(e.to_string()))?; - self.send_msg(codec::Message::Okay).await + Ok(()) } async fn finish( diff --git a/bin/propolis-server/src/lib/migrate/source.rs b/bin/propolis-server/src/lib/migrate/source.rs index 747879862..8f8c128ff 100644 --- a/bin/propolis-server/src/lib/migrate/source.rs +++ b/bin/propolis-server/src/lib/migrate/source.rs @@ -6,7 +6,7 @@ use bitvec::prelude::{BitSlice, Lsb0}; use futures::{SinkExt, StreamExt}; use propolis::common::{GuestAddr, GuestData, PAGE_SIZE}; use propolis::migrate::{ - MigrateCtx, MigrateStateError, Migrator, PayloadOutputs, + MigrateCtx, MigrateSingle, MigrateStateError, Migrator, PayloadOutputs, }; use propolis::vmm; use propolis_api_types::instance_spec::VersionedInstanceSpec; @@ -685,6 +685,7 @@ impl RonV0Runner<'_, T> { async fn device_state(&mut self) -> Result<(), MigrateError> { self.update_state(MigrationState::Device); let mut device_states = vec![]; + let com1_payload; { let objects = self.vm.lock_shared().await; let migrate_ctx = @@ -733,6 +734,14 @@ impl RonV0Runner<'_, T> { } Ok(()) })?; + + let com1_state = objects.com1().export(&migrate_ctx)?; + com1_payload = DevicePayload { + kind: com1_state.kind.to_owned(), + version: com1_state.version, + data: ron::ser::to_string(&com1_state.payload) + .map_err(codec::ProtocolError::from)?, + }; } info!(self.log(), "Device States: {device_states:#?}"); @@ -743,7 +752,14 @@ impl RonV0Runner<'_, T> { )) .await?; + self.send_msg(codec::Message::Serialized( + ron::ser::to_string(&com1_payload) + .map_err(codec::ProtocolError::from)?, + )) + .await?; + self.send_msg(codec::Message::Okay).await?; + self.read_ok().await } @@ -787,15 +803,15 @@ impl RonV0Runner<'_, T> { } _ => return Err(MigrateError::UnexpectedMessage), }; - let com1_history = self - .vm - .lock_shared() - .await - .com1() - .export_history(remote_addr) - .await?; - self.send_msg(codec::Message::Serialized(com1_history)).await?; - self.read_ok().await + + { + let mgr = 
self.vm_services.serial_mgr.lock().await; + if let Some(mgr) = mgr.as_ref() { + mgr.notify_migration(remote_addr).await; + } + } + + Ok(()) } async fn finish(&mut self) -> Result<(), MigrateError> { diff --git a/bin/propolis-server/src/lib/serial/backend.rs b/bin/propolis-server/src/lib/serial/backend.rs index 19126348f..d2cadbb46 100644 --- a/bin/propolis-server/src/lib/serial/backend.rs +++ b/bin/propolis-server/src/lib/serial/backend.rs @@ -25,9 +25,6 @@ use super::history_buffer::{HistoryBuffer, SerialHistoryOffset}; #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] struct ClientId(u64); -trait ConsoleDevice: Source + Sink {} -impl ConsoleDevice for T {} - /// A client's rights when accessing a character backend. #[derive(Clone, Copy)] pub(super) enum Permissions { @@ -188,7 +185,6 @@ impl Inner { /// access a single serial device. pub struct ConsoleBackend { inner: Arc>, - dev: Arc, sink: Arc, sink_buffer: Arc, done_tx: oneshot::Sender<()>, @@ -211,7 +207,6 @@ impl ConsoleBackend { let this = Arc::new(Self { inner: Arc::new(Mutex::new(Inner::new(history_bytes))), - dev: device, sink, sink_buffer, done_tx, @@ -268,6 +263,36 @@ impl Drop for ConsoleBackend { } } +mod migrate { + use propolis::migrate::{ + MigrateCtx, MigrateSingle, MigrateStateError, PayloadOffer, + }; + + use crate::serial::history_buffer::migrate::HistoryBufferContentsV1; + + use super::ConsoleBackend; + + impl MigrateSingle for ConsoleBackend { + fn export( + &self, + _ctx: &MigrateCtx, + ) -> Result + { + Ok(self.inner.lock().unwrap().buffer.export().into()) + } + + fn import( + &self, + mut offer: PayloadOffer, + _ctx: &MigrateCtx, + ) -> Result<(), MigrateStateError> { + let contents: HistoryBufferContentsV1 = offer.parse()?; + self.inner.lock().unwrap().buffer.import(contents); + Ok(()) + } + } +} + async fn read_task( inner: Arc>, source: Arc, diff --git a/bin/propolis-server/src/lib/serial/history_buffer.rs b/bin/propolis-server/src/lib/serial/history_buffer.rs index f688e71ed..7eae70c32 100644 --- a/bin/propolis-server/src/lib/serial/history_buffer.rs +++ b/bin/propolis-server/src/lib/serial/history_buffer.rs @@ -7,7 +7,6 @@ use dropshot::HttpError; use propolis_api_types as api; -use serde::{Deserialize, Serialize}; use std::collections::VecDeque; use std::convert::TryFrom; @@ -29,7 +28,7 @@ const DEFAULT_MAX_LENGTH: isize = 16 * 1024; /// An abstraction for storing the contents of the instance's serial console /// output, intended for retrieval by the web console or other monitoring or /// troubleshooting tools. 
-#[derive(Deserialize, Serialize, Clone)] +#[derive(Clone)] pub(crate) struct HistoryBuffer { beginning: Vec, rolling: VecDeque, @@ -223,6 +222,48 @@ impl HistoryBuffer { pub fn bytes_from_start(&self) -> usize { self.total_bytes } + + pub(super) fn export(&self) -> migrate::HistoryBufferContentsV1 { + let slices = self.rolling.as_slices(); + let mut most_recent = Vec::with_capacity(self.rolling.len()); + most_recent.extend(slices.0); + most_recent.extend(slices.1); + migrate::HistoryBufferContentsV1 { + beginning: self.beginning.clone(), + most_recent, + total_bytes: self.total_bytes, + buffer_size: self.buffer_size, + } + } + + pub(super) fn import( + &mut self, + contents: migrate::HistoryBufferContentsV1, + ) { + self.beginning = contents.beginning; + self.rolling.clear(); + self.rolling.extend(contents.most_recent.iter()); + self.total_bytes = contents.total_bytes; + self.buffer_size = contents.buffer_size; + } +} + +pub(crate) mod migrate { + use serde::{Deserialize, Serialize}; + + #[derive(Deserialize, Serialize, Clone)] + pub(crate) struct HistoryBufferContentsV1 { + pub(super) beginning: Vec, + pub(super) most_recent: Vec, + pub(super) total_bytes: usize, + pub(super) buffer_size: usize, + } + + impl propolis::migrate::Schema<'_> for HistoryBufferContentsV1 { + fn id() -> propolis::migrate::SchemaId { + ("serial-console-backend", 1) + } + } } #[cfg(test)] diff --git a/bin/propolis-server/src/lib/serial/mod.rs b/bin/propolis-server/src/lib/serial/mod.rs index 048822a10..a45de1417 100644 --- a/bin/propolis-server/src/lib/serial/mod.rs +++ b/bin/propolis-server/src/lib/serial/mod.rs @@ -145,48 +145,52 @@ impl SerialConsoleManager { ), }; - let mut client_tasks = self.client_tasks.lock().unwrap(); - let client_id = client_tasks.next_id(); - let prev_rw_task = if kind == ClientKind::ReadWrite { - client_tasks.remove_rw_client() - } else { - None - }; + let prev_rw_task; + { + let mut client_tasks = self.client_tasks.lock().unwrap(); + let client_id = client_tasks.next_id(); + prev_rw_task = if kind == ClientKind::ReadWrite { + client_tasks.remove_rw_client() + } else { + None + }; - let backend_hdl = - self.backend.attach_client(console_tx, permissions, discipline); - - let ctx = SerialTaskContext { - log: self.log.clone(), - ws, - backend_hdl, - console_rx, - control_rx, - start_rx: task_start_rx, - done_rx: task_done_rx, - client_tasks: self.client_tasks.clone(), - client_id, - }; + let backend_hdl = + self.backend.attach_client(console_tx, permissions, discipline); + + let ctx = SerialTaskContext { + log: self.log.clone(), + ws, + backend_hdl, + console_rx, + control_rx, + start_rx: task_start_rx, + done_rx: task_done_rx, + client_tasks: self.client_tasks.clone(), + client_id, + }; - let task = ClientTask { - hdl: tokio::spawn(async move { serial_task(ctx).await }), - control_tx, - done_tx: task_done_tx, - }; + let task = ClientTask { + hdl: tokio::spawn(async move { serial_task(ctx).await }), + control_tx, + done_tx: task_done_tx, + }; - client_tasks.tasks.insert(client_id, task); - if kind == ClientKind::ReadWrite { - assert!(client_tasks.rw_client_id.is_none()); - client_tasks.rw_client_id = Some(client_id); + client_tasks.tasks.insert(client_id, task); + if kind == ClientKind::ReadWrite { + assert!(client_tasks.rw_client_id.is_none()); + client_tasks.rw_client_id = Some(client_id); + } } - drop(client_tasks); if let Some(task) = prev_rw_task { - task.done_tx.send(()); - task.hdl.await; + let _ = task.done_tx.send(()); + let _ = task.hdl.await; } - task_start_tx.send(()); + 
task_start_tx + .send(()) + .expect("new serial task shouldn't exit before starting"); } pub async fn notify_migration(&self, destination: SocketAddr) { @@ -468,5 +472,5 @@ async fn serial_task( close(&log, client_id, sink, stream, close_reason).await; } - client_tasks.lock().unwrap().tasks.remove(&client_id); + client_tasks.lock().unwrap().remove_by_id(client_id); } diff --git a/bin/propolis-server/src/lib/server.rs b/bin/propolis-server/src/lib/server.rs index 7a3f34dbb..59e2ad957 100644 --- a/bin/propolis-server/src/lib/server.rs +++ b/bin/propolis-server/src/lib/server.rs @@ -459,7 +459,8 @@ async fn instance_serial( serial_mgr .as_ref() .ok_or("Instance has no serial console manager")? - .connect(ws_stream, crate::serial::ClientKind::ReadWrite); + .connect(ws_stream, crate::serial::ClientKind::ReadWrite) + .await; Ok(()) } From bc37169a6336fb1fd271834f1674d4d6dc26db07 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Thu, 9 Jan 2025 23:43:08 +0000 Subject: [PATCH 04/14] server: support for read-only serial clients --- bin/propolis-server/src/lib/serial/history_buffer.rs | 2 ++ bin/propolis-server/src/lib/server.rs | 12 ++++++++++-- crates/propolis-api-types/src/lib.rs | 5 +++++ openapi/propolis-server.json | 8 ++++++++ 4 files changed, 25 insertions(+), 2 deletions(-) diff --git a/bin/propolis-server/src/lib/serial/history_buffer.rs b/bin/propolis-server/src/lib/serial/history_buffer.rs index 7eae70c32..0055358d1 100644 --- a/bin/propolis-server/src/lib/serial/history_buffer.rs +++ b/bin/propolis-server/src/lib/serial/history_buffer.rs @@ -53,10 +53,12 @@ impl TryFrom<&api::InstanceSerialConsoleStreamRequest> for SerialHistoryOffset { api::InstanceSerialConsoleStreamRequest { from_start: Some(offset), most_recent: None, + .. } => Ok(SerialHistoryOffset::FromStart(*offset as usize)), api::InstanceSerialConsoleStreamRequest { from_start: None, most_recent: Some(offset), + .. } => Ok(SerialHistoryOffset::MostRecent(*offset as usize)), _ => Err(()), } diff --git a/bin/propolis-server/src/lib/server.rs b/bin/propolis-server/src/lib/server.rs index 59e2ad957..2aeb8c2d2 100644 --- a/bin/propolis-server/src/lib/server.rs +++ b/bin/propolis-server/src/lib/server.rs @@ -425,6 +425,7 @@ async fn instance_serial( let ctx = rqctx.context(); let vm = ctx.vm.active_vm().await.ok_or_else(not_created_error)?; let serial = vm.objects().lock_shared().await.com1().clone(); + let query = query.into_inner(); // Use the default buffering paramters for the websocket configuration // @@ -440,7 +441,7 @@ async fn instance_serial( ) .await; - let byte_offset = SerialHistoryOffset::try_from(&query.into_inner()).ok(); + let byte_offset = SerialHistoryOffset::try_from(&query).ok(); if let Some(mut byte_offset) = byte_offset { loop { let (data, offset) = serial.history_vec(byte_offset, None)?; @@ -459,7 +460,14 @@ async fn instance_serial( serial_mgr .as_ref() .ok_or("Instance has no serial console manager")? - .connect(ws_stream, crate::serial::ClientKind::ReadWrite) + .connect( + ws_stream, + if query.writable { + crate::serial::ClientKind::ReadWrite + } else { + crate::serial::ClientKind::ReadOnly + }, + ) .await; Ok(()) diff --git a/crates/propolis-api-types/src/lib.rs b/crates/propolis-api-types/src/lib.rs index 4296be0ac..1360c6305 100644 --- a/crates/propolis-api-types/src/lib.rs +++ b/crates/propolis-api-types/src/lib.rs @@ -325,6 +325,11 @@ pub struct InstanceSerialConsoleStreamRequest { /// recently buffered data retrieved from the instance. 
(See note on `from_start` about mutual /// exclusivity) pub most_recent: Option, + /// True if the connection should allow writing. If this option is set, any + /// existing writer to the serial console will be disconnected when this + /// client connects. + #[serde(default)] + pub writable: bool, } /// Control message(s) sent through the websocket to serial console clients. diff --git a/openapi/propolis-server.json b/openapi/propolis-server.json index 4e23f600c..c933e20d5 100644 --- a/openapi/propolis-server.json +++ b/openapi/propolis-server.json @@ -265,6 +265,14 @@ "format": "uint64", "minimum": 0 } + }, + { + "in": "query", + "name": "writable", + "description": "True if the connection should allow writing. If this option is set, any existing writer to the serial console will be disconnected when this client connects.", + "schema": { + "type": "boolean" + } } ], "responses": { From 5220758319f3e945bf11a4d74b8f019f9fbe6582 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Fri, 10 Jan 2025 00:24:29 +0000 Subject: [PATCH 05/14] client: make phd and cli read-write by default --- bin/propolis-cli/src/main.rs | 16 ++++++++++---- lib/propolis-client/src/support.rs | 30 +++++++++++++++++++++----- phd-tests/framework/src/test_vm/mod.rs | 1 + 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/bin/propolis-cli/src/main.rs b/bin/propolis-cli/src/main.rs index 47e72c847..bfe3f7f52 100644 --- a/bin/propolis-cli/src/main.rs +++ b/bin/propolis-cli/src/main.rs @@ -125,6 +125,10 @@ enum Command { /// Defaults to the most recent 16 KiB of console output (-16384). #[clap(long, short)] byte_offset: Option, + + /// True if the serial console connection should be read-only. + #[clap(long, action)] + readonly: bool, }, /// Migrate instance to new propolis-server @@ -604,9 +608,11 @@ async fn stdin_to_websockets_task( async fn serial( addr: SocketAddr, byte_offset: Option, + readonly: bool, log: Logger, ) -> anyhow::Result<()> { - let mut ws_console = serial_connect(addr, byte_offset, log).await?; + let mut ws_console = + serial_connect(addr, byte_offset, readonly, log).await?; let _raw_guard = RawTermiosGuard::stdio_guard() .with_context(|| anyhow!("failed to set raw mode"))?; @@ -699,6 +705,7 @@ async fn serial( async fn serial_connect( addr: SocketAddr, byte_offset: Option, + readonly: bool, log: Logger, ) -> anyhow::Result { let offset = match byte_offset { @@ -707,7 +714,8 @@ async fn serial_connect( None => WSClientOffset::MostRecent(16384), }; - Ok(InstanceSerialConsoleHelper::new(addr, offset, Some(log)).await?) + Ok(InstanceSerialConsoleHelper::new(addr, offset, readonly, Some(log)) + .await?) } async fn migrate_instance( @@ -914,8 +922,8 @@ async fn main() -> anyhow::Result<()> { } Command::Get => get_instance(&client).await?, Command::State { state } => put_instance(&client, state).await?, - Command::Serial { byte_offset } => { - serial(addr, byte_offset, log).await? + Command::Serial { byte_offset, readonly } => { + serial(addr, byte_offset, readonly, log).await? 
} Command::Migrate { dst_server, dst_port, dst_uuid, crucible_disks } => { let dst_addr = SocketAddr::new(dst_server, dst_port); diff --git a/lib/propolis-client/src/support.rs b/lib/propolis-client/src/support.rs index 9ab7c2d87..efb1956ff 100644 --- a/lib/propolis-client/src/support.rs +++ b/lib/propolis-client/src/support.rs @@ -74,6 +74,7 @@ pub(crate) trait SerialConsoleStreamBuilder: Send { &mut self, address: SocketAddr, offset: WSClientOffset, + readonly: bool, ) -> Result, WSError>; } @@ -95,6 +96,7 @@ impl SerialConsoleStreamBuilder for PropolisSerialBuilder { &mut self, address: SocketAddr, offset: WSClientOffset, + readonly: bool, ) -> Result, WSError> { let client = PropolisClient::new(&format!("http://{}", address)); let mut req = client.instance_serial(); @@ -108,6 +110,7 @@ impl SerialConsoleStreamBuilder for PropolisSerialBuilder { } } + req = req.writable(!readonly); let upgraded = req .send() .await @@ -157,6 +160,7 @@ impl SerialConsoleStreamBuilder // offset is currently unused by this builder. Worth testing in // the future. _offset: WSClientOffset, + _readonly: bool, ) -> Result, WSError> { if let Some((delay, stream)) = self.client_conns_and_delays.remove(&address) @@ -191,6 +195,7 @@ pub enum WSClientOffset { pub struct InstanceSerialConsoleHelper { stream_builder: Box, ws_stream: WebSocketStream>, + readonly: bool, log: Option, } @@ -202,10 +207,12 @@ impl InstanceSerialConsoleHelper { pub async fn new( address: SocketAddr, offset: WSClientOffset, + readonly: bool, log: Option, ) -> Result { let stream_builder = PropolisSerialBuilder::new(); - Self::new_with_builder(stream_builder, address, offset, log).await + Self::new_with_builder(stream_builder, address, offset, readonly, log) + .await } /// Creates a new serial console helper for testing. @@ -217,6 +224,7 @@ impl InstanceSerialConsoleHelper { connections: impl IntoIterator, address: SocketAddr, offset: WSClientOffset, + readonly: bool, log: Option, ) -> Result { let stream_builder = TestSerialBuilder::new( @@ -224,7 +232,8 @@ impl InstanceSerialConsoleHelper { .into_iter() .map(|(addr, stream)| (addr, Duration::ZERO, stream)), ); - Self::new_with_builder(stream_builder, address, offset, log).await + Self::new_with_builder(stream_builder, address, offset, readonly, log) + .await } /// Creates a new serial console helper for testing, with delays before @@ -238,10 +247,12 @@ impl InstanceSerialConsoleHelper { connections: impl IntoIterator, address: SocketAddr, offset: WSClientOffset, + readonly: bool, log: Option, ) -> Result { let stream_builder = TestSerialBuilder::new(connections); - Self::new_with_builder(stream_builder, address, offset, log).await + Self::new_with_builder(stream_builder, address, offset, readonly, log) + .await } // Currently used for testing, and not exposed to clients. 
@@ -249,12 +260,18 @@ impl InstanceSerialConsoleHelper { mut stream_builder: impl SerialConsoleStreamBuilder + 'static, address: SocketAddr, offset: WSClientOffset, + readonly: bool, log: Option, ) -> Result { - let stream = stream_builder.build(address, offset).await?; + let stream = stream_builder.build(address, offset, readonly).await?; let ws_stream = WebSocketStream::from_raw_socket(stream, Role::Client, None).await; - Ok(Self { stream_builder: Box::new(stream_builder), ws_stream, log }) + Ok(Self { + stream_builder: Box::new(stream_builder), + ws_stream, + readonly, + log, + }) } /// Receives the next [WSMessage] from the server, holding it in @@ -401,6 +418,7 @@ impl InstanceSerialConsoleMessage<'_> { .build( destination, WSClientOffset::FromStart(from_start), + self.helper.readonly, ) .await?; self.helper.ws_stream = WebSocketStream::from_raw_socket( @@ -463,6 +481,7 @@ mod test { [(address, client_conn)], address, WSClientOffset::FromStart(0), + false, None, ) .await @@ -514,6 +533,7 @@ mod test { ], address_1, WSClientOffset::FromStart(0), + false, None, ) .await diff --git a/phd-tests/framework/src/test_vm/mod.rs b/phd-tests/framework/src/test_vm/mod.rs index caa17360e..54ef2d1ce 100644 --- a/phd-tests/framework/src/test_vm/mod.rs +++ b/phd-tests/framework/src/test_vm/mod.rs @@ -349,6 +349,7 @@ impl TestVm { .server_addr(), ), WSClientOffset::MostRecent(0), + false, None, ) .await?; From 7975920a4250bf3acb52bd2cda9e594536bea7e6 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Fri, 10 Jan 2025 00:43:22 +0000 Subject: [PATCH 06/14] server: don't allow clients to block VM stop; docs & cleanup --- bin/propolis-server/src/lib/serial/backend.rs | 46 ++++- bin/propolis-server/src/lib/serial/mod.rs | 162 ++++++++++++------ 2 files changed, 149 insertions(+), 59 deletions(-) diff --git a/bin/propolis-server/src/lib/serial/backend.rs b/bin/propolis-server/src/lib/serial/backend.rs index d2cadbb46..c459bfc58 100644 --- a/bin/propolis-server/src/lib/serial/backend.rs +++ b/bin/propolis-server/src/lib/serial/backend.rs @@ -44,7 +44,7 @@ impl Permissions { /// Determines what happens when the backend is unable to send a guest byte back /// to a client because the client's channel is full. #[derive(Clone, Copy)] -pub(super) enum ReadWaitDiscipline { +pub(super) enum FullReadChannelDiscipline { /// The backend should block until it can send to this client. Block, @@ -60,7 +60,7 @@ struct Client { /// Determines what happens when the backend wants to send a byte and /// [`Self::tx`] is full. - read_discipline: ReadWaitDiscipline, + read_discipline: FullReadChannelDiscipline, } /// A handle held by a client that represents its connection to the backend. @@ -185,8 +185,20 @@ impl Inner { /// access a single serial device. pub struct ConsoleBackend { inner: Arc>, + + /// The character [`Sink`] that should receive writes to this backend. + /// Writes should not access the sink directly; instead, they should be + /// directed to [`Self::sink_buffer`]. This reference is needed because + /// [`SinkBuffer`]'s current API requires a sink to be passed into each + /// attempt to write (buffers don't own references to their sinks). sink: Arc, + + /// The buffer that sits in front of this backend's sink. Writes to the + /// backend should be directed at the buffer, not at [`Self::sink`]. sink_buffer: Arc, + + /// A channel used to tell the backend's reader task that the backend has + /// been closed. 
done_tx: oneshot::Sender<()>, } @@ -227,20 +239,26 @@ impl ConsoleBackend { /// /// - `read_tx`: A channel to which the backend should send bytes read from /// its device. + /// - `permissions`: The read/write permissions this client should have. + /// - `full_read_tx_discipline`: Describes what should happen if the reader + /// task ever finds that `read_tx` is full when dispatching a byte to it. pub(super) fn attach_client( self: &Arc, read_tx: mpsc::Sender, permissions: Permissions, - wait_discipline: ReadWaitDiscipline, + full_read_tx_discipline: FullReadChannelDiscipline, ) -> ClientHandle { let mut inner = self.inner.lock().unwrap(); let id = inner.next_client_id(); - let client = Client { tx: read_tx, read_discipline: wait_discipline }; + let client = + Client { tx: read_tx, read_discipline: full_read_tx_discipline }; inner.clients.insert(id, client); ClientHandle { id, backend: self.clone(), permissions } } + /// Returns the contents of this backend's history buffer. See + /// [`HistoryBuffer::contents_vec`]. pub fn history_vec( &self, byte_offset: SerialHistoryOffset, @@ -250,6 +268,8 @@ impl ConsoleBackend { inner.buffer.contents_vec(byte_offset, max_bytes) } + /// Returns the number of bytes that have ever been sent to this backend's + /// history buffer. pub fn bytes_since_start(&self) -> usize { self.inner.lock().unwrap().buffer.bytes_from_start() } @@ -293,6 +313,9 @@ mod migrate { } } +/// Reads bytes from the supplied `source` and dispatches them to the clients in +/// `inner`. Each backend is expected to spin up one task that runs this +/// function. async fn read_task( inner: Arc>, source: Arc, @@ -322,10 +345,15 @@ async fn read_task( let to_send = &bytes[0..bytes_read]; + // Capture a list of all the clients who should receive this byte with + // the lock held, then drop the lock before sending to any of them. Note + // that sends to clients may block for an arbitrary length of time: the + // receiver may be relaying received bytes to a websocket, and the + // remote peer may be slow to accept them. struct CapturedClient { id: ClientId, tx: mpsc::Sender, - discipline: ReadWaitDiscipline, + discipline: FullReadChannelDiscipline, disconnect: bool, } @@ -343,19 +371,23 @@ async fn read_task( .collect::>() }; + // Prepare to delete any clients that are no longer active (i.e., who + // have dropped the receiver sides of their channels) or who are using + // the close-on-full-channel discipline and who have a full channel. for byte in to_send { for client in clients.iter_mut() { client.disconnect = match client.discipline { - ReadWaitDiscipline::Block => { + FullReadChannelDiscipline::Block => { client.tx.send(*byte).await.is_err() } - ReadWaitDiscipline::Close => { + FullReadChannelDiscipline::Close => { client.tx.try_send(*byte).is_err() } } } } + // Clean up any clients who met the disconnection criteria. let mut guard = inner.lock().unwrap(); guard.buffer.consume(to_send); for client in clients.iter().filter(|c| c.disconnect) { diff --git a/bin/propolis-server/src/lib/serial/mod.rs b/bin/propolis-server/src/lib/serial/mod.rs index a45de1417..9fe821894 100644 --- a/bin/propolis-server/src/lib/serial/mod.rs +++ b/bin/propolis-server/src/lib/serial/mod.rs @@ -2,7 +2,25 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -//! Routines to expose a connection to an instance's serial port. +//! Defines the [`SerialConsoleManager`], which manages client websocket +//! 
connections to one of an instance's serial ports. +//! +//! Each `SerialConsoleManager` brokers connections to one serial device. Each +//! websocket connection to the device creates a [`ClientTask`] tracked by the +//! manager. This task largely plumbs incoming bytes from the websocket into the +//! backend and vice-versa, but it can also be used to send control messages to +//! clients, e.g. to notify them that an instance is migrating. +//! +//! The connection manager implements the connection policies described in RFD +//! 491: +//! +//! - A single serial device can have only one read-write client at a time (but +//! can have multiple read-only clients). +//! - The connection manager configures its connections to the backend so that +//! - The backend will block when sending bytes to read-write clients that +//! can't receive them immediately, but +//! - The backend will disconnect read-only clients who can't immediately +//! receive new bytes. use std::{ collections::{BTreeMap, VecDeque}, @@ -12,10 +30,7 @@ use std::{ use backend::ConsoleBackend; use dropshot::WebsocketConnectionRaw; -use futures::{ - stream::{SplitSink, SplitStream}, - SinkExt, StreamExt, -}; +use futures::{SinkExt, StreamExt}; use propolis_api_types::InstanceSerialConsoleControlMessage; use slog::{info, warn}; use tokio::{ @@ -48,22 +63,35 @@ mod probes { #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] struct ClientId(u64); +/// Specifies whether a client should be a read-write or read-only client. #[derive(Clone, Copy, PartialEq, Eq)] pub enum ClientKind { ReadWrite, ReadOnly, } +/// Tracks an individual client of this console and its associated tokio task. struct ClientTask { + /// The join handle for this client's task. hdl: JoinHandle<()>, + + /// Receives server-level commands and notifications about this connection. control_tx: mpsc::Sender, + + /// Triggered when the serial console manager shuts down. done_tx: oneshot::Sender<()>, } +/// The collection of [`ClientTask`]s belonging to a console manager. #[derive(Default)] struct ClientTasks { + /// A table mapping task IDs to task structs. tasks: BTreeMap, + + /// The next task ID to assign. next_id: u64, + + /// The ID of the current read-write client, if there is one. rw_client_id: Option, } @@ -74,6 +102,7 @@ impl ClientTasks { ClientId(id) } + /// Removes the task with ID `id` from the task collection. fn remove_by_id(&mut self, id: ClientId) { self.tasks.remove(&id); match self.rw_client_id { @@ -84,6 +113,8 @@ impl ClientTasks { } } + /// If there is a read-write client in the task collection, removes it (from + /// both the read-write client position and the task table) and returns it. fn remove_rw_client(&mut self) -> Option { if let Some(id) = self.rw_client_id.take() { self.tasks.remove(&id) @@ -93,6 +124,7 @@ impl ClientTasks { } } +/// Manages individual websocket client connections to a single serial device. pub struct SerialConsoleManager { log: slog::Logger, backend: Arc, @@ -108,6 +140,7 @@ impl SerialConsoleManager { } } + /// Directs all client tasks to stop and waits for them all to finish. pub async fn stop(self) { let tasks = { let mut guard = self.client_tasks.lock().unwrap(); @@ -124,6 +157,8 @@ impl SerialConsoleManager { futures::future::join_all(tasks.into_iter().map(|task| task.hdl)).await; } + /// Creates a new task that connects a websocket stream to this manager's + /// serial console. 
pub async fn connect( &self, ws: WebSocketStream, @@ -137,14 +172,18 @@ impl SerialConsoleManager { let (permissions, discipline) = match kind { ClientKind::ReadWrite => ( backend::Permissions::ReadWrite, - backend::ReadWaitDiscipline::Block, + backend::FullReadChannelDiscipline::Block, ), ClientKind::ReadOnly => ( backend::Permissions::ReadOnly, - backend::ReadWaitDiscipline::Close, + backend::FullReadChannelDiscipline::Close, ), }; + // If this client is to be the read-write client, and there's already + // such a client, the old client needs to be evicted and stopped before + // the new client can start. Create and install the new task under the + // lock, then drop the lock before running down the old task. let prev_rw_task; { let mut client_tasks = self.client_tasks.lock().unwrap(); @@ -188,11 +227,21 @@ impl SerialConsoleManager { let _ = task.hdl.await; } + // Don't let the new task start until any old tasks have been run down. + // This prevents writes from an old R/W task from being interleaved with + // writes from a new one. task_start_tx .send(()) .expect("new serial task shouldn't exit before starting"); } + /// Notifies all active clients that the instance is migrating to a new + /// host. + /// + /// This function reads state from the console backend and relays it to + /// clients. The caller should ensure that the VM is paused before calling + /// this function so that backend's state doesn't change after this function + /// captures it. pub async fn notify_migration(&self, destination: SocketAddr) { let from_start = self.backend.bytes_since_start() as u64; let clients: Vec<_> = { @@ -233,9 +282,10 @@ struct SerialTaskContext { /// (e.g. impending migrations). control_rx: mpsc::Receiver, + /// Signaled to direct a task to enter its processing loop. start_rx: oneshot::Receiver<()>, - /// Signaled when the task is + /// Signaled to tell a task to shut down. done_rx: oneshot::Receiver<()>, /// A reference to the manager's client task map, used to deregister this @@ -270,30 +320,8 @@ async fn serial_task( WebsocketDisconnected, } - async fn close( - log: &slog::Logger, - client_id: ClientId, - sink: SplitSink, Message>, - stream: SplitStream>, - reason: &str, - ) { - let mut ws = - sink.reunite(stream).expect("sink and stream should match"); - if let Err(e) = ws - .close(Some(CloseFrame { - code: CloseCode::Away, - reason: reason.into(), - })) - .await - { - warn!( - log, "error sending close frame to client"; - "client_id" => client_id.0, - "error" => ?e, - ); - } - } - + // Wait to be told to start the main loop. This allows the task creation + // logic to ensure that there's only one active console writer at a time. let _ = start_rx.await; info!( @@ -379,17 +407,23 @@ async fn serial_task( Event::ConsoleRead(b) => { probes::serial_event_read!(|| (b)); - // Waiting outside the `select!` is OK here: - // - // - If the client is a read-write client, it is allowed to - // block the guest to ensure that every byte of guest output - // is transmitted to the client. - // - If the client is a read-only client, and it is slow to - // acknowledge this message, its channel to the backend will - // eventually fill up. If this happens and the backend thus - // becomes unable to send new bytes, it will drop the channel - // to allow the guest to make progress. 
- let _ = sink.send(Message::binary(vec![b])).await; + // In general it's OK to wait for this byte to be sent, since + // read-write clients are allowed to block the backend and + // read-only clients will be disconnected if they don't process + // bytes in a timely manner. That said, even read-write clients + // need to monitor `done_rx` here; otherwise a badly-behaved + // client can prevent a VM from stopping. + select! { + biased; + + _ = &mut done_rx => { + probes::serial_event_done!(|| ()); + close_reason = Some("VM stopped"); + break; + } + + _ = sink.send(Message::binary(vec![b])) => {} + } } Event::ConsoleDisconnected => { probes::serial_event_console_disconnect!(|| ()); @@ -400,13 +434,24 @@ async fn serial_task( break; } Event::ControlMessage(control) => { - let _ = sink - .send(Message::Text( - serde_json::to_string(&control).expect( - "control messages can always serialize into JSON", - ), - )) - .await; + // As above, don't let a client that's not processing control + // messages prevent the VM from stopping. + select! { + biased; + + _ = &mut done_rx => { + probes::serial_event_done!(|| ()); + close_reason = Some("VM stopped"); + break; + } + + _ = sink + .send(Message::Text( + serde_json::to_string(&control).expect( + "control messages can always serialize into JSON", + ), + )) => {} + } } Event::WroteToBackend(result) => { let written = match result { @@ -468,8 +513,21 @@ async fn serial_task( } info!(log, "serial console task exiting"; "client_id" => client_id.0); - if let Some(close_reason) = close_reason { - close(&log, client_id, sink, stream, close_reason).await; + let mut ws = sink.reunite(stream).expect("sink and stream should match"); + if let Err(e) = ws + .close(Some(CloseFrame { + code: CloseCode::Away, + reason: close_reason + .unwrap_or("serial connection task exited") + .into(), + })) + .await + { + warn!( + log, "error sending close frame to client"; + "client_id" => client_id.0, + "error" => ?e, + ); } client_tasks.lock().unwrap().remove_by_id(client_id); From 5b4e8c9c55c938982bc859cd192ebae52fa87fda Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Fri, 10 Jan 2025 19:03:38 +0000 Subject: [PATCH 07/14] format comments --- crates/propolis-api-types/src/lib.rs | 44 +++++++++++++++------------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/crates/propolis-api-types/src/lib.rs b/crates/propolis-api-types/src/lib.rs index 1360c6305..8bad49ddc 100644 --- a/crates/propolis-api-types/src/lib.rs +++ b/crates/propolis-api-types/src/lib.rs @@ -287,28 +287,30 @@ pub struct Instance { /// Request a specific range of an Instance's serial console output history. #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema, PartialEq)] pub struct InstanceSerialConsoleHistoryRequest { - /// Character index in the serial buffer from which to read, counting the bytes output since - /// instance start. If this is not provided, `most_recent` must be provided, and if this *is* - /// provided, `most_recent` must *not* be provided. + /// Character index in the serial buffer from which to read, counting the + /// bytes output since instance start. If this is not provided, + /// `most_recent` must be provided, and if this *is* provided, `most_recent` + /// must *not* be provided. pub from_start: Option, - /// Character index in the serial buffer from which to read, counting *backward* from the most - /// recently buffered data retrieved from the instance. 
(See note on `from_start` about mutual - /// exclusivity) + /// Character index in the serial buffer from which to read, counting + /// *backward* from the most recently buffered data retrieved from the + /// instance. (See note on `from_start` about mutual exclusivity) pub most_recent: Option, - /// Maximum number of bytes of buffered serial console contents to return. If the requested - /// range runs to the end of the available buffer, the data returned will be shorter than - /// `max_bytes`. + /// Maximum number of bytes of buffered serial console contents to return. + /// If the requested range runs to the end of the available buffer, the data + /// returned will be shorter than `max_bytes`. pub max_bytes: Option, } /// Contents of an Instance's serial console buffer. #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema)] pub struct InstanceSerialConsoleHistoryResponse { - /// The bytes starting from the requested offset up to either the end of the buffer or the - /// request's `max_bytes`. Provided as a u8 array rather than a string, as it may not be UTF-8. + /// The bytes starting from the requested offset up to either the end of the + /// buffer or the request's `max_bytes`. Provided as a u8 array rather than + /// a string, as it may not be UTF-8. pub data: Vec, - /// The absolute offset since boot (suitable for use as `byte_offset` in a subsequent request) - /// of the last byte returned in `data`. + /// The absolute offset since boot (suitable for use as `byte_offset` in a + /// subsequent request) of the last byte returned in `data`. pub last_byte_offset: u64, } @@ -316,14 +318,16 @@ pub struct InstanceSerialConsoleHistoryResponse { /// bytes from the buffered history first. #[derive(Clone, Debug, Deserialize, Serialize, JsonSchema, PartialEq)] pub struct InstanceSerialConsoleStreamRequest { - /// Character index in the serial buffer from which to read, counting the bytes output since - /// instance start. If this is provided, `most_recent` must *not* be provided. - // TODO: if neither is specified, send enough serial buffer history to reconstruct - // the current contents and cursor state of an interactive terminal + /// Character index in the serial buffer from which to read, counting the + /// bytes output since instance start. If this is provided, `most_recent` + /// must *not* be provided. + // TODO: if neither is specified, send enough serial buffer history to + // reconstruct the current contents and cursor state of an interactive + // terminal pub from_start: Option, - /// Character index in the serial buffer from which to read, counting *backward* from the most - /// recently buffered data retrieved from the instance. (See note on `from_start` about mutual - /// exclusivity) + /// Character index in the serial buffer from which to read, counting + /// *backward* from the most recently buffered data retrieved from the + /// instance. (See note on `from_start` about mutual exclusivity) pub most_recent: Option, /// True if the connection should allow writing. 
If this option is set, any /// existing writer to the serial console will be disconnected when this From d6ea1c0cd9bcf26f567cf45941cc35533558ed29 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Thu, 16 Jan 2025 23:54:42 +0000 Subject: [PATCH 08/14] server: rename some serial backend types for clarity --- bin/propolis-server/src/lib/serial/backend.rs | 22 ++++++++++--------- bin/propolis-server/src/lib/serial/mod.rs | 2 +- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/bin/propolis-server/src/lib/serial/backend.rs b/bin/propolis-server/src/lib/serial/backend.rs index c459bfc58..286c14ad5 100644 --- a/bin/propolis-server/src/lib/serial/backend.rs +++ b/bin/propolis-server/src/lib/serial/backend.rs @@ -53,7 +53,7 @@ pub(super) enum FullReadChannelDiscipline { } /// An individual client of a character backend. -struct Client { +struct ClientState { /// Bytes read from the character device should be sent to the client on /// this channel. tx: mpsc::Sender, @@ -64,7 +64,7 @@ struct Client { } /// A handle held by a client that represents its connection to the backend. -pub(super) struct ClientHandle { +pub(super) struct Client { /// The client's ID. id: ClientId, @@ -75,20 +75,20 @@ pub(super) struct ClientHandle { permissions: Permissions, } -impl ClientHandle { +impl Client { pub(super) fn is_writable(&self) -> bool { self.permissions.is_writable() } } -impl Drop for ClientHandle { +impl Drop for Client { fn drop(&mut self) { let mut inner = self.backend.inner.lock().unwrap(); inner.clients.remove(&self.id); } } -impl ClientHandle { +impl Client { /// Attempts to write the bytes in `buf` to the backend. /// /// The backend may buffer some of the written bytes before sending them to @@ -159,7 +159,7 @@ struct Inner { buffer: HistoryBuffer, /// A table mapping client IDs to clients. - clients: BTreeMap, + clients: BTreeMap, /// The ID to assign to the next client to attach to this backend. next_client_id: u64, @@ -247,14 +247,16 @@ impl ConsoleBackend { read_tx: mpsc::Sender, permissions: Permissions, full_read_tx_discipline: FullReadChannelDiscipline, - ) -> ClientHandle { + ) -> Client { let mut inner = self.inner.lock().unwrap(); let id = inner.next_client_id(); - let client = - Client { tx: read_tx, read_discipline: full_read_tx_discipline }; + let client = ClientState { + tx: read_tx, + read_discipline: full_read_tx_discipline, + }; inner.clients.insert(id, client); - ClientHandle { id, backend: self.clone(), permissions } + Client { id, backend: self.clone(), permissions } } /// Returns the contents of this backend's history buffer. See diff --git a/bin/propolis-server/src/lib/serial/mod.rs b/bin/propolis-server/src/lib/serial/mod.rs index 9fe821894..331806352 100644 --- a/bin/propolis-server/src/lib/serial/mod.rs +++ b/bin/propolis-server/src/lib/serial/mod.rs @@ -273,7 +273,7 @@ struct SerialTaskContext { ws: WebSocketStream, /// A handle representing this task's connection to the console backend. - backend_hdl: backend::ClientHandle, + backend_hdl: backend::Client, /// Receives output bytes from the guest. 
console_rx: mpsc::Receiver, From c2cfc0637a8bcb965cbee13e2ade56ff07f58695 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Fri, 17 Jan 2025 01:01:57 +0000 Subject: [PATCH 09/14] server: use simplex pipes to move read bytes between tasks --- Cargo.lock | 1 + Cargo.toml | 1 + bin/propolis-server/Cargo.toml | 1 + bin/propolis-server/src/lib/serial/backend.rs | 381 +++++++++++------- bin/propolis-server/src/lib/serial/mod.rs | 330 +++++++++------ 5 files changed, 432 insertions(+), 282 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8cf5c69be..7983eac4f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4483,6 +4483,7 @@ dependencies = [ "serde", "serde_derive", "serde_json", + "slab", "slog", "slog-async", "slog-bunyan", diff --git a/Cargo.toml b/Cargo.toml index a3abc64e5..39cf86091 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -150,6 +150,7 @@ serde_derive = "1.0" serde_json = "1.0" serde_test = "1.0.138" serde_with = "3.11.0" +slab = "0.4.9" slog = "2.7" slog-async = "2.8" slog-bunyan = "2.4.0" diff --git a/bin/propolis-server/Cargo.toml b/bin/propolis-server/Cargo.toml index c748d4dfb..f9cf9b29c 100644 --- a/bin/propolis-server/Cargo.toml +++ b/bin/propolis-server/Cargo.toml @@ -50,6 +50,7 @@ toml.workspace = true serde.workspace = true serde_derive.workspace = true serde_json.workspace = true +slab.workspace = true slog.workspace = true slog-async.workspace = true slog-bunyan.workspace = true diff --git a/bin/propolis-server/src/lib/serial/backend.rs b/bin/propolis-server/src/lib/serial/backend.rs index 286c14ad5..1ceb4bfd1 100644 --- a/bin/propolis-server/src/lib/serial/backend.rs +++ b/bin/propolis-server/src/lib/serial/backend.rs @@ -6,9 +6,10 @@ //! device and tracks the history of bytes the guest has written to that device. use std::{ - collections::BTreeMap, + future::Future, num::NonZeroUsize, sync::{Arc, Mutex}, + task::{Context, Poll}, }; use propolis::chardev::{ @@ -16,18 +17,16 @@ use propolis::chardev::{ Sink, Source, }; use tokio::{ + io::{AsyncWriteExt, SimplexStream, WriteHalf}, select, - sync::{mpsc, oneshot}, + sync::{mpsc, oneshot, watch}, }; use super::history_buffer::{HistoryBuffer, SerialHistoryOffset}; -#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] -struct ClientId(u64); - /// A client's rights when accessing a character backend. #[derive(Clone, Copy)] -pub(super) enum Permissions { +enum Permissions { /// The client may both read and write to the backend. ReadWrite, @@ -35,12 +34,6 @@ pub(super) enum Permissions { ReadOnly, } -impl Permissions { - pub(super) fn is_writable(&self) -> bool { - matches!(self, Permissions::ReadWrite) - } -} - /// Determines what happens when the backend is unable to send a guest byte back /// to a client because the client's channel is full. #[derive(Clone, Copy)] @@ -52,43 +45,83 @@ pub(super) enum FullReadChannelDiscipline { Close, } -/// An individual client of a character backend. -struct ClientState { - /// Bytes read from the character device should be sent to the client on - /// this channel. - tx: mpsc::Sender, +/// An entity that receives bytes written by the guest. +/// +/// Readers are instantiated in [`ConsoleBackend::attach_client`] and passed via +/// channel to the backend's [`read_task`]. +struct Reader { + /// Bytes received from the guest are sent into this stream. + tx: WriteHalf, + + /// Determines what happens if a read fails to accept all of the bytes the + /// task wanted to write. + discipline: FullReadChannelDiscipline, + + /// Set to `true` when this reader is dropped. 
+ defunct_tx: watch::Sender, - /// Determines what happens when the backend wants to send a byte and - /// [`Self::tx`] is full. - read_discipline: FullReadChannelDiscipline, + /// Set to `true` when this reader's associated [`Client`] is dropped. + defunct_rx: watch::Receiver, } -/// A handle held by a client that represents its connection to the backend. -pub(super) struct Client { - /// The client's ID. - id: ClientId, +impl Drop for Reader { + fn drop(&mut self) { + let _ = self.defunct_tx.send(true); + } +} - /// A reference to the backend to which the client is connected. +/// An entity that can attempt to write bytes to the guest. +struct Writer { + /// The backend to which this writer directs its writes. backend: Arc, - /// The client's backend access rights. - permissions: Permissions, + /// Set to `true` when this writer's associated [`Reader`] is dropped. + defunct_rx: watch::Receiver, } -impl Client { - pub(super) fn is_writable(&self) -> bool { - self.permissions.is_writable() +impl Writer { + /// Implements [`Client::write`] for the client that owns this [`Writer`]. + async fn write(&mut self, buf: &[u8]) -> Result { + assert!(!buf.is_empty()); + + if *self.defunct_rx.borrow_and_update() { + return Err(std::io::Error::from( + std::io::ErrorKind::ConnectionAborted, + )); + } + + Ok(self + .backend + .sink_buffer + .write(buf, self.backend.sink.as_ref()) + .await + .expect("chardev sink writes always succeed")) } } -impl Drop for Client { - fn drop(&mut self) { - let mut inner = self.backend.inner.lock().unwrap(); - inner.clients.remove(&self.id); - } +/// A client of this backend. +pub(super) struct Client { + /// The client's associated [`Writer`], if it was instantiated as a + /// read-write client. + writer: Option, + + /// The client writes `true` to this channel when it is dropped. + defunct_tx: watch::Sender, + + /// The read task writes `true` to this channel if this client's associated + /// [`Reader`] is closed. + defunct_rx: watch::Receiver, } impl Client { + pub(super) fn can_write(&self) -> bool { + self.writer.is_some() + } + + pub(super) fn get_defunct_rx(&self) -> watch::Receiver { + self.defunct_rx.clone() + } + /// Attempts to write the bytes in `buf` to the backend. /// /// The backend may buffer some of the written bytes before sending them to @@ -99,17 +132,18 @@ impl Client { /// /// - `Ok(bytes)` if the write succeeded; `bytes` is the number of bytes /// that were written. - /// - `Err(ErrorKind::PermissionDenied)` if this handle does not grant write - /// access to the device. - /// - `Err(ErrorKind::ConnectionAborted)` if this handle's client was - /// previously disconnected from the backend, e.g. because it had a full - /// read channel and was using the close-on-full-channel read discipline. + /// - `Err(std::io::Error::PermissionDenied)` if this client does not have + /// write access to the device. + /// - `Err(std::io::Error::ConnectionAborted)` if this client lost its + /// ability to write to the device because it stopped servicing reads, + /// i.e., its read channel was full and it uses the close-on-full-channel + /// read discipline. /// /// # Cancel safety /// /// The future returned by this function is cancel-safe. If it is dropped, /// it is guaranteed that no bytes were written to the backend or its - /// device. See [`SinkBuffer::write`]. + /// device. 
pub(super) async fn write( &mut self, buf: &[u8], @@ -118,38 +152,19 @@ impl Client { return Ok(0); } - if !matches!(self.permissions, Permissions::ReadWrite) { + let Some(writer) = self.writer.as_mut() else { return Err(std::io::Error::from( std::io::ErrorKind::PermissionDenied, )); - } + }; - // The client handle may have been invalidated if it used the "close on - // full channel" discipline. If this client is no longer active, return - // an error. - // - // The read dispatcher task doesn't synchronize with writes, so it's - // possible for the client to be invalidated after the lock is dropped. - // (It can't be held while writing to the backend because - // `SinkBuffer::write` is async.) This is OK; since reads and writes - // aren't synchronized to begin with, in any situation like this it was - // equally possible for the write to finish before the client was - // invalidated. - { - let inner = self.backend.inner.lock().unwrap(); - if !inner.clients.contains_key(&self.id) { - return Err(std::io::Error::from( - std::io::ErrorKind::ConnectionAborted, - )); - } - } + writer.write(buf).await + } +} - Ok(self - .backend - .sink_buffer - .write(buf, self.backend.sink.as_ref()) - .await - .expect("chardev sink writes always succeed")) +impl Drop for Client { + fn drop(&mut self) { + let _ = self.defunct_tx.send(true); } } @@ -157,27 +172,11 @@ impl Client { struct Inner { /// A history of the bytes read from this backend's device. buffer: HistoryBuffer, - - /// A table mapping client IDs to clients. - clients: BTreeMap, - - /// The ID to assign to the next client to attach to this backend. - next_client_id: u64, } impl Inner { fn new(history_size: usize) -> Self { - Self { - buffer: HistoryBuffer::new(history_size), - clients: BTreeMap::new(), - next_client_id: 0, - } - } - - fn next_client_id(&mut self) -> ClientId { - let id = self.next_client_id; - self.next_client_id += 1; - ClientId(id) + Self { buffer: HistoryBuffer::new(history_size) } } } @@ -197,6 +196,10 @@ pub struct ConsoleBackend { /// backend should be directed at the buffer, not at [`Self::sink`]. sink_buffer: Arc, + /// Receives the [`Reader`]s generated when new clients connect to this + /// backend. + reader_tx: mpsc::UnboundedSender, + /// A channel used to tell the backend's reader task that the backend has /// been closed. done_tx: oneshot::Sender<()>, @@ -215,48 +218,84 @@ impl ConsoleBackend { let sink = device.clone(); let source = device.clone(); + + let (reader_tx, reader_rx) = mpsc::unbounded_channel(); let (done_tx, done_rx) = oneshot::channel(); let this = Arc::new(Self { inner: Arc::new(Mutex::new(Inner::new(history_bytes))), sink, sink_buffer, + reader_tx, done_tx, }); let inner = this.inner.clone(); tokio::spawn(async move { - read_task(inner, source, done_rx).await; + read_task(inner, source, reader_rx, done_rx).await; }); this } - /// Attaches a new client to this backend, yielding a handle that the client - /// can use to issue further operations. + /// Attaches a new read-only client to this backend. Incoming bytes from the + /// guest are sent to `read_tx`. `discipline` specifies what happens if the + /// channel is full when new bytes are available to read. /// - /// # Arguments + /// The caller may disconnect its session by dropping the returned + /// [`Client`]. 
+ pub(super) fn attach_read_only_client( + self: &Arc, + read_tx: WriteHalf, + discipline: FullReadChannelDiscipline, + ) -> Client { + self.attach_client(read_tx, Permissions::ReadOnly, discipline) + } + + /// Attaches a new read-write client to this backend. Incoming bytes from + /// the guest are sent to `read_tx`. `discipline` specifies what happens if + /// the channel is full when new bytes are available to read. /// - /// - `read_tx`: A channel to which the backend should send bytes read from - /// its device. - /// - `permissions`: The read/write permissions this client should have. - /// - `full_read_tx_discipline`: Describes what should happen if the reader - /// task ever finds that `read_tx` is full when dispatching a byte to it. - pub(super) fn attach_client( + /// The caller may disconnect its session by dropping the returned + /// [`Client`]. + pub(super) fn attach_read_write_client( + self: &Arc, + read_tx: WriteHalf, + discipline: FullReadChannelDiscipline, + ) -> Client { + self.attach_client(read_tx, Permissions::ReadWrite, discipline) + } + + /// Attaches a new client to this backend, returning a [`Client`] that + /// represents the caller's connection to the backend. + fn attach_client( self: &Arc, - read_tx: mpsc::Sender, + read_tx: WriteHalf, permissions: Permissions, - full_read_tx_discipline: FullReadChannelDiscipline, + discipline: FullReadChannelDiscipline, ) -> Client { - let mut inner = self.inner.lock().unwrap(); - let id = inner.next_client_id(); - let client = ClientState { + let (defunct_tx, defunct_rx) = watch::channel(false); + let writer = match permissions { + Permissions::ReadWrite => Some(Writer { + backend: self.clone(), + defunct_rx: defunct_rx.clone(), + }), + Permissions::ReadOnly => None, + }; + + let reader = Reader { tx: read_tx, - read_discipline: full_read_tx_discipline, + discipline, + defunct_tx: defunct_tx.clone(), + defunct_rx: defunct_rx.clone(), }; - inner.clients.insert(id, client); - Client { id, backend: self.clone(), permissions } + // Unwrapping is safe here because `read_task` is only allowed to exit + // after the backend signals `done_tx`, which only happens when the + // backend is dropped. + self.reader_tx.send(reader).unwrap(); + + Client { writer, defunct_tx, defunct_rx } } /// Returns the contents of this backend's history buffer. See @@ -321,79 +360,117 @@ mod migrate { async fn read_task( inner: Arc>, source: Arc, + mut reader_rx: mpsc::UnboundedReceiver, mut done_rx: oneshot::Receiver<()>, ) { - const READ_BUFFER_SIZE_BYTES: usize = 512; let buf = SourceBuffer::new(propolis::chardev::pollers::Params { poll_interval: std::time::Duration::from_millis(10), poll_miss_thresh: 5, - buf_size: NonZeroUsize::new(READ_BUFFER_SIZE_BYTES).unwrap(), + buf_size: NonZeroUsize::new(super::SERIAL_READ_BUFFER_SIZE).unwrap(), }); buf.attach(source.as_ref()); - let mut bytes = vec![0; READ_BUFFER_SIZE_BYTES]; + enum Event { + NewReader(Reader), + BytesRead(usize), + } + + let mut readers = vec![]; + let mut bytes = vec![0; super::SERIAL_READ_BUFFER_SIZE]; loop { - let bytes_read = select! { + let event = select! { biased; _ = &mut done_rx => { return; } - res = buf.read(bytes.as_mut_slice(), source.as_ref()) => { - res.unwrap() - } - }; + new_reader = reader_rx.recv() => { + let Some(reader) = new_reader else { + return; + }; - let to_send = &bytes[0..bytes_read]; - - // Capture a list of all the clients who should receive this byte with - // the lock held, then drop the lock before sending to any of them. 
Note - // that sends to clients may block for an arbitrary length of time: the - // receiver may be relaying received bytes to a websocket, and the - // remote peer may be slow to accept them. - struct CapturedClient { - id: ClientId, - tx: mpsc::Sender, - discipline: FullReadChannelDiscipline, - disconnect: bool, - } + Event::NewReader(reader) + } - let mut clients = { - let guard = inner.lock().unwrap(); - guard - .clients - .iter() - .map(|(id, client)| CapturedClient { - id: *id, - tx: client.tx.clone(), - discipline: client.read_discipline, - disconnect: false, - }) - .collect::>() + bytes_read = buf.read(bytes.as_mut_slice(), source.as_ref()) => { + Event::BytesRead(bytes_read.unwrap()) + } }; - // Prepare to delete any clients that are no longer active (i.e., who - // have dropped the receiver sides of their channels) or who are using - // the close-on-full-channel discipline and who have a full channel. - for byte in to_send { - for client in clients.iter_mut() { - client.disconnect = match client.discipline { - FullReadChannelDiscipline::Block => { - client.tx.send(*byte).await.is_err() - } - FullReadChannelDiscipline::Close => { - client.tx.try_send(*byte).is_err() + // Returns `true` if it was possible to send the entirety of `to_send` + // to `reader` without violating the reader's full-channel discipline. + async fn send_ok(reader: &mut Reader, to_send: &[u8]) -> bool { + match reader.discipline { + // Reads and writes to simplex streams (unlike channels) do not + // resolve to errors if the other half of the stream is dropped + // while the read or write is outstanding; instead, the future + // stays pending forever. + // + // To handle cases where the reader's client handle was dropped + // mid-read, select over the attempt to write and the "defunct" + // watcher and retire the client if the watcher fires first. + FullReadChannelDiscipline::Block => { + select! { + res = reader.tx.write_all(to_send) => { + res.is_ok() + } + + _ = reader.defunct_rx.changed() => { + false + } } } + // In the close-on-full-channel case it suffices to poll the + // future exactly once and see if this manages to write all of + // the data. No selection is needed here: if `write` returns + // `Poll::Pending`, `poll_once` will return an error, + // irrespective of the reason the write didn't complete. + FullReadChannelDiscipline::Close => { + matches!( + poll_once(reader.tx.write(to_send)), + Some(Ok(len)) if len == to_send.len() + ) + } } } - // Clean up any clients who met the disconnection criteria. - let mut guard = inner.lock().unwrap(); - guard.buffer.consume(to_send); - for client in clients.iter().filter(|c| c.disconnect) { - guard.clients.remove(&client.id); + match event { + Event::NewReader(r) => readers.push(r), + Event::BytesRead(bytes_read) => { + let to_send = &bytes[0..bytes_read]; + + // Send the bytes to each reader, dropping readers who fail to + // accept all the bytes on offer. + // + // It would be nice to use `Vec::retain_mut` here, but async + // closures aren't quite stable yet, so hand-roll a while loop + // instead. + let mut idx = 0; + while idx < readers.len() { + let ok = send_ok(&mut readers[idx], to_send).await; + if ok { + idx += 1; + } else { + readers.swap_remove(idx); + } + } + + inner.lock().unwrap().buffer.consume(to_send); + } } } } + +/// A helper function to poll a future `f` exactly once, returning its output +/// `Some(R)` if it is immediately ready and `None` otherwise. 
+fn poll_once(f: impl Future) -> Option { + tokio::pin!(f); + + let waker = futures::task::noop_waker(); + let mut cx = Context::from_waker(&waker); + match f.as_mut().poll(&mut cx) { + Poll::Ready(result) => Some(result), + Poll::Pending => None, + } +} diff --git a/bin/propolis-server/src/lib/serial/mod.rs b/bin/propolis-server/src/lib/serial/mod.rs index 331806352..e20617978 100644 --- a/bin/propolis-server/src/lib/serial/mod.rs +++ b/bin/propolis-server/src/lib/serial/mod.rs @@ -23,7 +23,7 @@ //! receive new bytes. use std::{ - collections::{BTreeMap, VecDeque}, + collections::VecDeque, net::SocketAddr, sync::{Arc, Mutex}, }; @@ -32,8 +32,10 @@ use backend::ConsoleBackend; use dropshot::WebsocketConnectionRaw; use futures::{SinkExt, StreamExt}; use propolis_api_types::InstanceSerialConsoleControlMessage; +use slab::Slab; use slog::{info, warn}; use tokio::{ + io::{AsyncReadExt, ReadHalf, SimplexStream}, select, sync::{mpsc, oneshot}, task::JoinHandle, @@ -51,17 +53,23 @@ pub(crate) mod history_buffer; #[usdt::provider(provider = "propolis")] mod probes { - fn serial_event_done() {} - fn serial_event_read(b: u8) {} - fn serial_event_console_disconnect() {} - fn serial_event_ws_recv(len: usize) {} - fn serial_event_ws_error() {} - fn serial_event_ws_disconnect() {} + fn serial_task_done() {} + fn serial_task_loop(read_from_ws: bool, has_bytes_to_write: bool) {} + fn serial_task_backend_read(len: usize) {} + fn serial_task_backend_write(len: usize) {} + fn serial_task_console_disconnect() {} + fn serial_task_ws_recv(len: usize) {} + fn serial_task_ws_error() {} + fn serial_task_ws_disconnect() {} fn serial_buffer_size(n: usize) {} } +/// The size, in bytes, of the intermediate buffers used to store bytes as they +/// move from the guest character device to the individual reader tasks. +const SERIAL_READ_BUFFER_SIZE: usize = 512; + #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)] -struct ClientId(u64); +struct ClientId(usize); /// Specifies whether a client should be a read-write or read-only client. #[derive(Clone, Copy, PartialEq, Eq)] @@ -70,13 +78,36 @@ pub enum ClientKind { ReadOnly, } +/// The possible events that can be sent to a serial task's control channel. +enum ControlEvent { + /// The task has been registered into the task set and can start its main + /// loop. + Start(ClientId, backend::Client), + /// Another part of the server has dispatched a console session control + /// message to this task. + Message(InstanceSerialConsoleControlMessage), +} + +impl std::fmt::Debug for ControlEvent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Start(arg0, _arg1) => { + f.debug_tuple("Start").field(arg0).finish() + } + Self::Message(arg0) => { + f.debug_tuple("Message").field(arg0).finish() + } + } + } +} + /// Tracks an individual client of this console and its associated tokio task. struct ClientTask { /// The join handle for this client's task. hdl: JoinHandle<()>, /// Receives server-level commands and notifications about this connection. - control_tx: mpsc::Sender, + control_tx: mpsc::UnboundedSender, /// Triggered when the serial console manager shuts down. done_tx: oneshot::Sender<()>, @@ -85,31 +116,28 @@ struct ClientTask { /// The collection of [`ClientTask`]s belonging to a console manager. #[derive(Default)] struct ClientTasks { - /// A table mapping task IDs to task structs. - tasks: BTreeMap, - - /// The next task ID to assign. - next_id: u64, + /// The set of registered tasks. 
+ tasks: Slab, /// The ID of the current read-write client, if there is one. rw_client_id: Option, } impl ClientTasks { - fn next_id(&mut self) -> ClientId { - let id = self.next_id; - self.next_id += 1; - ClientId(id) + fn add(&mut self, task: ClientTask) -> ClientId { + ClientId(self.tasks.insert(task)) } - /// Removes the task with ID `id` from the task collection. - fn remove_by_id(&mut self, id: ClientId) { - self.tasks.remove(&id); - match self.rw_client_id { - Some(existing_id) if existing_id == id => { - self.rw_client_id = None; + /// Ensures the task with ID `id` is removed from this task set. + fn ensure_removed_by_id(&mut self, id: ClientId) { + if self.tasks.contains(id.0) { + self.tasks.remove(id.0); + match self.rw_client_id { + Some(existing_id) if existing_id == id => { + self.rw_client_id = None; + } + _ => {} } - _ => {} } } @@ -117,7 +145,7 @@ impl ClientTasks { /// both the read-write client position and the task table) and returns it. fn remove_rw_client(&mut self) -> Option { if let Some(id) = self.rw_client_id.take() { - self.tasks.remove(&id) + Some(self.tasks.remove(id.0)) } else { None } @@ -147,7 +175,9 @@ impl SerialConsoleManager { std::mem::take(&mut guard.tasks) }; - let mut tasks: Vec<_> = tasks.into_values().collect(); + let mut tasks: Vec = + tasks.into_iter().map(|e| e.1).collect(); + for task in &mut tasks { let (tx, _rx) = oneshot::channel(); let done_tx = std::mem::replace(&mut task.done_tx, tx); @@ -164,75 +194,80 @@ impl SerialConsoleManager { ws: WebSocketStream, kind: ClientKind, ) { - const SERIAL_CHANNEL_SIZE: usize = 256; - let (console_tx, console_rx) = mpsc::channel(SERIAL_CHANNEL_SIZE); - let (control_tx, control_rx) = mpsc::channel(1); - let (task_start_tx, task_start_rx) = oneshot::channel(); + let (console_rx, console_tx) = + tokio::io::simplex(SERIAL_READ_BUFFER_SIZE); + let (control_tx, control_rx) = mpsc::unbounded_channel(); let (task_done_tx, task_done_rx) = oneshot::channel(); - let (permissions, discipline) = match kind { - ClientKind::ReadWrite => ( - backend::Permissions::ReadWrite, - backend::FullReadChannelDiscipline::Block, - ), - ClientKind::ReadOnly => ( - backend::Permissions::ReadOnly, - backend::FullReadChannelDiscipline::Close, - ), + let discipline = match kind { + ClientKind::ReadWrite => backend::FullReadChannelDiscipline::Block, + ClientKind::ReadOnly => backend::FullReadChannelDiscipline::Close, }; - // If this client is to be the read-write client, and there's already - // such a client, the old client needs to be evicted and stopped before - // the new client can start. Create and install the new task under the - // lock, then drop the lock before running down the old task. let prev_rw_task; + let new_id; { let mut client_tasks = self.client_tasks.lock().unwrap(); - let client_id = client_tasks.next_id(); + + // There can only ever be one read-write client at a time. If one + // already exists, remove it from the task tracker. It will be + // replaced with this registrant's ID before the lock is dropped. 
            prev_rw_task = if kind == ClientKind::ReadWrite {
                client_tasks.remove_rw_client()
            } else {
                None
            };

-            let backend_hdl =
-                self.backend.attach_client(console_tx, permissions, discipline);
-
             let ctx = SerialTaskContext {
                 log: self.log.clone(),
                 ws,
-                backend_hdl,
                 console_rx,
                 control_rx,
-                start_rx: task_start_rx,
                 done_rx: task_done_rx,
                 client_tasks: self.client_tasks.clone(),
-                client_id,
             };

             let task = ClientTask {
-                hdl: tokio::spawn(async move { serial_task(ctx).await }),
-                control_tx,
+                hdl: tokio::spawn(serial_task(ctx)),
+                control_tx: control_tx.clone(),
                 done_tx: task_done_tx,
             };

-            client_tasks.tasks.insert(client_id, task);
+            new_id = client_tasks.add(task);
             if kind == ClientKind::ReadWrite {
                 assert!(client_tasks.rw_client_id.is_none());
-                client_tasks.rw_client_id = Some(client_id);
+                client_tasks.rw_client_id = Some(new_id);
             }
-        }
+        };
+
+        // Register with the backend to get a client handle to pass back to the
+        // new client task.
+        //
+        // Note that if the read-full discipline is Close and the guest is very
+        // busily writing data, it is possible for the backend to decide the
+        // channel is unresponsive before the client task is ever told to enter
+        // its main loop. While not ideal, this is legal because it is also
+        // possible for the channel to have filled between the time the task
+        // processed its "start" message and the time it actually began to poll
+        // its read channel for data. In either case the client will be
+        // disconnected immediately and not get to read anything.
+        let backend_hdl = match kind {
+            ClientKind::ReadWrite => {
+                self.backend.attach_read_write_client(console_tx, discipline)
+            }
+            ClientKind::ReadOnly => {
+                self.backend.attach_read_only_client(console_tx, discipline)
+            }
+        };

+        // If this task displaced a previous read-write task, make sure that
+        // task has exited before allowing the new one to enter its main loop
+        // to avoid accidentally interleaving multiple tasks' writes.
         if let Some(task) = prev_rw_task {
             let _ = task.done_tx.send(());
             let _ = task.hdl.await;
         }

-        // Don't let the new task start until any old tasks have been run down.
-        // This prevents writes from an old R/W task from being interleaved with
-        // writes from a new one.
-        task_start_tx
-            .send(())
-            .expect("new serial task shouldn't exit before starting");
+        let _ = control_tx.send(ControlEvent::Start(new_id, backend_hdl));
     }

     /// Notifies all active clients that the instance is migrating to a new
@@ -248,18 +283,18 @@ impl SerialConsoleManager {
             let client_tasks = self.client_tasks.lock().unwrap();
             client_tasks
                 .tasks
-                .values()
-                .map(|client| client.control_tx.clone())
+                .iter()
+                .map(|(_, client)| client.control_tx.clone())
                 .collect()
         };

         for client in clients {
-            let _ = client
-                .send(InstanceSerialConsoleControlMessage::Migrating {
+            let _ = client.send(ControlEvent::Message(
+                InstanceSerialConsoleControlMessage::Migrating {
                     destination,
                     from_start,
-                })
-                .await;
+                },
+            ));
         }
     }
 }
@@ -272,18 +307,11 @@ struct SerialTaskContext {
     /// The websocket connection this task uses to communicate with its client.
     ws: WebSocketStream,

-    /// A handle representing this task's connection to the console backend.
-    backend_hdl: backend::Client,
-
     /// Receives output bytes from the guest.
-    console_rx: mpsc::Receiver<u8>,
+    console_rx: ReadHalf<SimplexStream>,

-    /// Receives control commands that should be relayed to websocket clients
-    /// (e.g. impending migrations).
-    control_rx: mpsc::Receiver<InstanceSerialConsoleControlMessage>,
+    /// Receives control events from the console manager: the initial `Start`
+    /// event carrying this task's ID and backend client handle, followed by
+    /// any control messages that should be relayed to the websocket client.
+    control_rx: mpsc::UnboundedReceiver<ControlEvent>,

-    /// Signaled to direct a task to enter its processing loop.
- start_rx: oneshot::Receiver<()>, + /// TODO(gjc) + control_rx: mpsc::UnboundedReceiver, /// Signaled to tell a task to shut down. done_rx: oneshot::Receiver<()>, @@ -291,38 +319,36 @@ struct SerialTaskContext { /// A reference to the manager's client task map, used to deregister this /// task on disconnection. client_tasks: Arc>, - - /// This task's ID, used to deregister this task on disconnection. - client_id: ClientId, } async fn serial_task( SerialTaskContext { log, ws, - mut backend_hdl, mut console_rx, mut control_rx, - start_rx, mut done_rx, client_tasks, - client_id, }: SerialTaskContext, ) { enum Event { Done, - ConsoleRead(u8), + ReadFromBackend(usize), ConsoleDisconnected, WroteToBackend(Result), - ControlMessage(InstanceSerialConsoleControlMessage), + Control(ControlEvent), WebsocketMessage(Message), WebsocketError(tokio_tungstenite::tungstenite::Error), WebsocketDisconnected, } - // Wait to be told to start the main loop. This allows the task creation - // logic to ensure that there's only one active console writer at a time. - let _ = start_rx.await; + let (client_id, mut backend_hdl) = match control_rx.recv().await { + Some(ControlEvent::Start(id, hdl)) => (id, hdl), + Some(e) => { + panic!("serial task's first message should be Start but was {e:?}") + } + None => panic!("serial task's control channel closed unexpectedly"), + }; info!( log, @@ -330,39 +356,61 @@ async fn serial_task( "client_id" => client_id.0, ); - let mut remaining_to_send: VecDeque = VecDeque::new(); + let mut client_defunct = backend_hdl.get_defunct_rx(); + let mut read_from_guest = vec![0; SERIAL_READ_BUFFER_SIZE]; + let mut remaining_to_send = VecDeque::new(); let (mut sink, mut stream) = ws.split(); let mut close_reason: Option<&'static str> = None; loop { use futures::future::Either; - let (will_send, send_fut) = - if backend_hdl.is_writable() && !remaining_to_send.is_empty() { + // If this is a read-write client that has read some bytes from the + // websocket peer, create a future that will attempt to write those + // bytes to the console backend, and hold off on reading more data from + // the peer until those bytes are sent. + // + // Otherwise, create an always-pending "write to backend" future and + // accept another message from the websocket. + // + // Read from the websocket even if the client is read-only to avoid + // putting backpressure on the remote peer (and possibly causing it to + // hang). This has the added advantage that if the peer disconnects, the + // read-from-peer future will yield an error, allowing this task to + // clean itself up without having to wait for another occasion to send a + // message to the peer. + let (read_from_ws, backend_write_fut) = + if backend_hdl.can_write() && !remaining_to_send.is_empty() { remaining_to_send.make_contiguous(); ( - true, + false, Either::Left( backend_hdl.write(remaining_to_send.as_slices().0), ), ) } else { - (false, Either::Right(futures::future::pending())) + (true, Either::Right(futures::future::pending())) }; - // If there are no bytes to be sent to the guest, accept another message - // from the websocket. - let ws_fut = if !will_send { + let ws_fut = if read_from_ws { Either::Left(stream.next()) } else { Either::Right(futures::future::pending()) }; + probes::serial_task_loop!(|| ( + read_from_ws, + !remaining_to_send.is_empty() + )); + let event = select! { // The priority of these branches is important: // - // 1. Requests to stop the client take precedence over everything - // else. - // 2. 
New bytes written by the guest need to be processed before any + // 1. If the console manager asks to stop this task, it should stop + // immediately without doing any more work (the VM is going + // away). + // 2. Similarly, if the backend's read task marked this client as + // defunct, the task should exit right away. + // 3. New bytes written by the guest need to be processed before any // other requests: if a guest outputs a byte while a read-write // client is attached, the relevant vCPU will be blocked until // the client processes the byte. @@ -372,20 +420,24 @@ async fn serial_task( Event::Done } - res = console_rx.recv() => { + _ = client_defunct.changed() => { + Event::ConsoleDisconnected + } + + res = console_rx.read(read_from_guest.as_mut_slice()) => { match res { - Some(b) => Event::ConsoleRead(b), - None => Event::ConsoleDisconnected, + Ok(bytes_read) => Event::ReadFromBackend(bytes_read), + Err(_) => Event::ConsoleDisconnected, } } control = control_rx.recv() => { - Event::ControlMessage(control.expect( + Event::Control(control.expect( "serial control channel should outlive its task" )) } - res = send_fut => { + res = backend_write_fut => { Event::WroteToBackend(res) } @@ -400,47 +452,66 @@ async fn serial_task( match event { Event::Done => { - probes::serial_event_done!(|| ()); + probes::serial_task_done!(|| ()); close_reason = Some("VM stopped"); break; } - Event::ConsoleRead(b) => { - probes::serial_event_read!(|| (b)); + Event::ReadFromBackend(len) => { + probes::serial_task_backend_read!(|| (len)); // In general it's OK to wait for this byte to be sent, since // read-write clients are allowed to block the backend and // read-only clients will be disconnected if they don't process // bytes in a timely manner. That said, even read-write clients // need to monitor `done_rx` here; otherwise a badly-behaved - // client can prevent a VM from stopping. - select! { + // remote peer can prevent a VM from stopping. + let res = select! { biased; _ = &mut done_rx => { - probes::serial_event_done!(|| ()); + probes::serial_task_done!(|| ()); close_reason = Some("VM stopped"); break; } - _ = sink.send(Message::binary(vec![b])) => {} + res = sink.send( + Message::binary(read_from_guest[..len].to_vec()) + ) => { res } + }; + + if let Err(e) = res { + info!( + log, "failed to write to a console client"; + "client_id" => client_id.0, + "error" => ?e + ); + close_reason = Some("sending bytes to the client failed"); + break; } } Event::ConsoleDisconnected => { - probes::serial_event_console_disconnect!(|| ()); + probes::serial_task_console_disconnect!(|| ()); info!( - log, "console backend dropped its client channel"; + log, "console backend closed its client channel"; "client_id" => client_id.0 ); break; } - Event::ControlMessage(control) => { - // As above, don't let a client that's not processing control + Event::Control(control) => { + let control = match control { + ControlEvent::Start(..) => { + panic!("received a start message after starting"); + } + ControlEvent::Message(msg) => msg, + }; + + // As above, don't let a peer that's not processing control // messages prevent the VM from stopping. select! 
{ biased; _ = &mut done_rx => { - probes::serial_event_done!(|| ()); + probes::serial_task_done!(|| ()); close_reason = Some("VM stopped"); break; } @@ -460,7 +531,7 @@ async fn serial_task( let reason = if e.kind() == std::io::ErrorKind::ConnectionAborted { - "read-write console connection overtaken" + "read-write console connection closed by backend" } else { "error writing to console backend" }; @@ -478,22 +549,21 @@ async fn serial_task( } }; + probes::serial_task_backend_write!(|| written); drop(remaining_to_send.drain(..written)); } Event::WebsocketMessage(msg) => { - match (backend_hdl.is_writable(), msg) { - (true, Message::Binary(bytes)) => { - probes::serial_event_ws_recv!(|| (bytes.len())); - remaining_to_send.extend(bytes.as_slice()); - } - (false, Message::Binary(_)) => { - continue; - } - (_, _) => continue, + assert!( + remaining_to_send.is_empty(), + "should only read from the socket when the buffer is empty" + ); + if let Message::Binary(bytes) = msg { + probes::serial_task_ws_recv!(|| (bytes.len())); + remaining_to_send.extend(bytes.as_slice()); } } Event::WebsocketError(e) => { - probes::serial_event_ws_error!(|| ()); + probes::serial_task_ws_error!(|| ()); warn!( log, "serial console websocket error"; "client_id" => client_id.0, @@ -502,7 +572,7 @@ async fn serial_task( break; } Event::WebsocketDisconnected => { - probes::serial_event_ws_disconnect!(|| ()); + probes::serial_task_ws_disconnect!(|| ()); info!( log, "serial console client disconnected"; "client_id" => client_id.0 @@ -530,5 +600,5 @@ async fn serial_task( ); } - client_tasks.lock().unwrap().remove_by_id(client_id); + client_tasks.lock().unwrap().ensure_removed_by_id(client_id); } From a4b0c219fe9e4f5cf210daed9a4d40c925668a00 Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Sat, 18 Jan 2025 05:12:40 +0000 Subject: [PATCH 10/14] server: improve docs --- bin/propolis-server/src/lib/serial/backend.rs | 50 +++++++++++++------ bin/propolis-server/src/lib/serial/mod.rs | 2 +- 2 files changed, 37 insertions(+), 15 deletions(-) diff --git a/bin/propolis-server/src/lib/serial/backend.rs b/bin/propolis-server/src/lib/serial/backend.rs index 1ceb4bfd1..97386527e 100644 --- a/bin/propolis-server/src/lib/serial/backend.rs +++ b/bin/propolis-server/src/lib/serial/backend.rs @@ -4,6 +4,33 @@ //! A backend that provides external clients with access to a Propolis character //! device and tracks the history of bytes the guest has written to that device. +//! +//! The [`ConsoleBackend`] type provides an interface to read and write a +//! character device. The backend itself holds a reference to the device and +//! manages a task that reads bytes from the device and issues them to any +//! registered readers. +//! +//! Users of the backend call [`ConsoleBackend::attach_read_only_client`] or +//! [`ConsoleBackend::attach_read_write_client`] to obtain a [`Client`] +//! representing the new connection. Each client has a corresponding [`Reader`] +//! owned by the backend's [`read_task`]; each `Reader` contains a tokio +//! [`SimplexStream`] to which the read task sends bytes written by the guest. +//! Read-write clients own a reference to a [`Writer`] that can write bytes to a +//! sink associated with the backend's character device. +//! +//! Clients may be disconnected in one of two ways: +//! +//! - A client's owner can just drop its `Client` struct. +//! - If the client was configured to use the "close on full channel" read +//! discipline, and the read task is unable to send some bytes to a reader's +//! 
stream because the channel is full, the client is invalidated. +//! +//! To avoid circular references, clients and their readers don't refer to each +//! other directly. Instead, they share both sides of a `tokio::watch` to which +//! they publish `true` when a connection has been invalidated, regardless of +//! who invalidated it. The receiver end of this channel is available to users +//! through [`Client::get_defunct_rx`] to allow clients' users to learn when +//! their clients have been closed. use std::{ future::Future, @@ -74,22 +101,14 @@ impl Drop for Reader { struct Writer { /// The backend to which this writer directs its writes. backend: Arc, - - /// Set to `true` when this writer's associated [`Reader`] is dropped. - defunct_rx: watch::Receiver, } impl Writer { - /// Implements [`Client::write`] for the client that owns this [`Writer`]. + /// Sends the bytes in `buf` to the sink associated with this writer's + /// backend. async fn write(&mut self, buf: &[u8]) -> Result { assert!(!buf.is_empty()); - if *self.defunct_rx.borrow_and_update() { - return Err(std::io::Error::from( - std::io::ErrorKind::ConnectionAborted, - )); - } - Ok(self .backend .sink_buffer @@ -158,6 +177,12 @@ impl Client { )); }; + if *self.defunct_rx.borrow_and_update() { + return Err(std::io::Error::from( + std::io::ErrorKind::ConnectionAborted, + )); + } + writer.write(buf).await } } @@ -276,10 +301,7 @@ impl ConsoleBackend { ) -> Client { let (defunct_tx, defunct_rx) = watch::channel(false); let writer = match permissions { - Permissions::ReadWrite => Some(Writer { - backend: self.clone(), - defunct_rx: defunct_rx.clone(), - }), + Permissions::ReadWrite => Some(Writer { backend: self.clone() }), Permissions::ReadOnly => None, }; diff --git a/bin/propolis-server/src/lib/serial/mod.rs b/bin/propolis-server/src/lib/serial/mod.rs index e20617978..2f550f476 100644 --- a/bin/propolis-server/src/lib/serial/mod.rs +++ b/bin/propolis-server/src/lib/serial/mod.rs @@ -310,7 +310,7 @@ struct SerialTaskContext { /// Receives output bytes from the guest. console_rx: ReadHalf, - /// TODO(gjc) + /// Receives commands and notifications from the console manager. control_rx: mpsc::UnboundedReceiver, /// Signaled to tell a task to shut down. From 9b0ec101f4faa9b98953b3933b4a19cb84306b8b Mon Sep 17 00:00:00 2001 From: Greg Colombo Date: Wed, 22 Jan 2025 00:46:17 +0000 Subject: [PATCH 11/14] server: just use a regular channel --- bin/propolis-server/src/lib/serial/mod.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/bin/propolis-server/src/lib/serial/mod.rs b/bin/propolis-server/src/lib/serial/mod.rs index 2f550f476..af4e1c28b 100644 --- a/bin/propolis-server/src/lib/serial/mod.rs +++ b/bin/propolis-server/src/lib/serial/mod.rs @@ -107,7 +107,7 @@ struct ClientTask { hdl: JoinHandle<()>, /// Receives server-level commands and notifications about this connection. - control_tx: mpsc::UnboundedSender, + control_tx: mpsc::Sender, /// Triggered when the serial console manager shuts down. 
     done_tx: oneshot::Sender<()>,
@@ -196,7 +196,7 @@ impl SerialConsoleManager {
     ) {
         let (console_rx, console_tx) =
             tokio::io::simplex(SERIAL_READ_BUFFER_SIZE);
-        let (control_tx, control_rx) = mpsc::unbounded_channel();
+        let (control_tx, control_rx) = mpsc::channel(1);
         let (task_done_tx, task_done_rx) = oneshot::channel();
         let discipline = match kind {
             ClientKind::ReadWrite => backend::FullReadChannelDiscipline::Block,
             ClientKind::ReadOnly => backend::FullReadChannelDiscipline::Close,
@@ -267,7 +267,7 @@ impl SerialConsoleManager {
             let _ = task.hdl.await;
         }
 
-        let _ = control_tx.send(ControlEvent::Start(new_id, backend_hdl));
+        let _ = control_tx.send(ControlEvent::Start(new_id, backend_hdl)).await;
     }
 
     /// Notifies all active clients that the instance is migrating to a new
@@ -289,12 +289,14 @@ impl SerialConsoleManager {
         };
 
         for client in clients {
-            let _ = client.send(ControlEvent::Message(
-                InstanceSerialConsoleControlMessage::Migrating {
-                    destination,
-                    from_start,
-                },
-            ));
+            let _ = client
+                .send(ControlEvent::Message(
+                    InstanceSerialConsoleControlMessage::Migrating {
+                        destination,
+                        from_start,
+                    },
+                ))
+                .await;
         }
     }
 }
@@ -311,7 +313,7 @@ struct SerialTaskContext {
     console_rx: ReadHalf<SimplexStream>,
 
     /// Receives commands and notifications from the console manager.
-    control_rx: mpsc::UnboundedReceiver<ControlEvent>,
+    control_rx: mpsc::Receiver<ControlEvent>,
 
     /// Signaled to tell a task to shut down.
     done_rx: oneshot::Receiver<()>,

From 7f1f20c602a0e043e351c70810c44a2f3a0605d4 Mon Sep 17 00:00:00 2001
From: Greg Colombo
Date: Wed, 22 Jan 2025 00:53:42 +0000
Subject: [PATCH 12/14] server: add explanatory comment

---
 bin/propolis-server/src/lib/serial/mod.rs | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/bin/propolis-server/src/lib/serial/mod.rs b/bin/propolis-server/src/lib/serial/mod.rs
index af4e1c28b..bf58ed17a 100644
--- a/bin/propolis-server/src/lib/serial/mod.rs
+++ b/bin/propolis-server/src/lib/serial/mod.rs
@@ -130,6 +130,11 @@ impl ClientTasks {
 
     /// Ensures the task with ID `id` is removed from this task set.
     fn ensure_removed_by_id(&mut self, id: ClientId) {
+        // `slab` will panic if asked to remove an ID that isn't present in the
+        // collection. This can happen if a read-write task is evicted by a new
+        // read-write task, and then the old task exits. Handling this case here
+        // allows new client connections to take ownership of a retired
+        // read-write task and await its completion while holding no locks.
         if self.tasks.contains(id.0) {
             self.tasks.remove(id.0);
             match self.rw_client_id {

From bca35fe4e1a4ffc66216bcf4a7c6dc678ac23081 Mon Sep 17 00:00:00 2001
From: Greg Colombo
Date: Wed, 22 Jan 2025 01:02:47 +0000
Subject: [PATCH 13/14] server: misc PR feedback

---
 bin/propolis-server/src/lib/serial/backend.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/bin/propolis-server/src/lib/serial/backend.rs b/bin/propolis-server/src/lib/serial/backend.rs
index 97386527e..7ab958f22 100644
--- a/bin/propolis-server/src/lib/serial/backend.rs
+++ b/bin/propolis-server/src/lib/serial/backend.rs
@@ -256,9 +256,7 @@ impl ConsoleBackend {
         });
 
         let inner = this.inner.clone();
-        tokio::spawn(async move {
-            read_task(inner, source, reader_rx, done_rx).await;
-        });
+        tokio::spawn(read_task(inner, source, reader_rx, done_rx));
 
         this
     }
@@ -416,7 +414,9 @@ async fn read_task(
             }
 
             bytes_read = buf.read(bytes.as_mut_slice(), source.as_ref()) => {
-                Event::BytesRead(bytes_read.unwrap())
+                Event::BytesRead(
+                    bytes_read.expect("SourceBuffer reads are infallible")
+                )
             }
         };

From d55ad7e835a400729bdb3c8c1d773addee260c58 Mon Sep 17 00:00:00 2001
From: Greg Colombo
Date: Wed, 22 Jan 2025 01:09:02 +0000
Subject: [PATCH 14/14] server: don't keep write bytes from read-only clients

---
 bin/propolis-server/src/lib/serial/mod.rs | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/bin/propolis-server/src/lib/serial/mod.rs b/bin/propolis-server/src/lib/serial/mod.rs
index bf58ed17a..1f2f881da 100644
--- a/bin/propolis-server/src/lib/serial/mod.rs
+++ b/bin/propolis-server/src/lib/serial/mod.rs
@@ -566,7 +566,15 @@ async fn serial_task(
                 );
                 if let Message::Binary(bytes) = msg {
                     probes::serial_task_ws_recv!(|| (bytes.len()));
-                    remaining_to_send.extend(bytes.as_slice());
+
+                    // Throw away the incoming bytes if they can't be written to
+                    // the backend. This allows the next loop iteration to read
+                    // from the socket again, which gives it a convenient way of
+                    // noticing that a client has disconnected even if nothing
+                    // is currently being echoed to it.
+                    if backend_hdl.can_write() {
+                        remaining_to_send.extend(bytes.as_slice());
+                    }
                 }
             }
             Event::WebsocketError(e) => {