diff --git a/Cargo.lock b/Cargo.lock index 286f665ec19..d3fa7496f80 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -935,7 +935,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -2409,7 +2409,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.9", "tokio", "tower-service", "tracing", @@ -2497,7 +2497,7 @@ dependencies = [ "hyper 1.6.0", "libc", "pin-project-lite", - "socket2", + "socket2 0.5.9", "tokio", "tower-service", "tracing", @@ -2843,6 +2843,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "io-uring" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" +dependencies = [ + "bitflags 2.9.0", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -4092,7 +4103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.101", @@ -5250,6 +5261,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "spacetime-module" version = "0.1.0" @@ -5718,6 +5739,7 @@ dependencies = [ "spacetimedb-commitlog", "spacetimedb-paths", "spacetimedb-sats", + "thiserror 1.0.69", "tokio", "tracing", ] @@ -6063,7 +6085,7 @@ dependencies = [ "serde", "serde_json", "sled", - "socket2", + "socket2 0.5.9", "spacetimedb-client-api", "spacetimedb-client-api-messages", 
"spacetimedb-core", @@ -6793,20 +6815,22 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.45.0" +version = "1.47.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio 1.0.3", "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", - "socket2", + "slab", + "socket2 0.6.0", "tokio-macros", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6862,7 +6886,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand 0.9.1", - "socket2", + "socket2 0.5.9", "tokio", "tokio-util", "whoami", @@ -7856,7 +7880,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 4221cb9f009..5d768046a7f 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -3,7 +3,7 @@ use std::process::ExitCode; use clap::{Arg, Command}; use spacetimedb_cli::*; use spacetimedb_paths::cli::CliTomlPath; -use spacetimedb_paths::{RootDir, SpacetimePaths}; +use spacetimedb_paths::RootDir; // Note that the standalone server is invoked through standaline/src/main.rs, so you will // also want to set the allocator there. 
@@ -24,6 +24,8 @@ static GLOBAL: MiMalloc = MiMalloc; #[cfg(not(feature = "markdown-docs"))] #[tokio::main] async fn main() -> anyhow::Result { + use spacetimedb_paths::SpacetimePaths; + // Compute matches before loading the config, because `Config` has an observable `drop` method // (which deletes a lockfile), // and Clap calls `exit` on parse failure rather than panicking, so destructors never run. diff --git a/crates/cli/src/subcommands/sql.rs b/crates/cli/src/subcommands/sql.rs index 5a79190a45b..bde8ec00b25 100644 --- a/crates/cli/src/subcommands/sql.rs +++ b/crates/cli/src/subcommands/sql.rs @@ -34,6 +34,13 @@ pub fn cli() -> clap::Command { .conflicts_with("query") .help("Instead of using a query, run an interactive command prompt for `SQL` expressions"), ) + .arg( + Arg::new("confirmed") + .required(false) + .long("confirmed") + .action(ArgAction::SetTrue) + .help("Instruct the server to deliver only updates of confirmed transactions"), + ) .arg(common_args::anonymous()) .arg(common_args::server().help("The nickname, host name or URL of the server hosting the database")) .arg(common_args::yes()) @@ -178,11 +185,15 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error crate::repl::exec(con).await?; } else { let query = args.get_one::("query").unwrap(); + let confirmed = args.get_flag("confirmed"); let con = parse_req(config, args).await?; - let api = ClientApi::new(con); + let mut api = ClientApi::new(con).sql(); + if confirmed { + api = api.query(&[("confirmed", "true")]); + } - run_sql(api.sql(), query, false).await?; + run_sql(api, query, false).await?; } Ok(()) } diff --git a/crates/cli/src/subcommands/subscribe.rs b/crates/cli/src/subcommands/subscribe.rs index c7b5058ba42..82255dc406e 100644 --- a/crates/cli/src/subcommands/subscribe.rs +++ b/crates/cli/src/subcommands/subscribe.rs @@ -2,7 +2,7 @@ use anyhow::Context; use clap::{value_parser, Arg, ArgAction, ArgMatches}; use futures::{Sink, SinkExt, TryStream, 
TryStreamExt}; use http::header; -use http::uri::Scheme; +use http::uri::{PathAndQuery, Scheme}; use serde_json::Value; use spacetimedb_client_api_messages::websocket::{self as ws, JsonFormat}; use spacetimedb_data_structures::map::HashMap; @@ -65,6 +65,13 @@ pub fn cli() -> clap::Command { .action(ArgAction::SetTrue) .help("Print the initial update for the queries."), ) + .arg( + Arg::new("confirmed") + .required(false) + .long("confirmed") + .action(ArgAction::SetTrue) + .help("Instruct the server to deliver only updates of confirmed transactions"), + ) .arg(common_args::anonymous()) .arg(common_args::yes()) .arg(common_args::server().help("The nickname, host name or URL of the server hosting the database")) @@ -130,6 +137,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error let num = args.get_one::("num-updates").copied(); let timeout = args.get_one::("timeout").copied(); let print_initial_update = args.get_flag("print_initial_update"); + let confirmed = args.get_flag("confirmed"); let conn = parse_req(config, args).await?; let api = ClientApi::new(conn); @@ -146,6 +154,9 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error s } }); + if confirmed { + append_query_param(&mut uri, ("confirmed", "true")); + } // Create the websocket request. 
let mut req = http::Uri::from_parts(uri)?.into_client_request()?; @@ -334,3 +345,21 @@ fn format_output_json(msg: &ws::DatabaseUpdate, schema: &RawModuleDe Ok(output) } + +fn append_query_param(uri: &mut http::uri::Parts, (k, v): (&str, &str)) { + let (mut path, query) = uri + .path_and_query + .as_ref() + .map(|pq| (pq.path().to_owned(), pq.query())) + .unwrap_or_default(); + path.push('?'); + if let Some(query) = query { + path.push_str(query); + path.push('&'); + } + path.push_str(k); + path.push('='); + path.push_str(v); + + uri.path_and_query = Some(PathAndQuery::from_maybe_shared(path).unwrap()); +} diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index ac82a3e142d..71638a0897b 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -67,6 +67,7 @@ impl Host { &self, auth: AuthCtx, database: Database, + confirmed_read: bool, body: String, ) -> axum::response::Result>> { let module_host = self @@ -74,7 +75,7 @@ impl Host { .await .map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?; - let json = self + let (tx_offset, durable_offset, json) = self .host_controller .using_database( database, @@ -115,17 +116,28 @@ impl Host { .map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name))) .collect(); - Ok(vec![SqlStmtResult { - schema, - rows: result.rows, - total_duration_micros: total_duration.as_micros() as u64, - stats: SqlStmtStats::from_metrics(&result.metrics), - }]) + Ok(( + result.tx_offset, + db.durable_tx_offset(), + vec![SqlStmtResult { + schema, + rows: result.rows, + total_duration_micros: total_duration.as_micros() as u64, + stats: SqlStmtStats::from_metrics(&result.metrics), + }], + )) }, ) .await .map_err(log_and_500)??; + if confirmed_read { + if let Some(mut durable_offset) = durable_offset { + let tx_offset = tx_offset.await.map_err(|_| log_and_500("transaction aborted"))?; + durable_offset.wait_for(tx_offset).await.map_err(log_and_500)?; + } + } + Ok(json) } diff 
--git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 91a67f9c406..601d96744ce 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -382,12 +382,17 @@ pub struct SqlParams { } #[derive(Deserialize)] -pub struct SqlQueryParams {} +pub struct SqlQueryParams { + /// If `true`, return the query result only after its transaction offset + /// is confirmed to be durable. + #[serde(default)] + confirmed: bool, +} pub async fn sql( State(worker_ctx): State, Path(SqlParams { name_or_identity }): Path, - Query(SqlQueryParams {}): Query, + Query(SqlQueryParams { confirmed }): Query, Extension(auth): Extension, body: String, ) -> axum::response::Result @@ -410,7 +415,7 @@ where .await .map_err(log_and_500)? .ok_or(StatusCode::NOT_FOUND)?; - let json = host.exec_sql(auth, database, body).await?; + let json = host.exec_sql(auth, database, confirmed, body).await?; let total_duration = json.iter().fold(0, |acc, x| acc + x.total_duration_micros); diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index 2aa2829ed1d..91c27f7bfc2 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -25,8 +25,8 @@ use spacetimedb::client::messages::{ serialize, IdentityTokenMessage, SerializableMessage, SerializeBuffer, SwitchedServerMessage, ToProtocol, }; use spacetimedb::client::{ - ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageExecutionError, MessageHandleError, - MeteredReceiver, MeteredSender, Protocol, + ClientActorId, ClientConfig, ClientConnection, ClientConnectionReceiver, DataMessage, MessageExecutionError, + MessageHandleError, MeteredReceiver, MeteredSender, Protocol, }; use spacetimedb::host::module_host::ClientConnectedError; use spacetimedb::host::NoSuchModule; @@ -80,6 +80,12 @@ pub struct SubscribeQueryParams { /// This knob works by setting other, more specific, knobs to 
the value. #[serde(default)] pub light: bool, + /// If `true`, send the subscription updates only after the transaction + /// offset they're computed from is confirmed to be durable. + /// + /// If `false`, send them immediately. + #[serde(default)] + pub confirmed: bool, } pub fn generate_random_connection_id() -> ConnectionId { @@ -93,6 +99,7 @@ pub async fn handle_websocket( connection_id, compression, light, + confirmed, }): Query, forwarded_for: Option>, Extension(auth): Extension, @@ -127,6 +134,7 @@ where protocol, compression, tx_update_full: !light, + confirmed_reads: confirmed, }; // TODO: Should also maybe refactor the code and the protocol to allow a single websocket @@ -199,7 +207,7 @@ where "websocket: Database accepted connection from {client_log_string}; spawning ws_client_actor and ClientConnection" ); - let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx); + let actor = |client, receiver| ws_client_actor(ws_opts, client, ws, receiver); let client = ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor, connected).await; @@ -213,7 +221,7 @@ where token: identity_token, connection_id, }; - if let Err(e) = client.send_message(message) { + if let Err(e) = client.send_message(None, message) { log::warn!("websocket: Error sending IdentityToken message to {client_log_string}: {e}"); } }); @@ -341,7 +349,7 @@ async fn ws_client_actor( options: WebSocketOptions, client: ClientConnection, ws: WebSocketStream, - sendrx: MeteredReceiver, + sendrx: ClientConnectionReceiver, ) { // ensure that even if this task gets cancelled, we always cleanup the connection let mut client = scopeguard::guard(client, |client| { @@ -357,7 +365,7 @@ async fn ws_client_actor_inner( client: &mut ClientConnection, config: WebSocketOptions, ws: WebSocketStream, - sendrx: MeteredReceiver, + sendrx: ClientConnectionReceiver, ) { let database = client.module().info().database_identity; let client_id = client.id; @@ -965,6 +973,33 @@ 
enum UnorderedWsMessage { Error(MessageExecutionError), } +/// Abstraction over [`ClientConnectionReceiver`], so tests can use a plain +/// [`mpsc::Receiver`]. +trait Receiver { + fn recv(&mut self) -> impl Future> + Send; + fn close(&mut self); +} + +impl Receiver for ClientConnectionReceiver { + async fn recv(&mut self) -> Option { + ClientConnectionReceiver::recv(self).await + } + + fn close(&mut self) { + ClientConnectionReceiver::close(self); + } +} + +impl Receiver for mpsc::Receiver { + async fn recv(&mut self) -> Option { + mpsc::Receiver::recv(self).await + } + + fn close(&mut self) { + mpsc::Receiver::close(self); + } +} + /// Sink that sends outgoing messages to the `ws` sink. /// /// Consumes `messages`, which yields subscription updates and reducer call @@ -990,10 +1025,9 @@ async fn ws_send_loop( state: Arc, config: ClientConfig, mut ws: impl Sink + Unpin, - mut messages: MeteredReceiver, + mut messages: impl Receiver, mut unordered: mpsc::UnboundedReceiver, ) { - let mut messages_buf = Vec::with_capacity(32); let mut serialize_buf = SerializeBuffer::new(config); loop { @@ -1057,26 +1091,35 @@ async fn ws_send_loop( } }, - n = messages.recv_many(&mut messages_buf, 32), if !closed => { - if n == 0 { - continue; - } - log::trace!("sending {n} outgoing messages"); - for msg in messages_buf.drain(..n) { - let (msg_alloc, res) = send_message( - &state.database, - config, - serialize_buf, - msg.workload().zip(msg.num_rows()), - &mut ws, - msg - ).await; - serialize_buf = msg_alloc; - - if let Err(e) = res { - log::warn!("websocket send error: {e}"); - return; + maybe_message = messages.recv(), if !closed => { + let Some(message) = maybe_message else { + // The message sender was dropped, even though no close + // handshake is in progress. This should not normally happen, + // but initiating close seems like the correct thing to do. 
+ log::warn!("message sender dropped without close handshake"); + if let Err(e) = ws.send(WsMessage::Close(None)).await { + log::warn!("error sending close frame: {e:#}"); + break; } + state.close(); + // Continue so that `ws_client_actor` keeps waiting for an + // acknowledgement from the client. + continue; + }; + log::trace!("sending outgoing message"); + let (msg_alloc, res) = send_message( + &state.database, + config, + serialize_buf, + message.workload().zip(message.num_rows()), + &mut ws, + message + ).await; + serialize_buf = msg_alloc; + + if let Err(e) = res { + log::warn!("websocket send error: {e}"); + return; } }, } @@ -1211,7 +1254,7 @@ mod tests { sink, stream, }; use pretty_assertions::assert_matches; - use spacetimedb::client::ClientName; + use spacetimedb::client::{messages::SerializableMessage, ClientName}; use tokio::time::sleep; use super::*; @@ -1389,10 +1432,15 @@ mod tests { async fn send_loop_terminates_when_unordered_closed() { let state = Arc::new(dummy_actor_state()); let (messages_tx, messages_rx) = mpsc::channel(64); - let messages = MeteredReceiver::new(messages_rx); let (unordered_tx, unordered_rx) = mpsc::unbounded_channel(); - let send_loop = ws_send_loop(state, ClientConfig::for_test(), sink::drain(), messages, unordered_rx); + let send_loop = ws_send_loop( + state, + ClientConfig::for_test(), + sink::drain(), + messages_rx, + unordered_rx, + ); pin_mut!(send_loop); assert!(is_pending(&mut send_loop).await); @@ -1407,14 +1455,13 @@ mod tests { async fn send_loop_close_message_closes_state_and_messages() { let state = Arc::new(dummy_actor_state()); let (messages_tx, messages_rx) = mpsc::channel(64); - let messages = MeteredReceiver::new(messages_rx); let (unordered_tx, unordered_rx) = mpsc::unbounded_channel(); let send_loop = ws_send_loop( state.clone(), ClientConfig::for_test(), sink::drain(), - messages, + messages_rx, unordered_rx, ); pin_mut!(send_loop); @@ -1455,24 +1502,23 @@ mod tests { })), ]; - for msg in input { + for 
message in input { let state = Arc::new(dummy_actor_state()); let (messages_tx, messages_rx) = mpsc::channel(64); - let messages = MeteredReceiver::new(messages_rx); let (unordered_tx, unordered_rx) = mpsc::unbounded_channel(); let send_loop = ws_send_loop( state.clone(), ClientConfig::for_test(), UnfeedableSink, - messages, + messages_rx, unordered_rx, ); pin_mut!(send_loop); - match msg { + match message { Either::Left(unordered) => unordered_tx.send(unordered).unwrap(), - Either::Right(msg) => messages_tx.send(msg).await.unwrap(), + Either::Right(message) => messages_tx.send(message).await.unwrap(), } send_loop.await; } @@ -1502,24 +1548,23 @@ mod tests { })), ]; - for msg in input { + for message in input { let state = Arc::new(dummy_actor_state()); let (messages_tx, messages_rx) = mpsc::channel(64); - let messages = MeteredReceiver::new(messages_rx); let (unordered_tx, unordered_rx) = mpsc::unbounded_channel(); let send_loop = ws_send_loop( state.clone(), ClientConfig::for_test(), UnflushableSink, - messages, + messages_rx, unordered_rx, ); pin_mut!(send_loop); - match msg { + match message { Either::Left(unordered) => unordered_tx.send(unordered).unwrap(), - Either::Right(msg) => messages_tx.send(msg).await.unwrap(), + Either::Right(message) => messages_tx.send(message).await.unwrap(), } send_loop.await; } diff --git a/crates/core/src/client.rs b/crates/core/src/client.rs index 069912a9c92..025bf23c05f 100644 --- a/crates/core/src/client.rs +++ b/crates/core/src/client.rs @@ -7,8 +7,8 @@ mod message_handlers; pub mod messages; pub use client_connection::{ - ClientConfig, ClientConnection, ClientConnectionSender, ClientSendError, DataMessage, MeteredDeque, - MeteredReceiver, MeteredSender, Protocol, + ClientConfig, ClientConnection, ClientConnectionReceiver, ClientConnectionSender, ClientSendError, DataMessage, + MeteredDeque, MeteredReceiver, MeteredSender, Protocol, }; pub use client_connection_index::ClientActorIndex; pub use 
message_handlers::{MessageExecutionError, MessageHandleError}; diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index b3c2e2fe909..8088825c9e9 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -9,6 +9,7 @@ use std::time::Instant; use super::messages::{OneOffQueryResponseMessage, SerializableMessage}; use super::{message_handlers, ClientActorId, MessageHandleError}; +use crate::db::relational_db::RelationalDB; use crate::error::DBError; use crate::host::module_host::ClientConnectedError; use crate::host::{ModuleHost, NoSuchModule, ReducerArgs, ReducerCallError, ReducerCallResult}; @@ -25,12 +26,14 @@ use spacetimedb_client_api_messages::websocket::{ BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe, UnsubscribeMulti, }; +use spacetimedb_durability::{DurableOffset, TxOffset}; use spacetimedb_lib::identity::RequestId; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Identity; use tokio::sync::mpsc::error::{SendError, TrySendError}; use tokio::sync::{mpsc, oneshot, watch}; use tokio::task::AbortHandle; +use tracing::trace; #[derive(PartialEq, Eq, Clone, Copy, Hash, Debug)] pub enum Protocol { @@ -64,6 +67,10 @@ pub struct ClientConfig { /// rather than [`TransactionUpdateLight`]s on a successful update. // TODO(centril): As more knobs are added, make this into a bitfield (when there's time). pub tx_update_full: bool, + /// If `true`, the client requests to receive updates for transactions + /// confirmed to be durable. If `false`, updates will be delivered + /// immediately. 
+ pub confirmed_reads: bool, } impl ClientConfig { @@ -72,15 +79,143 @@ impl ClientConfig { protocol: Protocol::Binary, compression: <_>::default(), tx_update_full: true, + confirmed_reads: false, } } } +/// A message to be sent to the client, along with the transaction offset it +/// was computed at, if available. +/// +// TODO: Consider a different name, "ClientUpdate" is used elsewhere already. +#[derive(Debug)] +struct ClientUpdate { + /// Transaction offset at which `message` was computed. + /// + /// This is only `Some` if `message` is a query result. + /// + /// If `Some` and [`ClientConfig::confirmed_reads`] is `true`, + /// [`ClientConnectionReceiver`] will delay delivery until the durable + /// offset of the database is equal to or greater than `tx_offset`. + pub tx_offset: Option, + /// Type-erased outgoing message. + pub message: SerializableMessage, +} + +/// Types with access to the [`DurableOffset`] of a database. +/// +/// Provided implementors are [`watch::Receiver`] and [`RelationalDB`]. +/// +/// The latter is mostly useful for tests, where no managed [`ModuleHost`] is +/// available, while the former supports module hotswapping. +pub trait DurableOffsetSupply: Send { + /// Obtain the current [`DurableOffset`] handle. + /// + /// Returns: + /// + /// - `Err(NoSuchModule)` if the database was shut down + /// - `Ok(None)` if the database is configured without durability + /// - `Ok(DurableOffset)` otherwise + /// + fn durable_offset(&mut self) -> Result, NoSuchModule>; +} + +impl DurableOffsetSupply for watch::Receiver { + fn durable_offset(&mut self) -> Result, NoSuchModule> { + let module = if self.has_changed().map_err(|_| NoSuchModule)? 
{ + self.borrow_and_update() + } else { + self.borrow() + }; + + Ok(module.replica_ctx().relational_db.durable_tx_offset()) + } +} + +impl DurableOffsetSupply for RelationalDB { + fn durable_offset(&mut self) -> Result, NoSuchModule> { + Ok(self.durable_tx_offset()) + } +} + +/// Receiving end of [`ClientConnectionSender`]. +/// +/// The [`ClientConnection`] actor reads messages from this channel and sends +/// them to the client over its websocket connection. +/// +/// The [`ClientConnectionReceiver`] takes care of confirmed reads semantics, +/// if requested by the client. +pub struct ClientConnectionReceiver { + confirmed_reads: bool, + channel: MeteredReceiver, + offset_supply: Box, +} + +impl ClientConnectionReceiver { + /// Receive the next message from this channel. + /// + /// If this method returns `None`, the channel is closed and no more messages + /// are in the internal buffers. No more messages can ever be received from + /// the channel. + /// + /// Messages are returned immediately if: + /// + /// - The (internal) [`ClientUpdate`] does not have a `tx_offset` + /// (such as for error messages). + /// - The client hasn't requested confirmed reads (i.e. + /// [`ClientConfig::confirmed_reads`] is `false`). + /// - The database is configured to not persist transactions. + /// + /// Otherwise, the update's `tx_offset` is compared against the module's + /// durable offset. If the durable offset is behind the `tx_offset`, the + /// method waits until it catches up before returning the message. + /// + /// If the database is shut down while waiting for the durable offset, + /// `None` is returned. In this case, no more messages can ever be received + /// from the channel. + // + // TODO: Can we make a cancel-safe `recv_many` with confirmed reads semantics? 
+ pub async fn recv(&mut self) -> Option { + let ClientUpdate { tx_offset, message } = self.channel.recv().await?; + if !self.confirmed_reads { + return Some(message); + } + + if let Some(tx_offset) = tx_offset { + match self.offset_supply.durable_offset() { + Ok(Some(mut durable)) => { + trace!("waiting for offset {tx_offset} to become durable"); + durable.wait_for(tx_offset).await.ok()?; + } + // Database shut down or crashed. + Err(NoSuchModule) => return None, + // In-memory database. + Ok(None) => return Some(message), + } + } + + trace!("returning durable message"); + Some(message) + } + + /// Close the receiver without dropping it. + /// + /// This is used to notify the [`ClientConnectionSender`] that the receiver + /// will not consume any more messages from the channel, usually because the + /// connection has been closed or is about to be closed. + /// + /// After calling this method, the sender will not be able to send more + /// messages, preventing the internal buffer from filling up. 
+ pub fn close(&mut self) { + self.channel.close(); + } +} + #[derive(Debug)] pub struct ClientConnectionSender { pub id: ClientActorId, pub config: ClientConfig, - sendtx: mpsc::Sender, + sendtx: mpsc::Sender, abort_handle: AbortHandle, cancelled: AtomicBool, @@ -137,15 +272,23 @@ pub enum ClientSendError { } impl ClientConnectionSender { - pub fn dummy_with_channel(id: ClientActorId, config: ClientConfig) -> (Self, MeteredReceiver) { - let (sendtx, rx) = mpsc::channel(1); + pub fn dummy_with_channel( + id: ClientActorId, + config: ClientConfig, + offset_supply: impl DurableOffsetSupply + 'static, + ) -> (Self, ClientConnectionReceiver) { + let (sendtx, rx) = mpsc::channel(CLIENT_CHANNEL_CAPACITY_TEST); // just make something up, it doesn't need to be attached to a real task let abort_handle = match tokio::runtime::Handle::try_current() { Ok(h) => h.spawn(async {}).abort_handle(), Err(_) => tokio::runtime::Runtime::new().unwrap().spawn(async {}).abort_handle(), }; - let rx = MeteredReceiver::new(rx); + let receiver = ClientConnectionReceiver { + confirmed_reads: config.confirmed_reads, + channel: MeteredReceiver::new(rx), + offset_supply: Box::new(offset_supply), + }; let cancelled = AtomicBool::new(false); let sender = Self { id, @@ -155,11 +298,11 @@ impl ClientConnectionSender { cancelled, metrics: None, }; - (sender, rx) + (sender, receiver) } - pub fn dummy(id: ClientActorId, config: ClientConfig) -> Self { - Self::dummy_with_channel(id, config).0 + pub fn dummy(id: ClientActorId, config: ClientConfig, offset_supply: impl DurableOffsetSupply + 'static) -> Self { + Self::dummy_with_channel(id, config, offset_supply).0 } pub fn is_cancelled(&self) -> bool { @@ -168,11 +311,18 @@ impl ClientConnectionSender { /// Send a message to the client. For data-related messages, you should probably use /// `BroadcastQueue::send` to ensure that the client sees data messages in a consistent order. 
- pub fn send_message(&self, message: impl Into) -> Result<(), ClientSendError> { - self.send(message.into()) + pub fn send_message( + &self, + tx_offset: Option, + message: impl Into, + ) -> Result<(), ClientSendError> { + self.send(ClientUpdate { + tx_offset, + message: message.into(), + }) } - fn send(&self, message: SerializableMessage) -> Result<(), ClientSendError> { + fn send(&self, message: ClientUpdate) -> Result<(), ClientSendError> { if self.cancelled.load(Relaxed) { return Err(ClientSendError::Cancelled); } @@ -181,7 +331,12 @@ impl ClientConnectionSender { Err(mpsc::error::TrySendError::Full(_)) => { // we've hit CLIENT_CHANNEL_CAPACITY messages backed up in // the channel, so forcibly kick the client - tracing::warn!(identity = %self.id.identity, connection_id = %self.id.connection_id, "client channel capacity exceeded"); + tracing::warn!( + identity = %self.id.identity, + connection_id = %self.id.connection_id, + confirmed_reads = self.config.confirmed_reads, + "client channel capacity exceeded" + ); self.abort_handle.abort(); self.cancelled.store(true, Ordering::Relaxed); return Err(ClientSendError::Cancelled); @@ -425,6 +580,9 @@ impl MeteredSender { // if a client racks up this many messages in the queue without ACK'ing // anything, we boot 'em. const CLIENT_CHANNEL_CAPACITY: usize = 16 * KB; +// use a smaller value for tests +const CLIENT_CHANNEL_CAPACITY_TEST: usize = 8; + const KB: usize = 1024; /// Value returned by [`ClientConnection::call_client_connected_maybe_reject`] @@ -467,7 +625,7 @@ impl ClientConnection { config: ClientConfig, replica_id: u64, mut module_rx: watch::Receiver, - actor: impl FnOnce(ClientConnection, MeteredReceiver) -> Fut, + actor: impl FnOnce(ClientConnection, ClientConnectionReceiver) -> Fut, _proof_of_client_connected_call: Connected, ) -> ClientConnection where @@ -479,7 +637,7 @@ impl ClientConnection { // them and stuff. Not right now though. 
let module = module_rx.borrow_and_update().clone(); - let (sendtx, sendrx) = mpsc::channel::(CLIENT_CHANNEL_CAPACITY); + let (sendtx, sendrx) = mpsc::channel::(CLIENT_CHANNEL_CAPACITY); let (fut_tx, fut_rx) = oneshot::channel::(); // weird dance so that we can get an abort_handle into ClientConnection @@ -502,7 +660,11 @@ impl ClientConnection { .abort_handle(); let metrics = ClientConnectionMetrics::new(database_identity, config.protocol); - let sendrx = MeteredReceiver::with_gauge(sendrx, metrics.sendtx_queue_size.clone()); + let receiver = ClientConnectionReceiver { + confirmed_reads: config.confirmed_reads, + channel: MeteredReceiver::with_gauge(sendrx, metrics.sendtx_queue_size.clone()), + offset_supply: Box::new(module_rx.clone()), + }; let sender = Arc::new(ClientConnectionSender { id, @@ -518,7 +680,7 @@ impl ClientConnection { module_rx, }; - let actor_fut = actor(this.clone(), sendrx); + let actor_fut = actor(this.clone(), receiver); // if this fails, the actor() function called .abort(), which like... okay, I guess? 
let _ = fut_tx.send(actor_fut); @@ -532,7 +694,7 @@ impl ClientConnection { module_rx: watch::Receiver, ) -> Self { Self { - sender: Arc::new(ClientConnectionSender::dummy(id, config)), + sender: Arc::new(ClientConnectionSender::dummy(id, config, module_rx.clone())), replica_id, module_rx, } diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 6fec0c1b62b..872d93093b6 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -32,7 +32,7 @@ use spacetimedb_datastore::{ }, traits::TxData, }; -use spacetimedb_durability::{self as durability, TxOffset}; +use spacetimedb_durability as durability; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql}; use spacetimedb_lib::st_var::StVarValue; @@ -63,6 +63,8 @@ use std::path::Path; use std::sync::Arc; use tokio::sync::watch; +pub use durability::{DurableOffset, TxOffset}; + // NOTE(cloutiertyler): We should be using the associated types, but there is // a bug in the Rust compiler that prevents us from doing so. pub type MutTx = MutTxId; //::MutTx; @@ -369,7 +371,9 @@ impl RelationalDB { .as_ref() .map(|pair| pair.0.clone()) .as_deref() - .and_then(|durability| durability.durable_tx_offset()); + .map(|durability| durability.durable_tx_offset().get()) + .transpose()? 
+ .flatten(); let (min_commitlog_offset, _) = history.tx_range_hint(); log::info!("[{database_identity}] DATABASE: durable_tx_offset is {durable_tx_offset:?}"); @@ -801,18 +805,18 @@ impl RelationalDB { } #[tracing::instrument(level = "trace", skip_all)] - pub fn release_tx(&self, tx: Tx) -> (TxMetrics, String) { + pub fn release_tx(&self, tx: Tx) -> (TxOffset, TxMetrics, String) { log::trace!("RELEASE TX"); self.inner.release_tx(tx) } #[tracing::instrument(level = "trace", skip_all)] - pub fn commit_tx(&self, tx: MutTx) -> Result, DBError> { + pub fn commit_tx(&self, tx: MutTx) -> Result, DBError> { log::trace!("COMMIT MUT TX"); // TODO: Never returns `None` -- should it? let reducer_context = tx.ctx.reducer_context().cloned(); - let Some((tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else { + let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else { return Ok(None); }; @@ -822,7 +826,7 @@ impl RelationalDB { Self::do_durability(&**durability, reducer_context.as_ref(), &tx_data) } - Ok(Some((tx_data, tx_metrics, reducer))) + Ok(Some((tx_offset, tx_data, tx_metrics, reducer))) } #[tracing::instrument(level = "trace", skip_all)] @@ -898,6 +902,14 @@ impl RelationalDB { } } + /// Get the [`DurableOffset`] of this database, or `None` if this is an + /// in-memory instance. + pub fn durable_tx_offset(&self) -> Option { + self.durability + .as_ref() + .map(|durability| durability.durable_tx_offset()) + } + /// Decide based on the `committed_state.next_tx_offset` /// whether to request that the [`SnapshotWorker`] in `self` capture a snapshot of the database. 
/// @@ -1000,7 +1012,7 @@ impl RelationalDB { { let mut tx = self.begin_tx(workload); let res = f(&mut tx); - let (tx_metrics, reducer) = self.release_tx(tx); + let (_tx_offset, tx_metrics, reducer) = self.release_tx(tx); self.report_read_tx_metrics(reducer, tx_metrics); res } @@ -1015,7 +1027,7 @@ impl RelationalDB { self.report_mut_tx_metrics(reducer, tx_metrics, None); } else { match self.commit_tx(tx).map_err(E::from)? { - Some((tx_data, tx_metrics, reducer)) => { + Some((_tx_offset, tx_data, tx_metrics, reducer)) => { self.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); } None => panic!("TODO: retry?"), @@ -2092,7 +2104,7 @@ mod tests { fn table( name: &str, columns: ProductType, - f: impl FnOnce(RawTableDefBuilder) -> RawTableDefBuilder, + f: impl FnOnce(RawTableDefBuilder<'_>) -> RawTableDefBuilder, ) -> TableSchema { let mut builder = RawModuleDefV9Builder::new(); f(builder.build_table_with_new_type(name, columns, true)); diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 4994ef69431..b4e9030be9f 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -6,6 +6,7 @@ use std::sync::{MutexGuard, PoisonError}; use enum_as_inner::EnumAsInner; use hex::FromHexError; use spacetimedb_commitlog::repo::TxOffset; +use spacetimedb_durability::DurabilityExited; use spacetimedb_expr::errors::TypingError; use spacetimedb_lib::Identity; use spacetimedb_schema::error::ValidationErrors; @@ -144,6 +145,8 @@ pub enum DBError { }, #[error(transparent)] RestoreSnapshot(#[from] RestoreSnapshotError), + #[error(transparent)] + DurabilityGone(#[from] DurabilityExited), } impl DBError { diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 29749f651db..b80ce91df87 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -49,6 +49,7 @@ use spacetimedb_vm::relation::RelValue; use std::fmt; use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; +use 
tokio::sync::oneshot; #[derive(Debug, Default, Clone, From)] pub struct DatabaseUpdate { @@ -395,7 +396,7 @@ fn init_database( let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) { None => { - if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { + if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); } None @@ -1109,74 +1110,79 @@ impl ModuleHost { log::debug!("One-off query: {query}"); let metrics = self .on_module_thread("one_off_query", move || { - db.with_read_only(Workload::Sql, |tx| { - // We wrap the actual query in a closure so we can use ? to handle errors without making - // the entire transaction abort with an error. - let result: Result<(OneOffTable, ExecutionMetrics), anyhow::Error> = (|| { - let tx = SchemaViewer::new(tx, &auth); - - let ( - // A query may compile down to several plans. - // This happens when there are multiple RLS rules per table. - // The original query is the union of these plans. 
- plans, - _, - table_name, - _, - ) = compile_subscription(&query, &tx, &auth)?; - - // Optimize each fragment - let optimized = plans - .into_iter() - .map(|plan| plan.optimize()) - .collect::, _>>()?; - - check_row_limit( - &optimized, - &db, - &tx, - // Estimate the number of rows this query will scan - |plan, tx| estimate_rows_scanned(tx, plan), - &auth, - )?; - - let optimized = optimized - .into_iter() - // Convert into something we can execute - .map(PipelinedProject::from) - .collect::>(); - - // Execute the union and return the results - execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx)) - .map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics)) - .context("One-off queries are not allowed to modify the database") - })(); - - let total_host_execution_duration = timer.elapsed().into(); - let (message, metrics): (SerializableMessage, Option) = match result { - Ok((rows, metrics)) => ( - into_message(OneOffQueryResponseMessage { - message_id, - error: None, - results: vec![rows], - total_host_execution_duration, - }), - Some(metrics), - ), - Err(err) => ( - into_message(OneOffQueryResponseMessage { - message_id, - error: Some(format!("{err}")), - results: vec![], - total_host_execution_duration, - }), - None, - ), - }; - - subscriptions.send_client_message(client, message, tx)?; - Ok::, anyhow::Error>(metrics) - }) + let (tx_offset_sender, tx_offset_receiver) = oneshot::channel(); + let tx = scopeguard::guard(db.begin_tx(Workload::Sql), |tx| { + let (tx_offset, tx_metrics, reducer) = db.release_tx(tx); + let _ = tx_offset_sender.send(tx_offset); + db.report_read_tx_metrics(reducer, tx_metrics); + }); + + // We wrap the actual query in a closure so we can use ? to handle errors without making + // the entire transaction abort with an error. + let result: Result<(OneOffTable, ExecutionMetrics), anyhow::Error> = (|| { + let tx = SchemaViewer::new(&*tx, &auth); + + let ( + // A query may compile down to several plans. 
+ // This happens when there are multiple RLS rules per table. + // The original query is the union of these plans. + plans, + _, + table_name, + _, + ) = compile_subscription(&query, &tx, &auth)?; + + // Optimize each fragment + let optimized = plans + .into_iter() + .map(|plan| plan.optimize()) + .collect::, _>>()?; + + check_row_limit( + &optimized, + &db, + &tx, + // Estimate the number of rows this query will scan + |plan, tx| estimate_rows_scanned(tx, plan), + &auth, + )?; + + let optimized = optimized + .into_iter() + // Convert into something we can execute + .map(PipelinedProject::from) + .collect::>(); + + // Execute the union and return the results + execute_plan::<_, F>(&optimized, &DeltaTx::from(&*tx)) + .map(|(rows, _, metrics)| (OneOffTable { table_name, rows }, metrics)) + .context("One-off queries are not allowed to modify the database") + })(); + + let total_host_execution_duration = timer.elapsed().into(); + let (message, metrics): (SerializableMessage, Option) = match result { + Ok((rows, metrics)) => ( + into_message(OneOffQueryResponseMessage { + message_id, + error: None, + results: vec![rows], + total_host_execution_duration, + }), + Some(metrics), + ), + Err(err) => ( + into_message(OneOffQueryResponseMessage { + message_id, + error: Some(format!("{err}")), + results: vec![], + total_host_execution_duration, + }), + None, + ), + }; + + subscriptions.send_client_message(client, message, (&*tx, tx_offset_receiver))?; + Ok::, anyhow::Error>(metrics) }) .await??; diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index 7b1f2f50485..2d3b320ced0 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -307,7 +307,7 @@ impl InstanceCommon { Ok(UpdateDatabaseResult::ErrorExecutingMigration(e)) } Ok(()) => { - if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? 
{ + if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); } system_logger.info("Database updated"); @@ -613,7 +613,7 @@ fn commit_and_broadcast_event( .commit_and_broadcast_event(client, event, tx) .unwrap() { - Ok((event, _)) => event, + Ok(res) => res.event, Err(WriteConflict) => todo!("Write skew, you need to implement retries my man, T-dawg."), } } diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index d12e811ca7a..089148cdd2e 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -9,6 +9,7 @@ use crate::estimation::estimate_rows_scanned; use crate::host::module_host::{DatabaseTableUpdate, DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall}; use crate::host::ArgsTuple; use crate::subscription::module_subscription_actor::{ModuleSubscriptions, WriteConflict}; +use crate::subscription::module_subscription_manager::TransactionOffset; use crate::subscription::tx::DeltaTx; use crate::util::slow::SlowQueryLogger; use crate::vm::{check_row_limit, DbProgram, TxMode}; @@ -26,6 +27,7 @@ use spacetimedb_schema::relation::FieldName; use spacetimedb_vm::eval::run_ast; use spacetimedb_vm::expr::{CodeResult, CrudExpr, Expr}; use spacetimedb_vm::relation::MemTable; +use tokio::sync::oneshot; pub struct StmtResult { pub schema: ProductType, @@ -172,6 +174,11 @@ pub fn execute_sql_tx<'a>( } pub struct SqlResult { + /// The offset of the SQL operation's transaction. + /// + /// Used to determine visibility of the transaction wrt the durability + /// requirements requested by the caller. + pub tx_offset: TransactionOffset, pub rows: Vec, /// These metrics will be reported via `report_tx_metrics`. /// They should not be reported separately to avoid double counting. @@ -200,9 +207,12 @@ pub fn run( // and hence there are no deltas to process. 
let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql); - // Release the tx on drop, so that we record metrics. + let (tx_offset_send, tx_offset) = oneshot::channel(); + // Release the tx on drop, so that we record metrics + // and set the transaction offset. let mut tx = scopeguard::guard(tx, |tx| { - let (tx_metrics_downgrade, reducer) = db.release_tx(tx); + let (offset, tx_metrics_downgrade, reducer) = db.release_tx(tx); + let _ = tx_offset_send.send(offset); db.report_tx_metrics( reducer, Some(Arc::new(tx_data)), @@ -232,6 +242,7 @@ pub fn run( tx.metrics.merge(metrics); Ok(SqlResult { + tx_offset, rows, metrics: tx.metrics, }) @@ -252,10 +263,17 @@ pub fn run( if subs.is_none() { let metrics = tx.metrics; return db.commit_tx(tx).map(|tx_opt| { - if let Some((tx_data, tx_metrics, reducer)) = tx_opt { - db.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); + let (tx_offset, tx_data, tx_metrics, reducer) = tx_opt.unwrap(); + + let (tx_offset_sender, tx_offset_receiver) = oneshot::channel(); + let _ = tx_offset_sender.send(tx_offset); + + db.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); + SqlResult { + tx_offset: tx_offset_receiver, + rows: vec![], + metrics, } - SqlResult { rows: vec![], metrics } }); } @@ -289,7 +307,11 @@ pub fn run( Err(WriteConflict) => { todo!("See module_host_actor::call_reducer_with_tx") } - Ok(_) => Ok(SqlResult { rows: vec![], metrics }), + Ok(res) => Ok(SqlResult { + tx_offset: res.tx_offset, + rows: vec![], + metrics, + }), } } } diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 48c2af5fb95..64e09ac5e16 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1,6 +1,7 @@ use super::execution_unit::QueryHash; use super::module_subscription_manager::{ spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats, 
SubscriptionManager, + TransactionOffset, }; use super::query::compile_query_with_hashes; use super::tx::DeltaTx; @@ -22,18 +23,23 @@ use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; use parking_lot::RwLock; use prometheus::{Histogram, HistogramTimer, IntCounter, IntGauge}; +use scopeguard::ScopeGuard; use spacetimedb_client_api_messages::websocket::{ self as ws, BsatnFormat, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate, Unsubscribe, UnsubscribeMulti, }; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; +use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::TxId; +use spacetimedb_datastore::traits::TxData; +use spacetimedb_durability::TxOffset; use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Identity; use std::{sync::Arc, time::Instant}; +use tokio::sync::oneshot; type Subscriptions = Arc>; @@ -123,6 +129,16 @@ impl SubscriptionMetrics { } } +/// Inner result type of [`ModuleSubscriptions::commit_and_broadcast_event`]. +pub type CommitAndBroadcastEventResult = Result; + +/// `Ok` side of a [`CommitAndBroadcastEventResult`]. 
+pub struct CommitAndBroadcastEventSuccess { + pub tx_offset: TransactionOffset, + pub event: Arc, + pub metrics: ExecutionMetrics, +} + type AssertTxFn = Arc; type SubscriptionUpdate = FormatSwitch, TableUpdate>; type FullSubscriptionUpdate = FormatSwitch, ws::DatabaseUpdate>; @@ -299,6 +315,7 @@ impl ModuleSubscriptions { let send_err_msg = |message| { self.broadcast_queue.send_client_message( sender.clone(), + None, SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -316,10 +333,7 @@ impl ModuleSubscriptions { let hash = QueryHash::from_string(&sql, auth.caller, false); let hash_with_param = QueryHash::from_string(&sql, auth.caller, true); - let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| { - let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let (tx, tx_offset) = self.begin_tx(Workload::Subscribe); let existing_query = { let guard = self.subscriptions.read(); @@ -365,6 +379,7 @@ impl ModuleSubscriptions { // Holding a write lock on `self.subscriptions` would also be sufficient. 
let _ = self.broadcast_queue.send_client_message( sender.clone(), + Some(tx_offset), SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -390,6 +405,7 @@ impl ModuleSubscriptions { let send_err_msg = |message| { self.broadcast_queue.send_client_message( sender.clone(), + None, SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -418,10 +434,7 @@ impl ModuleSubscriptions { return Ok(None); }; - let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| { - let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let (tx, tx_offset) = self.begin_tx(Workload::Unsubscribe); let auth = AuthCtx::new(self.owner_identity, sender.id.identity); let (table_rows, metrics) = return_on_err_with_sql!( self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Unsubscribe), @@ -438,6 +451,7 @@ impl ModuleSubscriptions { // Holding a write lock on `self.subscriptions` would also be sufficient. let _ = self.broadcast_queue.send_client_message( sender.clone(), + Some(tx_offset), SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -464,6 +478,7 @@ impl ModuleSubscriptions { let send_err_msg = |message| { self.broadcast_queue.send_client_message( sender.clone(), + None, SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -480,10 +495,7 @@ impl ModuleSubscriptions { let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Unsubscribe); // Always lock the db before the subscription lock to avoid deadlocks. 
- let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| { - let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let (tx, tx_offset) = self.begin_tx(Workload::Unsubscribe); let removed_queries = { let _compile_timer = subscription_metrics.compilation_time.start_timer(); @@ -527,6 +539,7 @@ impl ModuleSubscriptions { // Holding a write lock on `self.subscriptions` would also be sufficient. let _ = self.broadcast_queue.send_client_message( sender, + Some(tx_offset), SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -576,10 +589,7 @@ impl ModuleSubscriptions { let auth = AuthCtx::new(self.owner_identity, sender); // We always get the db lock before the subscription lock to avoid deadlocks. - let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| { - let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let (tx, _tx_offset) = self.begin_tx(Workload::Subscribe); let compile_timer = metrics.compilation_time.start_timer(); @@ -631,9 +641,10 @@ impl ModuleSubscriptions { &self, recipient: Arc, message: impl Into, - _tx_id: &TxId, + (_tx, tx_offset): (&TxId, TransactionOffset), ) -> Result<(), BroadcastError> { - self.broadcast_queue.send_client_message(recipient, message) + self.broadcast_queue + .send_client_message(recipient, Some(tx_offset), message) } #[tracing::instrument(level = "trace", skip_all)] @@ -648,6 +659,7 @@ impl ModuleSubscriptions { let send_err_msg = |message| { let _ = self.broadcast_queue.send_client_message( sender.clone(), + None, SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -678,10 +690,7 @@ impl ModuleSubscriptions { send_err_msg, None ); - let tx = scopeguard::guard(tx, |tx| { - let (tx_metrics, reducer) = 
self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let (tx, tx_offset) = self.guard_tx(tx, <_>::default()); // We minimize locking so that other clients can add subscriptions concurrently. // We are protected from race conditions with broadcasts, because we have the db lock, @@ -738,6 +747,7 @@ impl ModuleSubscriptions { let _ = self.broadcast_queue.send_client_message( sender.clone(), + Some(tx_offset), SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -772,10 +782,7 @@ impl ModuleSubscriptions { num_queries, &subscription_metrics, )?; - let tx = scopeguard::guard(tx, |tx| { - let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let (tx, tx_offset) = self.guard_tx(tx, <_>::default()); check_row_limit( &queries, @@ -830,6 +837,7 @@ impl ModuleSubscriptions { // Holding a write lock on `self.subscriptions` would also be sufficient. let _ = self.broadcast_queue.send_client_message( sender, + Some(tx_offset), SubscriptionUpdateMessage { database_update, request_id: Some(subscription.request_id), @@ -854,7 +862,7 @@ impl ModuleSubscriptions { caller: Option>, mut event: ModuleEvent, tx: MutTx, - ) -> Result, ExecutionMetrics), WriteConflict>, DBError> { + ) -> Result { let database_identity = self.relational_db.database_identity(); let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Update); @@ -887,11 +895,11 @@ impl ModuleSubscriptions { let tx_data = tx_data.map(Arc::new); // When we're done with this method, release the tx and report metrics. 
- let mut read_tx = scopeguard::guard(read_tx, |tx| { - let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx); - self.relational_db - .report_tx_metrics(reducer, tx_data.clone(), Some(tx_metrics_mut), Some(tx_metrics_read)); - }); + let (extra_tx_offset_sender, extra_tx_offset) = oneshot::channel(); + let (mut read_tx, tx_offset) = self.guard_tx( + read_tx, + GuardTxOptions::full(extra_tx_offset_sender, tx_data.clone(), tx_metrics_mut), + ); // Create the delta transaction we'll use to eval updates against. let delta_read_tx = tx_data .as_ref() @@ -904,7 +912,8 @@ impl ModuleSubscriptions { match &event.status { EventStatus::Committed(_) => { - update_metrics = subscriptions.eval_updates_sequential(&delta_read_tx, event.clone(), caller); + update_metrics = + subscriptions.eval_updates_sequential((&delta_read_tx, tx_offset), event.clone(), caller); } EventStatus::Failed(_) => { if let Some(client) = caller { @@ -913,7 +922,7 @@ impl ModuleSubscriptions { database_update: SubscriptionUpdateMessage::default_for_protocol(client.config.protocol, None), }; - let _ = self.broadcast_queue.send_client_message(client, message); + let _ = self.broadcast_queue.send_client_message(client, None, message); } else { log::trace!("Reducer failed but there is no client to send the failure to!") } @@ -924,7 +933,81 @@ impl ModuleSubscriptions { // Merge in the subscription evaluation metrics. read_tx.metrics.merge(update_metrics); - Ok(Ok((event, update_metrics))) + Ok(Ok(CommitAndBroadcastEventSuccess { + tx_offset: extra_tx_offset, + event, + metrics: update_metrics, + })) + } + + /// Helper that starts a new read transaction, and guards it using + /// [`Self::guard_tx`] with the default configuration. + fn begin_tx(&self, workload: Workload) -> (ScopeGuard, TransactionOffset) { + self.guard_tx(self.relational_db.begin_tx(workload), <_>::default()) + } + + /// Helper wrapping `tx` in a scopeguard, with a configurable drop fn.
+ /// + /// By default, `tx` is released when the returned [`ScopeGuard`] is dropped, + /// and reports the transaction metrics via [`RelationalDB::report_tx_metrics`]. + /// The `tx_data` and `tx_metrics_mut` parameters are passed to the metrics + /// reporting method as-is; they can be used to report additional metrics + /// about a previous mutable transaction that was downgraded to `tx` after + /// committing. + /// + /// The method returns a [`ScopeGuard`] along with a [`TransactionOffset`]. + /// When the transaction commits, its transaction offset is sent to the + /// latter (a [`oneshot::Receiver`]). + /// If another receiver of the transaction offset is needed, its sending + /// side can be passed in as `extra_tx_offset_sender`. It will be sent the + /// offset as well. + fn guard_tx( + &self, + tx: TxId, + GuardTxOptions { + extra_tx_offset_sender, + tx_data, + tx_metrics_mut, + }: GuardTxOptions, + ) -> (ScopeGuard, TransactionOffset) { + let (offset_tx, offset_rx) = oneshot::channel(); + let guard = scopeguard::guard(tx, |tx| { + let (tx_offset, tx_metrics, reducer) = self.relational_db.release_tx(tx); + log::trace!("read tx released with offset {tx_offset}"); + let _ = offset_tx.send(tx_offset); + if let Some(extra) = extra_tx_offset_sender { + let _ = extra.send(tx_offset); + } + self.relational_db + .report_tx_metrics(reducer, tx_data, tx_metrics_mut, Some(tx_metrics)); + }); + + (guard, offset_rx) + } +} + +/// Extra parameters for [`ModuleSubscriptions::guard_tx`]. +#[derive(Default)] +struct GuardTxOptions { + /// Sender for an extra [`oneshot::Receiver`] for the transaction offset. + extra_tx_offset_sender: Option>, + /// [`TxData`] of a preceding mutable transaction. + tx_data: Option>, + /// [`TxMetrics`] of a preceding mutable transaction. 
+ tx_metrics_mut: Option, +} + +impl GuardTxOptions { + fn full( + extra_tx_offset_sender: oneshot::Sender, + tx_data: Option>, + tx_metrics_mut: TxMetrics, + ) -> Self { + Self { + extra_tx_offset_sender: extra_tx_offset_sender.into(), + tx_data, + tx_metrics_mut: tx_metrics_mut.into(), + } } } @@ -937,11 +1020,13 @@ mod tests { SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult, SubscriptionUpdateMessage, TransactionUpdateMessage, }; - use crate::client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName, MeteredReceiver, Protocol}; + use crate::client::{ + ClientActorId, ClientConfig, ClientConnectionReceiver, ClientConnectionSender, ClientName, Protocol, + }; use crate::db::relational_db::tests_utils::{ - begin_mut_tx, begin_tx, insert, with_auto_commit, with_read_only, TestDB, + begin_mut_tx, begin_tx, insert, with_auto_commit, with_read_only, TempReplicaDir, TestDB, }; - use crate::db::relational_db::RelationalDB; + use crate::db::relational_db::{RelationalDB, Txdata}; use crate::error::DBError; use crate::host::module_host::{DatabaseUpdate, EventStatus, ModuleEvent, ModuleFunctionCall}; use crate::messages::websocket as ws; @@ -949,6 +1034,7 @@ mod tests { use crate::subscription::module_subscription_manager::{spawn_send_worker, SubscriptionManager}; use crate::subscription::query::compile_read_only_query; use crate::subscription::TableUpdateType; + use core::fmt; use hashbrown::HashMap; use itertools::Itertools; use pretty_assertions::assert_matches; @@ -957,7 +1043,9 @@ mod tests { CompressableQueryUpdate, Compression, FormatSwitch, QueryId, Subscribe, SubscribeMulti, SubscribeSingle, TableUpdate, Unsubscribe, UnsubscribeMulti, }; + use spacetimedb_commitlog::{commitlog, repo}; use spacetimedb_datastore::system_tables::{StRowLevelSecurityRow, ST_ROW_LEVEL_SECURITY_ID}; + use spacetimedb_durability::{Durability, EmptyHistory, TxOffset}; use spacetimedb_execution::dml::MutDatastore; use 
spacetimedb_lib::bsatn::ToBsatn; use spacetimedb_lib::db::auth::StAccess; @@ -967,9 +1055,14 @@ mod tests { use spacetimedb_lib::{error::ResultTest, AlgebraicType, Identity}; use spacetimedb_primitives::TableId; use spacetimedb_sats::product; + use std::future::Future; + use std::pin::pin; + use std::sync::RwLock; + use std::task::Poll; use std::time::Instant; use std::{sync::Arc, time::Duration}; use tokio::sync::mpsc::{self}; + use tokio::sync::watch; fn add_subscriber(db: Arc, sql: &str, assert: Option) -> Result<(), DBError> { // Create and enter a Tokio runtime to run the `ModuleSubscriptions`' background workers in parallel. @@ -978,7 +1071,7 @@ mod tests { let owner = Identity::from_byte_array([1; 32]); let client = ClientActorId::for_test(Identity::ZERO); let config = ClientConfig::for_test(); - let sender = Arc::new(ClientConnectionSender::dummy(client, config)); + let sender = Arc::new(ClientConnectionSender::dummy(client, config, (*db).clone())); let send_worker_queue = spawn_send_worker(None); let module_subscriptions = ModuleSubscriptions::new( db.clone(), @@ -995,12 +1088,88 @@ mod tests { Ok(()) } + /// A [`Durability`] for which the durable offset is marked manually. 
+ struct ManualDurability { + commitlog: Arc>>, + durable_offset: watch::Sender>, + } + + impl ManualDurability { + #[allow(unused)] + fn mark_durable_at(&self, offset: TxOffset) { + assert!( + self.committed_offset().is_some_and(|committed| committed >= offset), + "given offset is not in the commitlog" + ); + self.durable_offset.send_modify(|val| { + val.replace(offset); + }); + } + + fn mark_durable(&self) { + if let Some(offset) = self.committed_offset() { + self.durable_offset.send_modify(|val| { + val.replace(offset); + }); + } + } + + fn committed_offset(&self) -> Option { + self.commitlog.read().unwrap().max_committed_offset() + } + } + + impl Durability for ManualDurability { + type TxData = Txdata; + + fn append_tx(&self, tx: Self::TxData) { + let mut commitlog = self.commitlog.write().unwrap(); + if let Err(tx) = commitlog.append(tx) { + commitlog.commit().expect("error flushing commitlog"); + commitlog.append(tx).expect("should be able to append after flush"); + } + commitlog.commit().expect("error flushing commitlog"); + } + + fn durable_tx_offset(&self) -> spacetimedb_durability::DurableOffset { + self.durable_offset.subscribe().into() + } + } + + impl Default for ManualDurability { + fn default() -> Self { + let (durable_offset, ..) = watch::channel(None); + Self { + commitlog: Arc::new(RwLock::new( + commitlog::Generic::open(repo::Memory::new(), <_>::default()).unwrap(), + )), + durable_offset, + } + } + } + /// An in-memory `RelationalDB` for testing fn relational_db() -> anyhow::Result> { let TestDB { db, .. } = TestDB::in_memory()?; Ok(Arc::new(db)) } + /// An in-memory `RelationalDB` with `ManualDurability`. 
+ fn relational_db_with_manual_durability() -> anyhow::Result<(Arc, Arc)> { + let dir = TempReplicaDir::new()?; + let durability = Arc::new(ManualDurability::default()); + let db = TestDB::open_db( + &dir, + EmptyHistory::new(), + Some((durability.clone(), Arc::new(|| Ok(0)))), + None, + None, + 0, + )?; + + Ok((Arc::new(db), durability)) + } + /// A [SubscribeSingle] message for testing fn single_subscribe(sql: &str, query_id: u32) -> SubscribeSingle { SubscribeSingle { @@ -1073,27 +1242,57 @@ mod tests { } } + fn client_connection_with_config( + client_id: ClientActorId, + db: &RelationalDB, + config: ClientConfig, + ) -> (Arc, ClientConnectionReceiver) { + let (sender, receiver) = ClientConnectionSender::dummy_with_channel(client_id, config, db.clone()); + (Arc::new(sender), receiver) + } + /// Instantiate a client connection with compression fn client_connection_with_compression( client_id: ClientActorId, + db: &RelationalDB, compression: Compression, - ) -> (Arc, MeteredReceiver) { - let (sender, rx) = ClientConnectionSender::dummy_with_channel( + ) -> (Arc, ClientConnectionReceiver) { + client_connection_with_config( client_id, + db, ClientConfig { protocol: Protocol::Binary, compression, tx_update_full: true, + confirmed_reads: false, }, - ); - (Arc::new(sender), rx) + ) } /// Instantiate a client connection fn client_connection( client_id: ClientActorId, - ) -> (Arc, MeteredReceiver) { - client_connection_with_compression(client_id, Compression::None) + db: &RelationalDB, + ) -> (Arc, ClientConnectionReceiver) { + client_connection_with_compression(client_id, db, Compression::None) + } + + /// Instantiate a client connection with confirmed reads turned on or off. 
+ fn client_connection_with_confirmed_reads( + client_id: ClientActorId, + db: &RelationalDB, + confirmed_reads: bool, + ) -> (Arc, ClientConnectionReceiver) { + client_connection_with_config( + client_id, + db, + ClientConfig { + protocol: Protocol::Binary, + compression: Compression::None, + tx_update_full: true, + confirmed_reads, + }, + ) } /// Insert rules into the RLS system table @@ -1166,13 +1365,13 @@ mod tests { /// Pull a message from receiver and assert that it is a `TxUpdate` with the expected rows async fn assert_tx_update_for_table( - rx: &mut MeteredReceiver, + rx: impl Future>, table_id: TableId, schema: &ProductType, inserts: impl IntoIterator, deletes: impl IntoIterator, ) { - match rx.recv().await { + match rx.await { Some(SerializableMessage::TxUpdate(TransactionUpdateMessage { database_update: SubscriptionUpdateMessage { @@ -1242,6 +1441,22 @@ mod tests { } } + /// Assert that the future `f` completes only after `durability` is marked + /// durable. + /// + /// Namely: + /// + /// - assert that polling `f` once returns [`Poll::Pending`] + /// - call `durability.mark_durable()` + /// - assert that polling `f` returns [`Poll::Ready`]. 
+ /// + async fn assert_after_durable(durability: &ManualDurability, f: impl Future) { + let mut g = pin!(f); + assert_matches!(futures::poll!(&mut g), Poll::Pending); + durability.mark_durable(); + assert_matches!(futures::poll!(g), Poll::Ready(_)); + } + /// Commit a set of row updates and broadcast to subscribers fn commit_tx( db: &RelationalDB, @@ -1257,18 +1472,19 @@ mod tests { db.insert(&mut tx, table_id, &bsatn::to_vec(&row)?)?; } - let Ok(Ok((_, metrics))) = subs.commit_and_broadcast_event(None, module_event(), tx) else { + let Ok(Ok(success)) = subs.commit_and_broadcast_event(None, module_event(), tx) else { panic!("Encountered an error in `commit_and_broadcast_event`"); }; - Ok(metrics) + Ok(success.metrics) } #[test] fn test_subscribe_metrics() -> anyhow::Result<()> { + let db = relational_db()?; + let client_id = client_id_from_u8(1); - let (sender, _) = client_connection(client_id); + let (sender, _) = client_connection(client_id, &db); - let db = relational_db()?; let (subs, _runtime) = ModuleSubscriptions::for_test_new_runtime(db.clone()); // Create a table `t` with index on `id` @@ -1320,10 +1536,11 @@ mod tests { /// Test that clients receive error messages on subscribe #[tokio::test] async fn subscribe_single_error() -> anyhow::Result<()> { + let db = relational_db()?; + let client_id = client_id_from_u8(1); - let (tx, mut rx) = client_connection(client_id); + let (tx, mut rx) = client_connection(client_id, &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?; @@ -1340,10 +1557,11 @@ mod tests { /// Test that clients receive error messages on subscribe #[tokio::test] async fn subscribe_multi_error() -> anyhow::Result<()> { + let db = relational_db()?; + let client_id = client_id_from_u8(1); - let (tx, mut rx) = client_connection(client_id); + let (tx, mut rx) = client_connection(client_id, &db); - let db = relational_db()?; let 
subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?; @@ -1360,10 +1578,11 @@ mod tests { /// Test that clients receive error messages on unsubscribe #[tokio::test] async fn unsubscribe_single_error() -> anyhow::Result<()> { + let db = relational_db()?; + let client_id = client_id_from_u8(1); - let (tx, mut rx) = client_connection(client_id); + let (tx, mut rx) = client_connection(client_id, &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); // Create a table `t` with an index on `id` @@ -1414,10 +1633,11 @@ mod tests { /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel. #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn unsubscribe_multi_error() -> anyhow::Result<()> { + let db = relational_db()?; + let client_id = client_id_from_u8(1); - let (tx, mut rx) = client_connection(client_id); + let (tx, mut rx) = client_connection(client_id, &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); // Create a table `t` with an index on `id` @@ -1468,10 +1688,11 @@ mod tests { /// Test that clients receive error messages on tx updates #[tokio::test] async fn tx_update_error() -> anyhow::Result<()> { + let db = relational_db()?; + let client_id = client_id_from_u8(1); - let (tx, mut rx) = client_connection(client_id); + let (tx, mut rx) = client_connection(client_id, &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); // Create two tables `t` and `s` with indexes on their `id` columns @@ -1523,6 +1744,8 @@ mod tests { /// Test that two clients can subscribe to a parameterized query and get the correct rows. 
#[tokio::test] async fn test_parameterized_subscription() -> anyhow::Result<()> { + let db = relational_db()?; + // Create identities for two different clients let id_for_a = identity_from_u8(1); let id_for_b = identity_from_u8(2); @@ -1531,10 +1754,9 @@ mod tests { let client_id_for_b = client_id_from_u8(2); // Establish a connection for each client - let (tx_for_a, mut rx_for_a) = client_connection(client_id_for_a); - let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b); + let (tx_for_a, mut rx_for_a) = client_connection(client_id_for_a, &db); + let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b, &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let schema = [("identity", AlgebraicType::identity())]; @@ -1581,14 +1803,16 @@ mod tests { let schema = ProductType::from([AlgebraicType::identity()]); // Both clients should only receive their identities and not the other's. - assert_tx_update_for_table(&mut rx_for_a, table_id, &schema, [product![id_for_a]], []).await; - assert_tx_update_for_table(&mut rx_for_b, table_id, &schema, [product![id_for_b]], []).await; + assert_tx_update_for_table(rx_for_a.recv(), table_id, &schema, [product![id_for_a]], []).await; + assert_tx_update_for_table(rx_for_b.recv(), table_id, &schema, [product![id_for_b]], []).await; Ok(()) } /// Test that two clients can subscribe to a table with RLS rules and get the correct rows #[tokio::test] async fn test_rls_subscription() -> anyhow::Result<()> { + let db = relational_db()?; + // Create identities for two different clients let id_for_a = identity_from_u8(1); let id_for_b = identity_from_u8(2); @@ -1597,10 +1821,9 @@ mod tests { let client_id_for_b = client_id_from_u8(2); // Establish a connection for each client - let (tx_for_a, mut rx_for_a) = client_connection(client_id_for_a); - let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b); + let (tx_for_a, mut rx_for_a) = 
client_connection(client_id_for_a, &db); + let (tx_for_b, mut rx_for_b) = client_connection(client_id_for_b, &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let schema = [("id", AlgebraicType::identity())]; @@ -1655,19 +1878,20 @@ mod tests { let schema = ProductType::from([AlgebraicType::identity()]); // Both clients should only receive their identities and not the other's. - assert_tx_update_for_table(&mut rx_for_a, w_id, &schema, [product![id_for_a]], []).await; - assert_tx_update_for_table(&mut rx_for_b, w_id, &schema, [product![id_for_b]], []).await; + assert_tx_update_for_table(rx_for_a.recv(), w_id, &schema, [product![id_for_a]], []).await; + assert_tx_update_for_table(rx_for_b.recv(), w_id, &schema, [product![id_for_b]], []).await; Ok(()) } /// Test that a client and the database owner can subscribe to the same query #[tokio::test] async fn test_rls_for_owner() -> anyhow::Result<()> { + let db = relational_db()?; + // Establish a connection for owner and client - let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(0)); - let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(1)); + let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(0), &db); + let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(1), &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); // Create table `t` @@ -1715,7 +1939,7 @@ mod tests { )?; assert_tx_update_for_table( - &mut rx_for_a, + rx_for_a.recv(), table_id, &schema, // The owner should receive both identities @@ -1725,7 +1949,7 @@ mod tests { .await; assert_tx_update_for_table( - &mut rx_for_b, + rx_for_b.recv(), table_id, &schema, // Client `b` should only receive its identity @@ -1740,10 +1964,11 @@ mod tests { /// Test that we do not send empty updates to clients #[tokio::test] async fn test_no_empty_updates() -> anyhow::Result<()> { + let db = relational_db()?; + 
// Establish a client connection - let (tx, mut rx) = client_connection(client_id_from_u8(1)); + let (tx, mut rx) = client_connection(client_id_from_u8(1), &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let schema = [("x", AlgebraicType::U8)]; @@ -1778,7 +2003,7 @@ mod tests { // If the server sends empty updates, this assertion will fail, // because we will receive one for the first transaction. - assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8]], []).await; + assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8]], []).await; Ok(()) } @@ -1789,10 +2014,11 @@ mod tests { /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel. #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_no_compression_for_subscribe() -> anyhow::Result<()> { + let db = relational_db()?; + // Establish a client connection with compression - let (tx, mut rx) = client_connection_with_compression(client_id_from_u8(1), Compression::Brotli); + let (tx, mut rx) = client_connection_with_compression(client_id_from_u8(1), &db, Compression::Brotli); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let table_id = db.create_table_for_test("t", &[("x", AlgebraicType::U64)], &[])?; @@ -1833,10 +2059,11 @@ mod tests { /// Test that we receive subscription updates for DML #[tokio::test] async fn test_updates_for_dml() -> anyhow::Result<()> { + let db = relational_db()?; + // Establish a client connection - let (tx, mut rx) = client_connection(client_id_from_u8(1)); + let (tx, mut rx) = client_connection(client_id_from_u8(1), &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let schema = [("x", AlgebraicType::U8), ("y", AlgebraicType::U8)]; let t_id = db.create_table_for_test("t", &schema, &[])?; @@ -1861,17 +2088,17 @@ mod tests { )?; // Client 
should receive insert - assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8, 1_u8]], []).await; + assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 1_u8]], []).await; run(&db, "UPDATE t SET y=2 WHERE x=0", auth, Some(&subs), &mut vec![])?; // Client should receive update - assert_tx_update_for_table(&mut rx, t_id, &schema, [product![0_u8, 2_u8]], [product![0_u8, 1_u8]]).await; + assert_tx_update_for_table(rx.recv(), t_id, &schema, [product![0_u8, 2_u8]], [product![0_u8, 1_u8]]).await; run(&db, "DELETE FROM t WHERE x=0", auth, Some(&subs), &mut vec![])?; // Client should receive delete - assert_tx_update_for_table(&mut rx, t_id, &schema, [], [product![0_u8, 2_u8]]).await; + assert_tx_update_for_table(rx.recv(), t_id, &schema, [], [product![0_u8, 2_u8]]).await; Ok(()) } @@ -1880,10 +2107,11 @@ mod tests { /// but we don't care about that for this test. #[tokio::test] async fn test_no_compression_for_update() -> anyhow::Result<()> { + let db = relational_db()?; + // Establish a client connection with compression - let (tx, mut rx) = client_connection_with_compression(client_id_from_u8(1), Compression::Brotli); + let (tx, mut rx) = client_connection_with_compression(client_id_from_u8(1), &db, Compression::Brotli); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let table_id = db.create_table_for_test("t", &[("x", AlgebraicType::U64)], &[])?; @@ -1930,10 +2158,11 @@ mod tests { #[tokio::test] async fn test_update_for_join() -> anyhow::Result<()> { async fn test_subscription_updates(queries: &[&'static str]) -> anyhow::Result<()> { + let db = relational_db()?; + // Establish a client connection - let (sender, mut rx) = client_connection(client_id_from_u8(1)); + let (sender, mut rx) = client_connection(client_id_from_u8(1), &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let p_schema = [("id", AlgebraicType::U64), ("signed_in", 
AlgebraicType::Bool)]; @@ -1967,7 +2196,7 @@ mod tests { // We should receive both matching player rows assert_tx_update_for_table( - &mut rx, + rx.recv(), p_id, &schema, [product![1_u64, true], product![2_u64, true]], @@ -1985,7 +2214,7 @@ mod tests { // We should receive an update for it because it is still matching assert_tx_update_for_table( - &mut rx, + rx.recv(), p_id, &schema, [product![2_u64, false]], @@ -2003,7 +2232,7 @@ mod tests { // We should receive an update for it because it is still matching assert_tx_update_for_table( - &mut rx, + rx.recv(), p_id, &schema, [product![2_u64, true]], @@ -2043,11 +2272,12 @@ mod tests { /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel. #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_query_pruning() -> anyhow::Result<()> { + let db = relational_db()?; + // Establish a connection for each client - let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1)); - let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2)); + let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1), &db); + let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2), &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let u_id = db.create_table_for_test( @@ -2138,7 +2368,7 @@ mod tests { let metrics = commit_tx(&db, &subs, [], [(v_id, product![2u64, 6u64, 6u64])])?; assert_tx_update_for_table( - &mut rx_for_a, + rx_for_a.recv(), u_id, &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]), [product![2u64, 3u64, 3u64]], @@ -2159,7 +2389,7 @@ mod tests { )?; assert_tx_update_for_table( - &mut rx_for_b, + rx_for_b.recv(), u_id, &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]), [product![1u64, 2u64, 3u64]], @@ -2184,9 +2414,10 @@ mod tests { /// Test that we do not evaluate queries that we know will not match row 
updates #[tokio::test] async fn test_join_pruning() -> anyhow::Result<()> { - let (tx, mut rx) = client_connection(client_id_from_u8(1)); - let db = relational_db()?; + + let (tx, mut rx) = client_connection(client_id_from_u8(1), &db); + let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let u_id = db.create_table_for_test_with_the_works( @@ -2255,7 +2486,7 @@ mod tests { // Insert a new row into `u` that joins with `x = 1` let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 2u64, 3u64])])?; - assert_tx_update_for_table(&mut rx, u_id, &schema, [product![1u64, 2u64, 3u64]], []).await; + assert_tx_update_for_table(rx.recv(), u_id, &schema, [product![1u64, 2u64, 3u64]], []).await; // We should only have evaluated a single query assert_eq!(metrics.delta_queries_evaluated, 1); @@ -2282,7 +2513,7 @@ mod tests { )?; // Results in a no-op - assert_tx_update_for_table(&mut rx, u_id, &schema, [], []).await; + assert_tx_update_for_table(rx.recv(), u_id, &schema, [], []).await; // We should have evaluated queries for `x = 1` and `x = 2` assert_eq!(metrics.delta_queries_evaluated, 2); @@ -2297,7 +2528,7 @@ mod tests { [(v_id, product![3u64, 4u64, 3u64]), (u_id, product![3u64, 4u64, 5u64])], )?; - assert_tx_update_for_table(&mut rx, u_id, &schema, [product![3u64, 4u64, 5u64]], []).await; + assert_tx_update_for_table(rx.recv(), u_id, &schema, [product![3u64, 4u64, 5u64]], []).await; // We should have evaluated queries for `x = 3` and `x = 4` assert_eq!(metrics.delta_queries_evaluated, 2); @@ -2311,7 +2542,7 @@ mod tests { [(v_id, product![3u64, 0u64, 3u64])], )?; - assert_tx_update_for_table(&mut rx, u_id, &schema, [], [product![3u64, 4u64, 5u64]]).await; + assert_tx_update_for_table(rx.recv(), u_id, &schema, [], [product![3u64, 4u64, 5u64]]).await; // We should only have evaluated the query for `x = 4` assert_eq!(metrics.delta_queries_evaluated, 1); @@ -2337,11 +2568,12 @@ mod tests { /// Test that one client subscribing does not affect another 
#[tokio::test] async fn test_subscribe_distinct_queries_same_plan() -> anyhow::Result<()> { + let db = relational_db()?; + // Establish a connection for each client - let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1)); - let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2)); + let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1), &db); + let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2), &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let u_id = db.create_table_for_test_with_the_works( @@ -2407,7 +2639,7 @@ mod tests { commit_tx(&db, &subs, [], [(u_id, product![1u64, 0u64, 0u64])])?; assert_tx_update_for_table( - &mut rx_for_a, + rx_for_a.recv(), u_id, &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]), [product![1u64, 0u64, 0u64]], @@ -2416,7 +2648,7 @@ mod tests { .await; assert_tx_update_for_table( - &mut rx_for_b, + rx_for_b.recv(), u_id, &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]), [product![1u64, 0u64, 0u64]], @@ -2430,11 +2662,12 @@ mod tests { /// Test that one client unsubscribing does not affect another #[tokio::test] async fn test_unsubscribe_distinct_queries_same_plan() -> anyhow::Result<()> { + let db = relational_db()?; + // Establish a connection for each client - let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1)); - let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2)); + let (tx_for_a, mut rx_for_a) = client_connection(client_id_from_u8(1), &db); + let (tx_for_b, mut rx_for_b) = client_connection(client_id_from_u8(2), &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let u_id = db.create_table_for_test_with_the_works( @@ -2509,7 +2742,7 @@ mod tests { let metrics = commit_tx(&db, &subs, [], [(u_id, product![1u64, 0u64, 0u64])])?; assert_tx_update_for_table( - 
&mut rx_for_a, + rx_for_a.recv(), u_id, &ProductType::from([AlgebraicType::U64, AlgebraicType::U64, AlgebraicType::U64]), [product![1u64, 0u64, 0u64]], @@ -2541,10 +2774,11 @@ mod tests { /// Needs a multi-threaded tokio runtime so that the module subscription worker can run in parallel. #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn test_query_pruning_for_empty_tables() -> anyhow::Result<()> { + let db = relational_db()?; + // Establish a client connection - let (tx, mut rx) = client_connection(client_id_from_u8(1)); + let (tx, mut rx) = client_connection(client_id_from_u8(1), &db); - let db = relational_db()?; let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); let schema = &[("id", AlgebraicType::U64), ("a", AlgebraicType::U64)]; @@ -2662,4 +2896,64 @@ mod tests { Ok(()) } + + #[tokio::test] + async fn test_confirmed_reads() -> anyhow::Result<()> { + let (db, durability) = relational_db_with_manual_durability()?; + + let (tx_for_confirmed, mut rx_for_confirmed) = + client_connection_with_confirmed_reads(client_id_from_u8(1), &db, true); + let (tx_for_unconfirmed, mut rx_for_unconfirmed) = + client_connection_with_confirmed_reads(client_id_from_u8(2), &db, false); + + let subs = ModuleSubscriptions::for_test_enclosing_runtime(db.clone()); + let table = db.create_table_for_test("t", &[("x", AlgebraicType::U8)], &[])?; + let schema = ProductType::from([AlgebraicType::U8]); + + // Subscribe both clients. + subscribe_multi(&subs, &["select * from t"], tx_for_confirmed, &mut 0)?; + subscribe_multi(&subs, &["select * from t"], tx_for_unconfirmed, &mut 0)?; + + assert_matches!( + rx_for_unconfirmed.recv().await, + Some(SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. 
+ })) + ); + assert_after_durable(&durability, async { + assert_matches!( + rx_for_confirmed.recv().await, + Some(SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + })) + ); + }) + .await; + + // Insert a row. + let mut tx = begin_mut_tx(&db); + db.insert(&mut tx, table, &bsatn::to_vec(&product![1_u8])?)?; + assert!(matches!( + subs.commit_and_broadcast_event(None, module_event(), tx), + Ok(Ok(_)) + )); + // Insert another row, using SQL. + let auth = AuthCtx::new(identity_from_u8(0), identity_from_u8(0)); + run(&db, "INSERT INTO t (x) VALUES (2)", auth, Some(&subs), &mut vec![])?; + + // Unconfirmed client should have received both rows. + assert_tx_update_for_table(rx_for_unconfirmed.recv(), table, &schema, [product![1_u8]], []).await; + assert_tx_update_for_table(rx_for_unconfirmed.recv(), table, &schema, [product![2_u8]], []).await; + + // Confirmed client should receive the rows after the tx becomes durable. + assert_after_durable(&durability, async { + assert_tx_update_for_table(rx_for_confirmed.recv(), table, &schema, [product![1_u8]], []).await; + assert_tx_update_for_table(rx_for_confirmed.recv(), table, &schema, [product![2_u8]], []).await + }) + .await; + + Ok(()) + } } diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 9820a739dc8..7e8798a6e5e 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -21,6 +21,7 @@ use spacetimedb_client_api_messages::websocket::{ }; use spacetimedb_data_structures::map::{Entry, IntMap}; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; +use spacetimedb_durability::TxOffset; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::{AlgebraicValue, ConnectionId, Identity, ProductValue}; use spacetimedb_primitives::{ColId, IndexId, TableId}; @@ -29,7 +30,7 
@@ use std::collections::BTreeMap; use std::fmt::Debug; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; /// Clients are uniquely identified by their Identity and ConnectionId. /// Identity is insufficient because different ConnectionIds can use the same Identity. @@ -548,12 +549,30 @@ impl SenderWithGauge { } } +/// The offset used to control visibility of the message if the client has +/// requested confirmed reads. +/// +/// [`SendWorkerMessage`]s are sent while holding the database lock, i.e. +/// without committing the transaction. When the transaction commits, the +/// message sender is expected to send the transaction offset along this channel. +/// +/// NOTE: If the send end is dropped before sending the offset, the +/// [`SendWorker`] will assume that the message sender was cancelled, and exit +/// itself. +pub type TransactionOffset = oneshot::Receiver; + /// Message sent by the [`SubscriptionManager`] to the [`SendWorker`]. #[derive(Debug)] enum SendWorkerMessage { /// A transaction has completed and the [`SubscriptionManager`] has evaluated the incremental queries, /// so the [`SendWorker`] should broadcast them to clients. - Broadcast(ComputedQueries), + /// + /// The `tx_offset` of the transaction is used to control visibility of + /// the results if the client has requested confirmed reads. + Broadcast { + tx_offset: TransactionOffset, + queries: ComputedQueries, + }, /// A new client has been registered in the [`SubscriptionManager`], /// so the [`SendWorker`] should also record its existence. @@ -566,9 +585,14 @@ enum SendWorkerMessage { outbound_ref: Client, }, - // Send a message to a client. + /// Send a message to a client. + /// + /// In some cases, `message` may contain query results. In this case, + /// `tx_offset` is `Some`, and later used to control visibility of the + /// message if the client has requested confirmed reads.
SendMessage { recipient: Arc, + tx_offset: Option, message: SerializableMessage, }, @@ -1100,7 +1124,7 @@ impl SubscriptionManager { #[tracing::instrument(level = "trace", skip_all)] pub fn eval_updates_sequential( &self, - tx: &DeltaTx, + (tx, tx_offset): (&DeltaTx, TransactionOffset), event: Arc, caller: Option>, ) -> ExecutionMetrics { @@ -1266,12 +1290,15 @@ impl SubscriptionManager { // then return ASAP in order to unlock the datastore and start running the next transaction. // See comment on the `send_worker_tx` field in [`SubscriptionManager`] for more motivation. self.send_worker_queue - .send(SendWorkerMessage::Broadcast(ComputedQueries { - updates, - errs, - event, - caller, - })) + .send(SendWorkerMessage::Broadcast { + tx_offset, + queries: ComputedQueries { + updates, + errs, + event, + caller, + }, + }) .expect("send worker has panicked, or otherwise dropped its recv queue!"); drop(span); @@ -1370,10 +1397,12 @@ impl BroadcastQueue { pub fn send_client_message( &self, recipient: Arc, + tx_offset: Option, message: impl Into, ) -> Result<(), BroadcastError> { self.0.send(SendWorkerMessage::SendMessage { recipient, + tx_offset, message: message.into(), })?; Ok(()) @@ -1427,14 +1456,31 @@ impl SendWorker { self.clients .insert(client_id, SendWorkerClient { dropped, outbound_ref }); } - SendWorkerMessage::SendMessage { recipient, message } => { - let _ = recipient.send_message(message); - } + SendWorkerMessage::SendMessage { + recipient, + tx_offset, + message, + } => match tx_offset { + None => { + let _ = recipient.send_message(None, message); + } + Some(tx_offset) => { + let Ok(tx_offset) = tx_offset.await else { + tracing::error!("tx offset sender dropped, exiting send worker"); + return; + }; + let _ = recipient.send_message(Some(tx_offset), message); + } + }, SendWorkerMessage::RemoveClient(client_id) => { self.clients.remove(&client_id); } - SendWorkerMessage::Broadcast(queries) => { - self.send_one_computed_queries(queries); + 
SendWorkerMessage::Broadcast { tx_offset, queries } => { + let Ok(tx_offset) = tx_offset.await else { + tracing::error!("tx offset sender dropped, exiting send worker"); + return; + }; + self.send_one_computed_queries(tx_offset, queries); } } } @@ -1442,6 +1488,7 @@ impl SendWorker { fn send_one_computed_queries( &mut self, + tx_offset: TxOffset, ComputedQueries { updates, errs, @@ -1527,7 +1574,7 @@ impl SendWorker { event: Some(event.clone()), database_update, }; - send_to_client(&caller, message); + send_to_client(&caller, Some(tx_offset), message); } // Send all the other updates. @@ -1537,7 +1584,7 @@ impl SendWorker { // Conditionally send out a full update or a light one otherwise. let event = client.config.tx_update_full.then(|| event.clone()); let message = TransactionUpdateMessage { event, database_update }; - send_to_client(&client, message); + send_to_client(&client, Some(tx_offset), message); } // Put back the aggregation maps into the worker. @@ -1550,6 +1597,7 @@ impl SendWorker { client.dropped.store(true, Ordering::Release); send_to_client( &client.outbound_ref, + None, SubscriptionMessage { request_id: None, query_id: None, @@ -1565,8 +1613,13 @@ impl SendWorker { } } -fn send_to_client(client: &ClientConnectionSender, message: impl Into) { - if let Err(e) = client.send_message(message) { +fn send_to_client( + client: &ClientConnectionSender, + tx_offset: Option, + message: impl Into, +) { + tracing::trace!(client = %client.id, tx_offset, "send_to_client"); + if let Err(e) = client.send_message(tx_offset, message) { tracing::warn!(%client.id, "failed to send update message to client: {e}") } } @@ -1581,12 +1634,14 @@ mod tests { use spacetimedb_primitives::{ColId, TableId}; use spacetimedb_sats::product; use spacetimedb_subscription::SubscriptionPlan; + use tokio::sync::oneshot; use super::{Plan, SubscriptionManager}; use crate::db::relational_db::tests_utils::with_read_only; use crate::host::module_host::DatabaseTableUpdate; use 
crate::sql::ast::SchemaViewer; use crate::subscription::module_subscription_manager::ClientQueryId; + use crate::subscription::tx::DeltaTx; use crate::{ client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName}, db::relational_db::{tests_utils::TestDB, RelationalDB}, @@ -1617,7 +1672,7 @@ mod tests { (Identity::ZERO, ConnectionId::from_u128(connection_id)) } - fn client(connection_id: u128) -> ClientConnectionSender { + fn client(connection_id: u128, db: &RelationalDB) -> ClientConnectionSender { let (identity, connection_id) = id(connection_id); ClientConnectionSender::dummy( ClientActorId { @@ -1626,6 +1681,7 @@ mod tests { name: ClientName(0), }, ClientConfig::for_test(), + db.clone(), ) } @@ -1639,7 +1695,7 @@ mod tests { let hash = plan.hash(); let id = id(0); - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let runtime = tokio::runtime::Runtime::new().unwrap(); let _rt = runtime.enter(); @@ -1663,7 +1719,7 @@ mod tests { let plan = compile_plan(&db, sql)?; let hash = plan.hash(); - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let query_id: ClientQueryId = QueryId::new(1); @@ -1686,7 +1742,7 @@ mod tests { let plan = compile_plan(&db, sql)?; let hash = plan.hash(); - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let query_id: ClientQueryId = QueryId::new(1); @@ -1712,7 +1768,7 @@ mod tests { let sql = "select * from T"; let plan = compile_plan(&db, sql)?; - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let query_id: ClientQueryId = QueryId::new(1); @@ -1737,7 +1793,7 @@ mod tests { let plan = compile_plan(&db, sql)?; let hash = plan.hash(); - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let query_id: ClientQueryId = QueryId::new(1); @@ -1766,7 +1822,7 @@ mod tests { let plan = compile_plan(&db, sql)?; let hash = plan.hash(); - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); 
let query_id: ClientQueryId = QueryId::new(1); @@ -1803,7 +1859,7 @@ mod tests { let plan = compile_plan(&db, sql)?; let hash = plan.hash(); - let clients = (0..3).map(|i| Arc::new(client(i))).collect::>(); + let clients = (0..3).map(|i| Arc::new(client(i, &db))).collect::>(); // All of the clients are using the same query id. let query_id: ClientQueryId = QueryId::new(1); @@ -1844,7 +1900,7 @@ mod tests { let plan = compile_plan(&db, sql)?; let hash = plan.hash(); - let clients = (0..3).map(|i| Arc::new(client(i))).collect::>(); + let clients = (0..3).map(|i| Arc::new(client(i, &db))).collect::>(); // All of the clients are using the same query id. let query_id: ClientQueryId = QueryId::new(1); @@ -1895,7 +1951,7 @@ mod tests { .map(|sql| compile_plan(&db, &sql)) .collect::>>()?; - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let runtime = tokio::runtime::Runtime::new().unwrap(); let _rt = runtime.enter(); @@ -1940,7 +1996,7 @@ mod tests { .map(|sql| compile_plan(&db, &sql)) .collect::>>()?; - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let runtime = tokio::runtime::Runtime::new().unwrap(); let _rt = runtime.enter(); @@ -1984,7 +2040,7 @@ mod tests { let table_id = create_table(&db, "t")?; - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let runtime = tokio::runtime::Runtime::new().unwrap(); let _rt = runtime.enter(); @@ -2057,7 +2113,7 @@ mod tests { let table_id = create_table(&db, "t")?; - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let runtime = tokio::runtime::Runtime::new().unwrap(); let _rt = runtime.enter(); @@ -2127,7 +2183,7 @@ mod tests { let t_id = db.create_table_for_test("t", &schema, &[0.into()])?; let s_id = db.create_table_for_test("s", &schema, &[0.into()])?; - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let runtime = tokio::runtime::Runtime::new().unwrap(); let _rt = runtime.enter(); @@ -2199,7 
+2255,7 @@ mod tests { let sql = "select * from T"; let plan = compile_plan(&db, sql)?; - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let query_id: ClientQueryId = QueryId::new(1); @@ -2224,7 +2280,7 @@ mod tests { let sql = "select * from T"; let plan = compile_plan(&db, sql)?; - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let query_id: ClientQueryId = QueryId::new(1); @@ -2252,7 +2308,7 @@ mod tests { let hash = plan.hash(); let id = id(0); - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let runtime = tokio::runtime::Runtime::new().unwrap(); let _rt = runtime.enter(); @@ -2278,7 +2334,7 @@ mod tests { let hash = plan.hash(); let id = id(0); - let client = Arc::new(client(0)); + let client = Arc::new(client(0, &db)); let runtime = tokio::runtime::Runtime::new().unwrap(); let _rt = runtime.enter(); @@ -2310,10 +2366,10 @@ mod tests { let hash = plan.hash(); let id0 = id(0); - let client0 = Arc::new(client(0)); + let client0 = Arc::new(client(0, &db)); let id1 = id(1); - let client1 = Arc::new(client(1)); + let client1 = Arc::new(client(1, &db)); let runtime = tokio::runtime::Runtime::new().unwrap(); let _rt = runtime.enter(); @@ -2358,10 +2414,10 @@ mod tests { let hash_select1 = plan_select1.hash(); let id0 = id(0); - let client0 = Arc::new(client(0)); + let client0 = Arc::new(client(0, &db)); let id1 = id(1); - let client1 = Arc::new(client(1)); + let client1 = Arc::new(client(1, &db)); let runtime = tokio::runtime::Runtime::new().unwrap(); let _rt = runtime.enter(); @@ -2420,7 +2476,7 @@ mod tests { let id0 = Identity::ZERO; let client0 = ClientActorId::for_test(id0); let config = ClientConfig::for_test(); - let (client0, mut rx) = ClientConnectionSender::dummy_with_channel(client0, config); + let (client0, mut rx) = ClientConnectionSender::dummy_with_channel(client0, config, (*db).clone()); let runtime = tokio::runtime::Runtime::new().unwrap(); let _rt = runtime.enter(); 
@@ -2442,9 +2498,20 @@ mod tests { timer: None, }); - db.with_read_only(Workload::Update, |tx| { - subscriptions.eval_updates_sequential(&(&*tx).into(), event, Some(Arc::new(client0))) - }); + // This block ensures that the transaction is released before waiting + // for a message to appear on `rx`. + // The message won't be sent until the transaction offset is known, + // and it is known when the transaction commits. + { + let (offset_tx, offset_rx) = oneshot::channel(); + let tx = scopeguard::guard(db.begin_tx(Workload::Update), |tx| { + let (tx_offset, tx_metrics, reducer) = db.release_tx(tx); + let _ = offset_tx.send(tx_offset); + db.report_read_tx_metrics(reducer, tx_metrics); + }); + let delta_tx = DeltaTx::from(&*tx); + subscriptions.eval_updates_sequential((&delta_tx, offset_rx), event, Some(Arc::new(client0))); + } runtime.block_on(async move { tokio::time::timeout(Duration::from_millis(20), async move { diff --git a/crates/datastore/Cargo.toml b/crates/datastore/Cargo.toml index 0a46d472df6..af77913e067 100644 --- a/crates/datastore/Cargo.toml +++ b/crates/datastore/Cargo.toml @@ -50,4 +50,4 @@ spacetimedb-commitlog = { path = "../commitlog", features = ["test"] } # Also as dev-dependencies for use in _this_ crate's tests. proptest.workspace = true -pretty_assertions.workspace = true \ No newline at end of file +pretty_assertions.workspace = true diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index 1577a379c43..0f23ad58154 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -355,9 +355,10 @@ impl Tx for Locking { /// allowing new mutable transactions to start if this was the last read-only transaction. /// /// Returns: + /// - [`TxOffset`], the smallest transaction offset visible to this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. 
/// - `String`, the name of the reducer which ran within this transaction. - fn release_tx(&self, tx: Self::Tx) -> (TxMetrics, String) { + fn release_tx(&self, tx: Self::Tx) -> (TxOffset, TxMetrics, String) { tx.release() } } @@ -874,7 +875,7 @@ impl MutTx for Locking { tx.rollback() } - fn commit_mut_tx(&self, tx: Self::MutTx) -> Result> { + fn commit_mut_tx(&self, tx: Self::MutTx) -> Result> { Ok(Some(tx.commit())) } } @@ -914,7 +915,7 @@ pub struct Replay { } impl Replay { - fn using_visitor(&self, f: impl FnOnce(&mut ReplayVisitor) -> T) -> T { + fn using_visitor(&self, f: impl FnOnce(&mut ReplayVisitor<'_, F>) -> T) -> T { let mut committed_state = self.committed_state.write_arc(); let mut visitor = ReplayVisitor { database_identity: &self.database_identity, @@ -1425,7 +1426,8 @@ mod tests { } fn commit(datastore: &Locking, tx: MutTxId) -> ResultTest { - Ok(datastore.commit_mut_tx(tx)?.expect("commit should produce `TxData`").0) + let (_, tx_data, _, _) = datastore.commit_mut_tx(tx)?.expect("commit should produce `TxData`"); + Ok(tx_data) } #[rustfmt::skip] diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index c3346cd231a..af3504fbdd9 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -25,6 +25,7 @@ use core::ops::RangeBounds; use core::{cell::RefCell, mem}; use core::{iter, ops::Bound}; use smallvec::SmallVec; +use spacetimedb_durability::TxOffset; use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row}; use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics}; use spacetimedb_lib::{ @@ -1184,7 +1185,8 @@ impl MutTxId { /// - [`TxData`], the set of inserts and deletes performed by this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran during this transaction. 
- pub fn commit(mut self) -> (TxData, TxMetrics, String) { + pub fn commit(mut self) -> (TxOffset, TxData, TxMetrics, String) { + let tx_offset = self.committed_state_write_lock.next_tx_offset; let tx_data = self.committed_state_write_lock.merge(self.tx_state, &self.ctx); // Compute and keep enough info that we can @@ -1202,7 +1204,7 @@ impl MutTxId { ); let reducer = self.ctx.into_reducer_name(); - (tx_data, tx_metrics, reducer) + (tx_offset, tx_data, tx_metrics, reducer) } /// Commits this transaction, applying its changes to the committed state. diff --git a/crates/datastore/src/locking_tx_datastore/tx.rs b/crates/datastore/src/locking_tx_datastore/tx.rs index 1fadd3e94be..a57233bec19 100644 --- a/crates/datastore/src/locking_tx_datastore/tx.rs +++ b/crates/datastore/src/locking_tx_datastore/tx.rs @@ -6,6 +6,7 @@ use super::{ }; use crate::execution_context::ExecutionContext; use crate::locking_tx_datastore::state_view::IterTx; +use spacetimedb_durability::TxOffset; use spacetimedb_execution::Datastore; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_primitives::{ColList, TableId}; @@ -13,8 +14,8 @@ use spacetimedb_sats::AlgebraicValue; use spacetimedb_schema::schema::TableSchema; use spacetimedb_table::blob_store::BlobStore; use spacetimedb_table::table::Table; -use std::num::NonZeroU64; use std::sync::Arc; +use std::{future, num::NonZeroU64}; use std::{ ops::RangeBounds, time::{Duration, Instant}, @@ -89,9 +90,17 @@ impl TxId { /// allowing new mutable transactions to start if this was the last read-only transaction. /// /// Returns: + /// - [`TxOffset`], the smallest transaction offset visible to this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran within this transaction. 
- pub(super) fn release(self) -> (TxMetrics, String) { + pub(super) fn release(self) -> (TxOffset, TxMetrics, String) { + // A read tx doesn't consume `next_tx_offset`, so subtract one to obtain + // the offset that was visible to the transaction. + // Note that technically the tx could have run against an empty database, + // in which case we'd wrongly return zero (a non-existent transaction). + // This does not happen in practice, however, as [RelationalDB::set_initialized] + // creates a transaction. + let tx_offset = self.committed_state_shared_lock.next_tx_offset.saturating_sub(1); let tx_metrics = TxMetrics::new( &self.ctx, self.timer, @@ -102,7 +111,7 @@ impl TxId { &self.committed_state_shared_lock, ); let reducer = self.ctx.into_reducer_name(); - (tx_metrics, reducer) + (tx_offset, tx_metrics, reducer) } /// The Number of Distinct Values (NDV) for a column or list of columns, @@ -120,4 +129,8 @@ impl TxId { let (_, index) = table.get_index_by_cols(cols)?; NonZeroU64::new(index.num_keys() as u64) } + + pub fn tx_offset(&self) -> future::Ready { + future::ready(self.committed_state_shared_lock.next_tx_offset) + } } diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index 9626f992bae..db39a9bc90e 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -9,6 +9,7 @@ use super::Result; use crate::execution_context::{ReducerContext, Workload}; use crate::system_tables::ST_TABLE_ID; use spacetimedb_data_structures::map::IntMap; +use spacetimedb_durability::TxOffset; use spacetimedb_lib::{hash_bytes, Identity}; use spacetimedb_primitives::*; use spacetimedb_sats::hash::Hash; @@ -177,13 +178,13 @@ pub struct TxData { deletes: BTreeMap>, /// Map of all `TableId`s in both `inserts` and `deletes` to their /// corresponding table name. + // TODO: Store table name as ref counted string. tables: IntMap, /// Tx offset of the transaction which performed these operations. 
/// /// `None` implies that `inserts` and `deletes` are both empty, /// but `Some` does not necessarily imply that either is non-empty. tx_offset: Option, - // TODO: Store an `Arc` or equivalent instead. } impl TxData { @@ -327,9 +328,19 @@ pub trait Tx { /// Release this read-only transaction. /// /// Returns: + /// - [`TxOffset`], the smallest transaction offset visible to this transaction. + /// + /// Note that, if the transaction was running under an isolation level + /// weaker than [`IsolationLevel::Snapshot`], it may have observed + /// transactions at a later offset than when it started. + /// + /// Implementations must uphold that the returned transaction offset + /// accounts for such read anomalies, i.e. the offset must include the + /// observed transactions. + /// /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran within this transaction. - fn release_tx(&self, tx: Self::Tx) -> (TxMetrics, String); + fn release_tx(&self, tx: Self::Tx) -> (TxOffset, TxMetrics, String); } pub trait MutTx { @@ -341,10 +352,20 @@ pub trait MutTx { /// Commits `tx`, applying its changes to the committed state. /// /// Returns: + /// - [`TxOffset`], the offset this transaction was committed at. + /// + /// Note that, if the transaction was running under an isolation level + /// weaker than [`IsolationLevel::Snapshot`], it may have observed + /// transactions at a later offset than when it started. + /// + /// Implementations must uphold that the returned transaction offset + /// accounts for such read anomalies, i.e. the offset must include the + /// observed transactions. + /// /// - [`TxData`], the set of inserts and deletes performed by this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran during this transaction. 
- fn commit_mut_tx(&self, tx: Self::MutTx) -> Result>; + fn commit_mut_tx(&self, tx: Self::MutTx) -> Result>; /// Rolls back this transaction, discarding its changes. /// diff --git a/crates/durability/Cargo.toml b/crates/durability/Cargo.toml index 34816a05437..3fc9eb19e6d 100644 --- a/crates/durability/Cargo.toml +++ b/crates/durability/Cargo.toml @@ -14,6 +14,7 @@ log.workspace = true spacetimedb-commitlog.workspace = true spacetimedb-paths.workspace = true spacetimedb-sats.workspace = true +thiserror.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 4a5c07feec5..1e2862aca92 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -3,10 +3,7 @@ use std::{ num::NonZeroU16, panic, sync::{ - atomic::{ - AtomicI64, AtomicU64, - Ordering::{Acquire, Relaxed, Release}, - }, + atomic::{AtomicU64, Ordering::Relaxed}, Arc, Weak, }, time::Duration, @@ -18,13 +15,13 @@ use log::{info, trace, warn}; use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction}; use spacetimedb_paths::server::CommitLogDir; use tokio::{ - sync::mpsc, + sync::{mpsc, watch}, task::{spawn_blocking, AbortHandle, JoinHandle}, time::{interval, MissedTickBehavior}, }; use tracing::instrument; -use crate::{Durability, History, TxOffset}; +use crate::{Durability, DurableOffset, History, TxOffset}; /// [`Local`] configuration. #[derive(Clone, Copy, Debug)] @@ -62,17 +59,7 @@ pub struct Local { clog: Arc>>, /// The durable transaction offset, as reported by the background /// [`FlushAndSyncTask`]. - /// - /// A negative number indicates that we haven't flushed yet, or that the - /// number overflowed. In either case, appending new transactions shall panic. - /// - /// The offset will be used by the datastore to squash durable transactions - /// into the committed state, thereby making them visible to durable-only - /// readers. 
- /// - /// We don't want to hang on to those transactions longer than needed, so - /// acquire / release or stronger should be used to prevent stale reads. - durable_offset: Arc, + durable_offset: watch::Receiver>, /// Backlog of transactions to be written to disk by the background /// [`PersisterTask`]. /// @@ -100,10 +87,7 @@ impl Local { let clog = Arc::new(Commitlog::open(root, opts.commitlog)?); let (queue, rx) = mpsc::unbounded_channel(); let queue_depth = Arc::new(AtomicU64::new(0)); - let offset = { - let offset = clog.max_committed_offset().map(|x| x as i64).unwrap_or(-1); - Arc::new(AtomicI64::new(offset)) - }; + let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); let persister_task = rt.spawn( PersisterTask { @@ -118,7 +102,7 @@ impl Local { FlushAndSyncTask { clog: Arc::downgrade(&clog), period: opts.sync_interval, - offset: offset.clone(), + offset: durable_tx, abort: persister_task.abort_handle(), } .run(), @@ -126,7 +110,7 @@ impl Local { Ok(Self { clog, - durable_offset: offset, + durable_offset: durable_rx, queue, queue_depth, persister_task, @@ -256,7 +240,7 @@ fn flush_error(e: io::Error) { struct FlushAndSyncTask { clog: Weak>>, period: Duration, - offset: Arc, + offset: watch::Sender>, /// Handle to abort the [`PersisterTask`] if fsync panics. abort: AbortHandle, } @@ -277,8 +261,7 @@ impl FlushAndSyncTask { }; // Skip if nothing changed. if let Some(committed) = clog.max_committed_offset() { - let durable = self.offset.load(Acquire); - if durable.is_positive() && committed == durable as _ { + if self.offset.borrow().is_some_and(|durable| durable == committed) { continue; } } @@ -297,8 +280,9 @@ impl FlushAndSyncTask { } Ok(Ok(Some(new_offset))) => { trace!("synced to offset {new_offset}"); - // NOTE: Overflow will make `durable_tx_offset` return `None` - self.offset.store(new_offset as i64, Release); + self.offset.send_modify(|val| { + val.replace(new_offset); + }); } // No data to flush. 
Ok(Ok(None)) => {} @@ -317,9 +301,8 @@ impl Durability for Local { self.queue_depth.fetch_add(1, Relaxed); } - fn durable_tx_offset(&self) -> Option { - let offset = self.durable_offset.load(Acquire); - (offset > -1).then_some(offset as u64) + fn durable_tx_offset(&self) -> DurableOffset { + self.durable_offset.clone().into() } } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 1ab5fac1bd4..ddda60423a0 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -1,5 +1,8 @@ use std::{iter, marker::PhantomData, sync::Arc}; +use thiserror::Error; +use tokio::sync::watch; + pub use spacetimedb_commitlog::{error, payload::Txdata, Decoder, Transaction}; mod imp; @@ -15,6 +18,71 @@ pub use imp::{local, Local}; /// of all offsets smaller than it. pub type TxOffset = u64; +#[derive(Debug, Error)] +#[error("the database's durability layer went away")] +pub struct DurabilityExited; + +/// Handle to the durable offset, obtained via [`Durability::durable_tx_offset`]. +/// +/// The handle can be used to read the current durable offset, or wait for a +/// provided offset to be reached. +/// +/// The handle is valid for as long as the [`Durability`] instance it was +/// obtained from is live, i.e. able to persist transactions. When the instance +/// shuts down or crashes, methods will return errors of type [`DurabilityExited`]. +pub struct DurableOffset { + // TODO: `watch::Receiver::wait_for` will hold a shared lock until all + // subscribers have seen the current value. Although it may skip entries, + // this may cause unacceptable contention. We may consider a custom watch + // channel that operates on an `AtomicU64` instead of an `RwLock`. + inner: watch::Receiver>, +} + +impl DurableOffset { + /// Get the current durable offset, or `None` if no transaction has been + /// made durable yet. + /// + /// Returns `Err` if the associated durability is no longer live. 
+ pub fn get(&self) -> Result, DurabilityExited> { + self.guard_closed().map(|()| self.inner.borrow().as_ref().copied()) + } + + /// Get the current durable offset, even if the associated durability is + /// no longer live. + pub fn last_seen(&self) -> Option { + self.inner.borrow().as_ref().copied() + } + + /// Wait for `offset` to become durable, i.e. + /// + /// ```ignore + /// self.get().unwrap().is_some_and(|durable| durable >= offset) + /// ``` + /// + /// Returns the actual durable offset at which above condition evaluated to + /// `true`, or an `Err` if the durability is no longer live. + /// + /// Returns immediately if the condition evaluates to `true` for the current + /// durable offset. + pub async fn wait_for(&mut self, offset: TxOffset) -> Result { + self.inner + .wait_for(|durable| durable.is_some_and(|val| val >= offset)) + .await + .map(|r| r.as_ref().copied().unwrap()) + .map_err(|_| DurabilityExited) + } + + fn guard_closed(&self) -> Result<(), DurabilityExited> { + self.inner.has_changed().map(drop).map_err(|_| DurabilityExited) + } +} + +impl From>> for DurableOffset { + fn from(inner: watch::Receiver>) -> Self { + Self { inner } + } +} + /// The durability API. /// /// NOTE: This is a preliminary definition, still under consideration. @@ -41,7 +109,7 @@ pub trait Durability: Send + Sync { /// A `None` return value indicates that the durable offset is not known, /// either because nothing has been persisted yet, or because the status /// cannot be retrieved. - fn durable_tx_offset(&self) -> Option; + fn durable_tx_offset(&self) -> DurableOffset; } /// Access to the durable history. diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index 8fb8e50e59b..ec083dbb4f5 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -255,7 +255,7 @@ impl<'a> Iterator for DeltaScanIter<'a> { /// Execute a query plan. /// The actual execution is driven by `f`. 
-pub fn execute_plan(plan: &ProjectPlan, tx: &T, f: impl Fn(PlanIter) -> R) -> Result +pub fn execute_plan(plan: &ProjectPlan, tx: &T, f: impl Fn(PlanIter<'_>) -> R) -> Result where T: Datastore + DeltaStore, { diff --git a/docs/docs/cli-reference.md b/docs/docs/cli-reference.md index 69ebbbd5ccd..d0eb020f26d 100644 --- a/docs/docs/cli-reference.md +++ b/docs/docs/cli-reference.md @@ -236,6 +236,7 @@ Runs a SQL query on the database. WARNING: This command is UNSTABLE and subject ###### Options: * `--interactive` — Instead of using a query, run an interactive command prompt for `SQL` expressions +* `--confirmed` — Instruct the server to deliver only updates of confirmed transactions * `--anonymous` — Perform this action with an anonymous identity * `-s`, `--server ` — The nickname, host name or URL of the server hosting the database * `-y`, `--yes` — Run non-interactively wherever possible. This will answer "yes" to almost all prompts, but will sometimes answer "no" to preserve non-interactivity (e.g. when prompting whether to log in with spacetimedb.com). @@ -540,6 +541,7 @@ Subscribe to SQL queries on the database. WARNING: This command is UNSTABLE and * `-t`, `--timeout ` — The timeout, in seconds, after which to disconnect and stop receiving subscription messages. If `-n` is specified, it will stop after whichever one comes first. * `--print-initial-update` — Print the initial update for the queries. +* `--confirmed` — Instruct the server to deliver only updates of confirmed transactions * `--anonymous` — Perform this action with an anonymous identity * `-y`, `--yes` — Run non-interactively wherever possible. This will answer "yes" to almost all prompts, but will sometimes answer "no" to preserve non-interactivity (e.g. when prompting whether to log in with spacetimedb.com). 
* `-s`, `--server ` — The nickname, host name or URL of the server hosting the database diff --git a/sdks/rust/src/db_connection.rs b/sdks/rust/src/db_connection.rs index 10bcb149320..7306f45160e 100644 --- a/sdks/rust/src/db_connection.rs +++ b/sdks/rust/src/db_connection.rs @@ -962,6 +962,24 @@ but you must call one of them, or else the connection will never progress. self } + /// Sets whether to use confirmed reads. + /// + /// When enabled, the server will send query results only after they are + /// confirmed to be durable. + /// + /// What durable means depends on the server configuration: a single node + /// server may consider a transaction durable once it is `fsync`'ed to disk, + /// a cluster after some number of replicas have acknowledged that they + /// have stored the transaction. + /// + /// Note that enabling confirmed reads will increase the latency between a + /// reducer call and the corresponding subscription update arriving at the + /// client. + pub fn with_confirmed_reads(mut self, confirmed: bool) -> Self { + self.params.confirmed = confirmed; + self + } + /// Register a callback to run when the connection is successfully initiated. /// /// The callback will receive three arguments: diff --git a/sdks/rust/src/websocket.rs b/sdks/rust/src/websocket.rs index 0595b3dfcc9..09fa7680633 100644 --- a/sdks/rust/src/websocket.rs +++ b/sdks/rust/src/websocket.rs @@ -107,6 +107,8 @@ fn parse_scheme(scheme: Option) -> Result { pub(crate) struct WsParams { pub compression: Compression, pub light: bool, + /// `true` to enable confirmed reads for the connection. + pub confirmed: bool, } fn make_uri(host: Uri, db_name: &str, connection_id: Option, params: WsParams) -> Result { @@ -152,6 +154,11 @@ fn make_uri(host: Uri, db_name: &str, connection_id: Option, param path.push_str("&light=true"); } + // Enable confirmed reads if requested. 
+ if params.confirmed { + path.push_str("&confirmed=true"); + } + parts.path_and_query = Some(path.parse().map_err(|source: InvalidUri| UriError::InvalidUri { source: Arc::new(source), })?); diff --git a/smoketests/__init__.py b/smoketests/__init__.py index ed9d16cac69..b1bcd12471a 100644 --- a/smoketests/__init__.py +++ b/smoketests/__init__.py @@ -118,6 +118,17 @@ def extract_fields(cmd_output, field_name): out.append(val) return out +def parse_sql_result(res: str) -> list[dict]: + """Parse tabular output from an SQL query into a list of dicts.""" + lines = res.splitlines() + headers = lines[0].split('|') if '|' in lines[0] else [lines[0]] + headers = [header.strip() for header in headers] + rows = [] + for row in lines[2:]: + cols = [col.strip() for col in row.split('|')] + rows.append(dict(zip(headers, cols))) + return rows + def extract_field(cmd_output, field_name): field, = extract_fields(cmd_output, field_name) return field @@ -232,11 +243,22 @@ def fingerprint(self): def new_identity(self): new_identity(self.__class__.config_path) - def subscribe(self, *queries, n): + def subscribe(self, *queries, n, confirmed = False): self._check_published() assert isinstance(n, int) - args = [SPACETIME_BIN, "--config-path", str(self.config_path),"subscribe", self.database_identity, "-t", "600", "-n", str(n), "--print-initial-update", "--", *queries] + args = [ + SPACETIME_BIN, + "--config-path", str(self.config_path), + "subscribe", self.database_identity, + "-t", "600", + "-n", str(n), + "--print-initial-update", + ] + if confirmed: + args.append("--confirmed") + args.extend(["--", *queries]) + fake_args = ["spacetime", *args[1:]] log_cmd(fake_args) diff --git a/smoketests/config.toml b/smoketests/config.toml index bc7409327ef..3bc097d0d79 100644 --- a/smoketests/config.toml +++ b/smoketests/config.toml @@ -1,5 +1,5 @@ default_server = "localhost" -spacetimedb_token = 
"eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwYzc3NDY1NTE5MDM2MTE4M2JiNjFmMWMxYzY3NDUzMzYzY2MxMTY4MmM1NTUwNWZiNjdlYzI0ZWMyMWViIiwic3ViIjoiOTJlMmNkOGQtNTk5Ny00NjZlLWIwNmYtZDNjOGQ1NzU3ODI4IiwiaXNzIjoibG9jYWxob3N0IiwiYXVkIjpbInNwYWNldGltZWRiIl0sImlhdCI6MTc1MjA0NjgwMCwiZXhwIjpudWxsfQ.dgefoxC7eCOONVUufu2JTVFo9876zQ4Mqwm0ivZ0PQK7Hacm3Ip_xqyav4bilZ0vIEf8IM8AB0_xawk8WcbvMg" +spacetimedb_token = "eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NiJ9.eyJoZXhfaWRlbnRpdHkiOiJjMjAwMTU3NGEwMjgyNDRjNzZhNTE1MjU1NGMzY2ZjMWJiNmIzNzZlNjY4YmU1Yjg2MzE0MDAyYWRmOTMyYWVlIiwic3ViIjoiYzYwOWJkYjUtMDAyNS00YzZkLWIyZTktOGYyODEwM2IzNWUzIiwiaXNzIjoibG9jYWxob3N0IiwiYXVkIjpbInNwYWNldGltZWRiIl0sImlhdCI6MTc1NjkwOTQ3NywiZXhwIjpudWxsfQ.t6Aobx9fTe6kwvq7H01-2RO7vdK4SjQB7Uw-Lh4Daz0lG43WzIw3oVG_65txqlsFSkpx40wYElByj4jMolutpA" [[server_configs]] nickname = "localhost" diff --git a/smoketests/tests/confirmed_reads.py b/smoketests/tests/confirmed_reads.py new file mode 100644 index 00000000000..4d8a844c4bf --- /dev/null +++ b/smoketests/tests/confirmed_reads.py @@ -0,0 +1,52 @@ +from .. import Smoketest, parse_sql_result + +# +# TODO: We only test that we can pass a --confirmed flag and that things +# appear to works as if we hadn't. Without controlling the server, we can't +# test that there is any difference in behavior. 
+# + +class ConfirmedReads(Smoketest): + def test_confirmed_reads_receive_updates(self): + """Tests that subscribing with confirmed=true receives updates""" + + sub = self.subscribe("select * from person", n = 2, confirmed = True) + self.call("add", "Horst") + self.spacetime( + "sql", + self.database_identity, + "insert into person (name) values ('Egon')") + + events = sub() + self.assertEqual([ + { + 'person': { + 'deletes': [], + 'inserts': [{'name': 'Horst'}] + } + }, + { + 'person': { + 'deletes': [], + 'inserts': [{'name': 'Egon'}] + } + } + ], events) + +class ConfirmedReadsSql(Smoketest): + def test_sql_with_confirmed_reads_receives_result(self): + """Tests that an SQL operation with confirmed=true returns a result""" + + self.spacetime( + "sql", + "--confirmed", + self.database_identity, + "insert into person (name) values ('Horst')") + + res = self.spacetime( + "sql", + "--confirmed", + self.database_identity, + "select * from person") + res = parse_sql_result(str(res)) + self.assertEqual([{'name': '"Horst"'}], res) diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index eb988078d84..1936a6f0c42 100644 --- a/smoketests/tests/replication.py +++ b/smoketests/tests/replication.py @@ -1,4 +1,4 @@ -from .. import COMPOSE_FILE, Smoketest, requires_docker, spacetime +from .. import COMPOSE_FILE, Smoketest, requires_docker, spacetime, parse_sql_result from ..docker import DockerManager import time @@ -18,17 +18,6 @@ def retry(func: Callable, max_retries: int = 3, retry_delay: int = 2): print("Max retries reached. 
Skipping the exception.") return False -def parse_sql_result(res: str) -> list[dict]: - """Parse tabular output from an SQL query into a list of dicts.""" - lines = res.splitlines() - headers = lines[0].split('|') if '|' in lines[0] else [lines[0]] - headers = [header.strip() for header in headers] - rows = [] - for row in lines[2:]: - cols = [col.strip() for col in row.split('|')] - rows.append(dict(zip(headers, cols))) - return rows - def int_vals(rows: list[dict]) -> list[dict]: """For all dicts in list, cast all values in dict to int.""" return [{k: int(v) for k, v in row.items()} for row in rows]