diff --git a/Cargo.lock b/Cargo.lock index fb75d8d9438..dcf2b5026ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -935,7 +935,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -2409,7 +2409,7 @@ dependencies = [ "httpdate", "itoa", "pin-project-lite", - "socket2", + "socket2 0.5.9", "tokio", "tower-service", "tracing", @@ -2497,7 +2497,7 @@ dependencies = [ "hyper 1.6.0", "libc", "pin-project-lite", - "socket2", + "socket2 0.5.9", "tokio", "tower-service", "tracing", @@ -2843,6 +2843,17 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "io-uring" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4" +dependencies = [ + "bitflags 2.9.0", + "cfg-if", + "libc", +] + [[package]] name = "ipnet" version = "2.11.0" @@ -4092,7 +4103,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" dependencies = [ "anyhow", - "itertools 0.11.0", + "itertools 0.12.1", "proc-macro2", "quote", "syn 2.0.101", @@ -5250,6 +5261,16 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "socket2" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807" +dependencies = [ + "libc", + "windows-sys 0.59.0", +] + [[package]] name = "spacetime-module" version = "0.1.0" @@ -5706,6 +5727,7 @@ dependencies = [ "strum", "thin-vec", "thiserror 1.0.69", + "tokio", ] [[package]] @@ -5718,6 +5740,7 @@ dependencies = [ "spacetimedb-commitlog", "spacetimedb-paths", "spacetimedb-sats", + "thiserror 1.0.69", "tokio", "tracing", ] @@ -6061,7 +6084,7 @@ dependencies = [ "serde", "serde_json", "sled", - "socket2", + "socket2 0.5.9", "spacetimedb-client-api", "spacetimedb-client-api-messages", "spacetimedb-core", @@ -6791,20 +6814,22 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.45.0" +version = "1.47.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2513ca694ef9ede0fb23fe71a4ee4107cb102b9dc1930f6d0fd77aae068ae165" +checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038" dependencies = [ "backtrace", "bytes", + "io-uring", "libc", "mio 1.0.3", "parking_lot 0.12.3", "pin-project-lite", "signal-hook-registry", - "socket2", + "slab", + "socket2 0.6.0", "tokio-macros", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -6860,7 +6885,7 @@ dependencies = [ "postgres-protocol", "postgres-types", "rand 0.9.1", - "socket2", + "socket2 0.5.9", "tokio", "tokio-util", "whoami", @@ -7854,7 +7879,7 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] diff --git a/crates/cli/src/subcommands/subscribe.rs b/crates/cli/src/subcommands/subscribe.rs index c7b5058ba42..82255dc406e 100644 --- a/crates/cli/src/subcommands/subscribe.rs +++ b/crates/cli/src/subcommands/subscribe.rs @@ -2,7 +2,7 @@ use anyhow::Context; use clap::{value_parser, Arg, ArgAction, ArgMatches}; use futures::{Sink, SinkExt, TryStream, TryStreamExt}; use http::header; -use http::uri::Scheme; +use http::uri::{PathAndQuery, Scheme}; use serde_json::Value; use spacetimedb_client_api_messages::websocket::{self as ws, JsonFormat}; use spacetimedb_data_structures::map::HashMap; @@ -65,6 +65,13 @@ pub fn cli() -> clap::Command { .action(ArgAction::SetTrue) .help("Print the initial update for the queries."), ) + .arg( + Arg::new("confirmed") + .required(false) + .long("confirmed") + .action(ArgAction::SetTrue) + .help("Instruct the server to deliver only updates of confirmed transactions"), + ) .arg(common_args::anonymous()) .arg(common_args::yes()) .arg(common_args::server().help("The nickname, host name or URL of the server hosting the database")) @@ -130,6 +137,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error let num = args.get_one::("num-updates").copied(); let timeout = args.get_one::("timeout").copied(); let print_initial_update = args.get_flag("print_initial_update"); + let confirmed = args.get_flag("confirmed"); let conn = parse_req(config, args).await?; let api = ClientApi::new(conn); @@ -146,6 +154,9 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error s } }); + if confirmed { + append_query_param(&mut uri, ("confirmed", "true")); + } // Create the websocket request. let mut req = http::Uri::from_parts(uri)?.into_client_request()?; @@ -334,3 +345,21 @@ fn format_output_json(msg: &ws::DatabaseUpdate, schema: &RawModuleDe Ok(output) } + +fn append_query_param(uri: &mut http::uri::Parts, (k, v): (&str, &str)) { + let (mut path, query) = uri + .path_and_query + .as_ref() + .map(|pq| (pq.path().to_owned(), pq.query())) + .unwrap_or_default(); + path.push('?'); + if let Some(query) = query { + path.push_str(query); + path.push('&'); + } + path.push_str(k); + path.push('='); + path.push_str(v); + + uri.path_and_query = Some(PathAndQuery::from_maybe_shared(path).unwrap()); +} diff --git a/crates/client-api/src/lib.rs b/crates/client-api/src/lib.rs index ac82a3e142d..7fc2fc00aea 100644 --- a/crates/client-api/src/lib.rs +++ b/crates/client-api/src/lib.rs @@ -67,6 +67,7 @@ impl Host { &self, auth: AuthCtx, database: Database, + confirmed_read: bool, body: String, ) -> axum::response::Result>> { let module_host = self @@ -74,7 +75,7 @@ impl Host { .await .map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?; - let json = self + let (tx_offset, durable_offset, json) = self .host_controller .using_database( database, @@ -115,17 +116,27 @@ impl Host { .map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name))) .collect(); - Ok(vec![SqlStmtResult { - schema, - rows: result.rows, - total_duration_micros: total_duration.as_micros() as u64, - stats: SqlStmtStats::from_metrics(&result.metrics), - }]) + Ok(( + result.tx_offset, + db.durable_tx_offset(), + vec![SqlStmtResult { + schema, + rows: result.rows, + total_duration_micros: total_duration.as_micros() as u64, + stats: SqlStmtStats::from_metrics(&result.metrics), + }], + )) }, ) .await .map_err(log_and_500)??; + if confirmed_read { + if let Some(mut durable_offset) = durable_offset { + durable_offset.wait_for(tx_offset).await.map_err(log_and_500)?; + } + } + Ok(json) } diff --git a/crates/client-api/src/routes/database.rs b/crates/client-api/src/routes/database.rs index 91a67f9c406..601d96744ce 100644 --- a/crates/client-api/src/routes/database.rs +++ b/crates/client-api/src/routes/database.rs @@ -382,12 +382,17 @@ pub struct SqlParams { } #[derive(Deserialize)] -pub struct SqlQueryParams {} +pub struct SqlQueryParams { + /// If `true`, return the query result only after its transaction offset + /// is confirmed to be durable. + #[serde(default)] + confirmed: bool, +} pub async fn sql( State(worker_ctx): State, Path(SqlParams { name_or_identity }): Path, - Query(SqlQueryParams {}): Query, + Query(SqlQueryParams { confirmed }): Query, Extension(auth): Extension, body: String, ) -> axum::response::Result @@ -410,7 +415,7 @@ where .await .map_err(log_and_500)? .ok_or(StatusCode::NOT_FOUND)?; - let json = host.exec_sql(auth, database, body).await?; + let json = host.exec_sql(auth, database, confirmed, body).await?; let total_duration = json.iter().fold(0, |acc, x| acc + x.total_duration_micros); diff --git a/crates/client-api/src/routes/subscribe.rs b/crates/client-api/src/routes/subscribe.rs index f59f8cd68dd..74f7caf5809 100644 --- a/crates/client-api/src/routes/subscribe.rs +++ b/crates/client-api/src/routes/subscribe.rs @@ -25,8 +25,8 @@ use spacetimedb::client::messages::{ serialize, IdentityTokenMessage, SerializableMessage, SerializeBuffer, SwitchedServerMessage, ToProtocol, }; use spacetimedb::client::{ - ClientActorId, ClientConfig, ClientConnection, DataMessage, MessageExecutionError, MessageHandleError, - MeteredReceiver, Protocol, + ClientActorId, ClientConfig, ClientConnection, ClientConnectionReceiver, DataMessage, MessageExecutionError, + MessageHandleError, MeteredReceiver, Protocol, }; use spacetimedb::host::module_host::ClientConnectedError; use spacetimedb::host::NoSuchModule; @@ -80,6 +80,12 @@ pub struct SubscribeQueryParams { /// This knob works by setting other, more specific, knobs to the value. #[serde(default)] pub light: bool, + /// If `true`, send the subscription updates only after the transaction + /// offset they're computed from is confirmed to be durable. + /// + /// If `false`, send them immediately. + #[serde(default)] + pub confirmed: bool, } pub fn generate_random_connection_id() -> ConnectionId { @@ -93,6 +99,7 @@ pub async fn handle_websocket( connection_id, compression, light, + confirmed, }): Query, forwarded_for: Option>, Extension(auth): Extension, @@ -127,6 +134,7 @@ where protocol, compression, tx_update_full: !light, + confirmed_reads: confirmed, }; // TODO: Should also maybe refactor the code and the protocol to allow a single websocket @@ -199,7 +207,7 @@ where "websocket: Database accepted connection from {client_log_string}; spawning ws_client_actor and ClientConnection" ); - let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx); + let actor = |client, receiver| ws_client_actor(ws_opts, client, ws, receiver); let client = ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor, connected).await; @@ -213,7 +221,7 @@ where token: identity_token, connection_id, }; - if let Err(e) = client.send_message(message) { + if let Err(e) = client.send_message(None, message) { log::warn!("websocket: Error sending IdentityToken message to {client_log_string}: {e}"); } }); @@ -341,7 +349,7 @@ async fn ws_client_actor( options: WebSocketOptions, client: ClientConnection, ws: WebSocketStream, - sendrx: MeteredReceiver, + sendrx: ClientConnectionReceiver, ) { // ensure that even if this task gets cancelled, we always cleanup the connection let mut client = scopeguard::guard(client, |client| { @@ -357,7 +365,7 @@ async fn ws_client_actor_inner( client: &mut ClientConnection, config: WebSocketOptions, ws: WebSocketStream, - sendrx: MeteredReceiver, + sendrx: ClientConnectionReceiver, ) { let database = client.module.info().database_identity; let client_id = client.id; @@ -961,6 +969,33 @@ enum UnorderedWsMessage { Error(MessageExecutionError), } +/// Abstraction over [`ClientConnectionReceiver`], so tests can use a plain +/// [`mpsc::Receiver`]. +trait Receiver { + fn recv(&mut self) -> impl Future> + Send; + fn close(&mut self); +} + +impl Receiver for ClientConnectionReceiver { + async fn recv(&mut self) -> Option { + ClientConnectionReceiver::recv(self).await + } + + fn close(&mut self) { + ClientConnectionReceiver::close(self); + } +} + +impl Receiver for mpsc::Receiver { + async fn recv(&mut self) -> Option { + mpsc::Receiver::recv(self).await + } + + fn close(&mut self) { + mpsc::Receiver::close(self); + } +} + /// Sink that sends outgoing messages to the `ws` sink. /// /// Consumes `messages`, which yields subscription updates and reducer call @@ -986,10 +1021,9 @@ async fn ws_send_loop( state: Arc, config: ClientConfig, mut ws: impl Sink + Unpin, - mut messages: MeteredReceiver, + mut messages: impl Receiver, mut unordered: mpsc::UnboundedReceiver, ) { - let mut messages_buf = Vec::with_capacity(32); let mut serialize_buf = SerializeBuffer::new(config); loop { @@ -1053,26 +1087,24 @@ async fn ws_send_loop( } }, - n = messages.recv_many(&mut messages_buf, 32), if !closed => { - if n == 0 { - continue; - } - log::trace!("sending {n} outgoing messages"); - for msg in messages_buf.drain(..n) { - let (msg_alloc, res) = send_message( - &state.database, - config, - serialize_buf, - msg.workload().zip(msg.num_rows()), - &mut ws, - msg - ).await; - serialize_buf = msg_alloc; - - if let Err(e) = res { - log::warn!("websocket send error: {e}"); - return; - } + maybe_message = messages.recv(), if !closed => { + let Some(message) = maybe_message else { + break; + }; + log::trace!("sending outgoing message"); + let (msg_alloc, res) = send_message( + &state.database, + config, + serialize_buf, + message.workload().zip(message.num_rows()), + &mut ws, + message + ).await; + serialize_buf = msg_alloc; + + if let Err(e) = res { + log::warn!("websocket send error: {e}"); + return; } }, } @@ -1207,7 +1239,7 @@ mod tests { sink, stream, }; use pretty_assertions::assert_matches; - use spacetimedb::client::ClientName; + use spacetimedb::client::{messages::SerializableMessage, ClientName}; use tokio::time::sleep; use super::*; @@ -1385,10 +1417,15 @@ mod tests { async fn send_loop_terminates_when_unordered_closed() { let state = Arc::new(dummy_actor_state()); let (messages_tx, messages_rx) = mpsc::channel(64); - let messages = MeteredReceiver::new(messages_rx); let (unordered_tx, unordered_rx) = mpsc::unbounded_channel(); - let send_loop = ws_send_loop(state, ClientConfig::for_test(), sink::drain(), messages, unordered_rx); + let send_loop = ws_send_loop( + state, + ClientConfig::for_test(), + sink::drain(), + messages_rx, + unordered_rx, + ); pin_mut!(send_loop); assert!(is_pending(&mut send_loop).await); @@ -1403,14 +1440,13 @@ mod tests { async fn send_loop_close_message_closes_state_and_messages() { let state = Arc::new(dummy_actor_state()); let (messages_tx, messages_rx) = mpsc::channel(64); - let messages = MeteredReceiver::new(messages_rx); let (unordered_tx, unordered_rx) = mpsc::unbounded_channel(); let send_loop = ws_send_loop( state.clone(), ClientConfig::for_test(), sink::drain(), - messages, + messages_rx, unordered_rx, ); pin_mut!(send_loop); @@ -1451,24 +1487,23 @@ mod tests { })), ]; - for msg in input { + for message in input { let state = Arc::new(dummy_actor_state()); let (messages_tx, messages_rx) = mpsc::channel(64); - let messages = MeteredReceiver::new(messages_rx); let (unordered_tx, unordered_rx) = mpsc::unbounded_channel(); let send_loop = ws_send_loop( state.clone(), ClientConfig::for_test(), UnfeedableSink, - messages, + messages_rx, unordered_rx, ); pin_mut!(send_loop); - match msg { + match message { Either::Left(unordered) => unordered_tx.send(unordered).unwrap(), - Either::Right(msg) => messages_tx.send(msg).await.unwrap(), + Either::Right(message) => messages_tx.send(message).await.unwrap(), } send_loop.await; } @@ -1498,24 +1533,23 @@ mod tests { })), ]; - for msg in input { + for message in input { let state = Arc::new(dummy_actor_state()); let (messages_tx, messages_rx) = mpsc::channel(64); - let messages = MeteredReceiver::new(messages_rx); let (unordered_tx, unordered_rx) = mpsc::unbounded_channel(); let send_loop = ws_send_loop( state.clone(), ClientConfig::for_test(), UnflushableSink, - messages, + messages_rx, unordered_rx, ); pin_mut!(send_loop); - match msg { + match message { Either::Left(unordered) => unordered_tx.send(unordered).unwrap(), - Either::Right(msg) => messages_tx.send(msg).await.unwrap(), + Either::Right(message) => messages_tx.send(message).await.unwrap(), } send_loop.await; } diff --git a/crates/core/src/client.rs b/crates/core/src/client.rs index 11103824dca..d6fab69b671 100644 --- a/crates/core/src/client.rs +++ b/crates/core/src/client.rs @@ -7,8 +7,8 @@ mod message_handlers; pub mod messages; pub use client_connection::{ - ClientConfig, ClientConnection, ClientConnectionSender, ClientSendError, DataMessage, MeteredDeque, - MeteredReceiver, Protocol, + ClientConfig, ClientConnection, ClientConnectionReceiver, ClientConnectionSender, ClientSendError, ClientUpdate, + DataMessage, MeteredDeque, MeteredReceiver, Protocol, }; pub use client_connection_index::ClientActorIndex; pub use message_handlers::{MessageExecutionError, MessageHandleError}; diff --git a/crates/core/src/client/client_connection.rs b/crates/core/src/client/client_connection.rs index cf3e5511fff..a69b4ae68c0 100644 --- a/crates/core/src/client/client_connection.rs +++ b/crates/core/src/client/client_connection.rs @@ -25,6 +25,7 @@ use spacetimedb_client_api_messages::websocket::{ BsatnFormat, CallReducerFlags, Compression, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, Unsubscribe, UnsubscribeMulti, }; +use spacetimedb_durability::{DurableOffset, TxOffset}; use spacetimedb_lib::identity::RequestId; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::Identity; @@ -63,6 +64,10 @@ pub struct ClientConfig { /// rather than [`TransactionUpdateLight`]s on a successful update. // TODO(centril): As more knobs are added, make this into a bitfield (when there's time). pub tx_update_full: bool, + /// If `true`, the client requests to receive updates for transactions + /// confirmed to be durable. If `false`, updates will be delivered + /// immediately. + pub confirmed_reads: bool, } impl ClientConfig { @@ -71,15 +76,120 @@ impl ClientConfig { protocol: Protocol::Binary, compression: <_>::default(), tx_update_full: true, + confirmed_reads: false, } } } +/// A message to be sent to the client, along with the transaction offset it +/// was computed at, if available. +/// +// TODO: Consider a different name, "ClientUpdate" is used elsewhere already. +#[derive(Debug)] +pub struct ClientUpdate { + /// Transaction offset at which `message` was computed. + /// + /// This is only `Some` if `message` is a query result. + /// + /// If `Some` and [`ClientConfig::confirmed_reads`] is `true`, + /// [`ClientConnectionReceiver`] will delay delivery until the durable + /// offset of the database is equal to or greater than `tx_offset`. + pub tx_offset: Option, + /// Type-erased outgoing message. + pub message: SerializableMessage, +} + +/// Receiving end of [`ClientConnectionSender`]. +/// +/// The [`ClientConnection`] actor reads messages from this channel and sends +/// them to the client over its websocket connection. +/// +/// The [`ClientConnectionReceiver`] takes care of confirmed reads semantics, +/// if requested by the client. +pub struct ClientConnectionReceiver { + confirmed_reads: bool, + channel: MeteredReceiver, + module: ModuleHost, + module_rx: watch::Receiver, +} + +impl ClientConnectionReceiver { + /// Receive the next message from this channel. + /// + /// If this method returns `None`, the channel is closed and no more messages + /// are in the internal buffers. No more messages can ever be received from + /// the channel. + /// + /// Messages are returned immediately if: + /// + /// - The (internal) [`ClientUpdate`] does not have a `tx_offset` + /// (such as for error messages). + /// - The client hasn't requested confirmed reads (i.e. + /// [`ClientConfig::confirmed_reads`] is `false`). + /// - The database is configured to not persist transactions. + /// + /// Otherwise, the update's `tx_offset` is compared against the module's + /// durable offset. If the durable offset is behind the `tx_offset`, the + /// method waits until it catches up before returning the message. + /// + /// If the database is shut down while waiting for the durable offset, + /// `None` is returned. In this case, no more messages can ever be received + /// from the channel. + // + // TODO: Can we make a cancel-safe `recv_many` with confirmed reads semantics? + pub async fn recv(&mut self) -> Option { + let ClientUpdate { tx_offset, message } = self.channel.recv().await?; + if !self.confirmed_reads { + return Some(message); + } + + if let Some(tx_offset) = tx_offset { + match self.durable_tx_offset().await { + Ok(Some(mut durable)) => { + durable.wait_for(tx_offset).await.ok()?; + } + // Database shut down or crashed. + Err(NoSuchModule) => return None, + // In-memory database. + Ok(None) => return Some(message), + } + } + + Some(message) + } + + /// Close the receiver without dropping it. + /// + /// This is used to notify the [`ClientConnectionSender`] that the receiver + /// will not consume any more messages from the channel, usually because the + /// connection has been closed or is about to be closed. + /// + /// After calling this method, the sender will not be able to send more + /// messages, preventing the internal buffer from filling up. + pub fn close(&mut self) { + self.channel.close(); + } + + /// Check whether the module has been updated or shut down, and return + /// its [`DurableOffset`]. + /// + /// Returns `Err(NoSuchModule)` if the module has exited. + /// Returns `Some(None)` if the module is live but configured without durability. + /// Otherwise, returns `Some(DurableOffset)`. + async fn durable_tx_offset(&mut self) -> Result, NoSuchModule> { + if self.module_rx.has_changed().map_err(|_| NoSuchModule)? { + self.module = self.module_rx.borrow_and_update().clone(); + } + + Ok(self.module.replica_ctx().relational_db.durable_tx_offset()) + } +} + #[derive(Debug)] pub struct ClientConnectionSender { pub id: ClientActorId, pub config: ClientConfig, - sendtx: mpsc::Sender, + sendtx: mpsc::Sender, abort_handle: AbortHandle, cancelled: AtomicBool, @@ -136,7 +246,7 @@ pub enum ClientSendError { } impl ClientConnectionSender { - pub fn dummy_with_channel(id: ClientActorId, config: ClientConfig) -> (Self, MeteredReceiver) { + pub fn dummy_with_channel(id: ClientActorId, config: ClientConfig) -> (Self, MeteredReceiver) { let (sendtx, rx) = mpsc::channel(1); // just make something up, it doesn't need to be attached to a real task let abort_handle = match tokio::runtime::Handle::try_current() { @@ -167,11 +277,18 @@ impl ClientConnectionSender { /// Send a message to the client. For data-related messages, you should probably use /// `BroadcastQueue::send` to ensure that the client sees data messages in a consistent order. - pub fn send_message(&self, message: impl Into) -> Result<(), ClientSendError> { - self.send(message.into()) + pub fn send_message( + &self, + tx_offset: Option, + message: impl Into, + ) -> Result<(), ClientSendError> { + self.send(ClientUpdate { + tx_offset, + message: message.into(), + }) } - fn send(&self, message: SerializableMessage) -> Result<(), ClientSendError> { + fn send(&self, message: ClientUpdate) -> Result<(), ClientSendError> { if self.cancelled.load(Relaxed) { return Err(ClientSendError::Cancelled); } @@ -432,7 +549,7 @@ impl ClientConnection { config: ClientConfig, replica_id: u64, mut module_rx: watch::Receiver, - actor: impl FnOnce(ClientConnection, MeteredReceiver) -> Fut, + actor: impl FnOnce(ClientConnection, ClientConnectionReceiver) -> Fut, _proof_of_client_connected_call: Connected, ) -> ClientConnection where @@ -444,7 +561,7 @@ impl ClientConnection { // them and stuff. Not right now though. let module = module_rx.borrow_and_update().clone(); - let (sendtx, sendrx) = mpsc::channel::(CLIENT_CHANNEL_CAPACITY); + let (sendtx, sendrx) = mpsc::channel::(CLIENT_CHANNEL_CAPACITY); let (fut_tx, fut_rx) = oneshot::channel::(); // weird dance so that we can get an abort_handle into ClientConnection @@ -467,7 +584,12 @@ impl ClientConnection { .abort_handle(); let metrics = ClientConnectionMetrics::new(database_identity, config.protocol); - let sendrx = MeteredReceiver::with_gauge(sendrx, metrics.sendtx_queue_size.clone()); + let receiver = ClientConnectionReceiver { + confirmed_reads: config.confirmed_reads, + channel: MeteredReceiver::with_gauge(sendrx, metrics.sendtx_queue_size.clone()), + module: module.clone(), + module_rx: module_rx.clone(), + }; let sender = Arc::new(ClientConnectionSender { id, @@ -484,7 +606,7 @@ impl ClientConnection { module_rx, }; - let actor_fut = actor(this.clone(), sendrx); + let actor_fut = actor(this.clone(), receiver); // if this fails, the actor() function called .abort(), which like... okay, I guess? let _ = fut_tx.send(actor_fut); diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 6fec0c1b62b..872d93093b6 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -32,7 +32,7 @@ use spacetimedb_datastore::{ }, traits::TxData, }; -use spacetimedb_durability::{self as durability, TxOffset}; +use spacetimedb_durability as durability; use spacetimedb_lib::db::auth::StAccess; use spacetimedb_lib::db::raw_def::v9::{btree, RawModuleDefV9Builder, RawSql}; use spacetimedb_lib::st_var::StVarValue; @@ -63,6 +63,8 @@ use std::path::Path; use std::sync::Arc; use tokio::sync::watch; +pub use durability::{DurableOffset, TxOffset}; + // NOTE(cloutiertyler): We should be using the associated types, but there is // a bug in the Rust compiler that prevents us from doing so. pub type MutTx = MutTxId; //::MutTx; @@ -369,7 +371,9 @@ impl RelationalDB { .as_ref() .map(|pair| pair.0.clone()) .as_deref() - .and_then(|durability| durability.durable_tx_offset()); + .map(|durability| durability.durable_tx_offset().get()) + .transpose()? + .flatten(); let (min_commitlog_offset, _) = history.tx_range_hint(); log::info!("[{database_identity}] DATABASE: durable_tx_offset is {durable_tx_offset:?}"); @@ -801,18 +805,18 @@ impl RelationalDB { } #[tracing::instrument(level = "trace", skip_all)] - pub fn release_tx(&self, tx: Tx) -> (TxMetrics, String) { + pub fn release_tx(&self, tx: Tx) -> (TxOffset, TxMetrics, String) { log::trace!("RELEASE TX"); self.inner.release_tx(tx) } #[tracing::instrument(level = "trace", skip_all)] - pub fn commit_tx(&self, tx: MutTx) -> Result, DBError> { + pub fn commit_tx(&self, tx: MutTx) -> Result, DBError> { log::trace!("COMMIT MUT TX"); // TODO: Never returns `None` -- should it? let reducer_context = tx.ctx.reducer_context().cloned(); - let Some((tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else { + let Some((tx_offset, tx_data, tx_metrics, reducer)) = self.inner.commit_mut_tx(tx)? else { return Ok(None); }; @@ -822,7 +826,7 @@ impl RelationalDB { Self::do_durability(&**durability, reducer_context.as_ref(), &tx_data) } - Ok(Some((tx_data, tx_metrics, reducer))) + Ok(Some((tx_offset, tx_data, tx_metrics, reducer))) } #[tracing::instrument(level = "trace", skip_all)] @@ -898,6 +902,14 @@ impl RelationalDB { } } + /// Get the [`DurableOffset`] of this database, or `None` if this is an + /// in-memory instance. + pub fn durable_tx_offset(&self) -> Option { + self.durability + .as_ref() + .map(|durability| durability.durable_tx_offset()) + } + /// Decide based on the `committed_state.next_tx_offset` /// whether to request that the [`SnapshotWorker`] in `self` capture a snapshot of the database. /// @@ -1000,7 +1012,7 @@ impl RelationalDB { { let mut tx = self.begin_tx(workload); let res = f(&mut tx); - let (tx_metrics, reducer) = self.release_tx(tx); + let (_tx_offset, tx_metrics, reducer) = self.release_tx(tx); self.report_read_tx_metrics(reducer, tx_metrics); res } @@ -1015,7 +1027,7 @@ impl RelationalDB { self.report_mut_tx_metrics(reducer, tx_metrics, None); } else { match self.commit_tx(tx).map_err(E::from)? { - Some((tx_data, tx_metrics, reducer)) => { + Some((_tx_offset, tx_data, tx_metrics, reducer)) => { self.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); } None => panic!("TODO: retry?"), @@ -2092,7 +2104,7 @@ mod tests { fn table( name: &str, columns: ProductType, - f: impl FnOnce(RawTableDefBuilder) -> RawTableDefBuilder, + f: impl FnOnce(RawTableDefBuilder<'_>) -> RawTableDefBuilder, ) -> TableSchema { let mut builder = RawModuleDefV9Builder::new(); f(builder.build_table_with_new_type(name, columns, true)); diff --git a/crates/core/src/error.rs b/crates/core/src/error.rs index 4994ef69431..b4e9030be9f 100644 --- a/crates/core/src/error.rs +++ b/crates/core/src/error.rs @@ -6,6 +6,7 @@ use std::sync::{MutexGuard, PoisonError}; use enum_as_inner::EnumAsInner; use hex::FromHexError; use spacetimedb_commitlog::repo::TxOffset; +use spacetimedb_durability::DurabilityExited; use spacetimedb_expr::errors::TypingError; use spacetimedb_lib::Identity; use spacetimedb_schema::error::ValidationErrors; @@ -144,6 +145,8 @@ pub enum DBError { }, #[error(transparent)] RestoreSnapshot(#[from] RestoreSnapshotError), + #[error(transparent)] + DurabilityGone(#[from] DurabilityExited), } impl DBError { diff --git a/crates/core/src/host/module_host.rs b/crates/core/src/host/module_host.rs index 29749f651db..f46430d2473 100644 --- a/crates/core/src/host/module_host.rs +++ b/crates/core/src/host/module_host.rs @@ -32,6 +32,7 @@ use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap}; use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType}; use spacetimedb_datastore::locking_tx_datastore::MutTxId; use spacetimedb_datastore::traits::{IsolationLevel, Program, TxData}; +use spacetimedb_durability::DurableOffset; use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_lib::db::raw_def::v9::Lifecycle; use spacetimedb_lib::identity::{AuthCtx, RequestId}; @@ -395,7 +396,7 @@ fn init_database( let rcr = match module_def.lifecycle_reducer(Lifecycle::Init) { None => { - if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { + if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); } None @@ -1110,6 +1111,7 @@ impl ModuleHost { let metrics = self .on_module_thread("one_off_query", move || { db.with_read_only(Workload::Sql, |tx| { + let tx_offset = tx.tx_offset(); // We wrap the actual query in a closure so we can use ? to handle errors without making // the entire transaction abort with an error. let result: Result<(OneOffTable, ExecutionMetrics), anyhow::Error> = (|| { @@ -1174,7 +1176,7 @@ impl ModuleHost { ), }; - subscriptions.send_client_message(client, message, tx)?; + subscriptions.send_client_message(client, message, (tx, tx_offset))?; Ok::, anyhow::Error>(metrics) }) }) @@ -1224,6 +1226,10 @@ impl ModuleHost { &self.replica_ctx().database } + pub fn durable_tx_offset(&self) -> Option { + self.replica_ctx().relational_db.durable_tx_offset() + } + pub(crate) fn replica_ctx(&self) -> &ReplicaContext { self.module.replica_ctx() } diff --git a/crates/core/src/host/wasm_common/module_host_actor.rs b/crates/core/src/host/wasm_common/module_host_actor.rs index c337140ac94..c65d047a690 100644 --- a/crates/core/src/host/wasm_common/module_host_actor.rs +++ b/crates/core/src/host/wasm_common/module_host_actor.rs @@ -272,7 +272,7 @@ impl ModuleInstance for WasmModuleInstance { Ok(UpdateDatabaseResult::ErrorExecutingMigration(e)) } Ok(()) => { - if let Some((tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { + if let Some((_tx_offset, tx_data, tx_metrics, reducer)) = stdb.commit_tx(tx)? { stdb.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); } self.system_logger().info("Database updated"); @@ -485,7 +485,7 @@ impl WasmModuleInstance { request_id, timer, }; - let (event, _) = match self + let (_tx_offset, event, _) = match self .info .subscriptions .commit_and_broadcast_event(client, event, tx) diff --git a/crates/core/src/sql/execute.rs b/crates/core/src/sql/execute.rs index d12e811ca7a..375184179ab 100644 --- a/crates/core/src/sql/execute.rs +++ b/crates/core/src/sql/execute.rs @@ -13,9 +13,13 @@ use crate::subscription::tx::DeltaTx; use crate::util::slow::SlowQueryLogger; use crate::vm::{check_row_limit, DbProgram, TxMode}; use anyhow::anyhow; +use scopeguard::ScopeGuard; use spacetimedb_datastore::execution_context::Workload; +use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; -use spacetimedb_datastore::traits::IsolationLevel; +use spacetimedb_datastore::locking_tx_datastore::TxId; +use spacetimedb_datastore::traits::{IsolationLevel, TxData}; +use spacetimedb_durability::TxOffset; use spacetimedb_expr::statement::Statement; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::metrics::ExecutionMetrics; @@ -172,6 +176,11 @@ pub fn execute_sql_tx<'a>( } pub struct SqlResult { + /// The offset of the SQL operation's transaction. + /// + /// Used to determine visibility of the transaction wrt the durability + /// requirements requestest by the caller. + pub tx_offset: TxOffset, pub rows: Vec, /// These metrics will be reported via `report_tx_metrics`. /// They should not be reported separately to avoid double counting. @@ -199,16 +208,20 @@ pub fn run( // Up to this point, the tx has been read-only, // and hence there are no deltas to process. let (tx_data, tx_metrics_mut, tx) = tx.commit_downgrade(Workload::Sql); + let tx_data = Arc::new(tx_data); + let mut tx_metrics_mut = Some(tx_metrics_mut); + + let release_tx = + |tx: TxId, tx_data: Arc, tx_metrics_mut: Option| -> (TxOffset, ExecutionMetrics) { + let execution_metrics = tx.metrics; + let (offset, tx_metrics_downgrade, reducer) = db.release_tx(tx); + db.report_tx_metrics(reducer, Some(tx_data), tx_metrics_mut, Some(tx_metrics_downgrade)); + (offset, execution_metrics) + }; // Release the tx on drop, so that we record metrics. let mut tx = scopeguard::guard(tx, |tx| { - let (tx_metrics_downgrade, reducer) = db.release_tx(tx); - db.report_tx_metrics( - reducer, - Some(Arc::new(tx_data)), - Some(tx_metrics_mut), - Some(tx_metrics_downgrade), - ); + release_tx(tx, tx_data.clone(), tx_metrics_mut.take()); }); // Compute the header for the result set @@ -231,9 +244,13 @@ pub fn run( // Update transaction metrics tx.metrics.merge(metrics); + // Release the tx and get the tx offset. + let (tx_offset, metrics) = release_tx(ScopeGuard::into_inner(tx), tx_data, tx_metrics_mut.take()); + Ok(SqlResult { + tx_offset, rows, - metrics: tx.metrics, + metrics, }) } Statement::DML(stmt) => { @@ -252,10 +269,13 @@ pub fn run( if subs.is_none() { let metrics = tx.metrics; return db.commit_tx(tx).map(|tx_opt| { - if let Some((tx_data, tx_metrics, reducer)) = tx_opt { - db.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); + let (tx_offset, tx_data, tx_metrics, reducer) = tx_opt.unwrap(); + db.report_mut_tx_metrics(reducer, tx_metrics, Some(tx_data)); + SqlResult { + tx_offset, + rows: vec![], + metrics, } - SqlResult { rows: vec![], metrics } }); } @@ -289,7 +309,11 @@ pub fn run( Err(WriteConflict) => { todo!("See module_host_actor::call_reducer_with_tx") } - Ok(_) => Ok(SqlResult { rows: vec![], metrics }), + Ok((tx_offset, _, _)) => Ok(SqlResult { + tx_offset, + rows: vec![], + metrics, + }), } } } diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 48c2af5fb95..12c115bf5e0 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1,6 +1,7 @@ use super::execution_unit::QueryHash; use super::module_subscription_manager::{ spawn_send_worker, BroadcastError, BroadcastQueue, Plan, SubscriptionGaugeStats, SubscriptionManager, + TransactionOffset, }; use super::query::compile_query_with_hashes; use super::tx::DeltaTx; @@ -22,13 +23,17 @@ use crate::vm::check_row_limit; use crate::worker_metrics::WORKER_METRICS; use parking_lot::RwLock; use prometheus::{Histogram, HistogramTimer, IntCounter, IntGauge}; +use scopeguard::ScopeGuard; use spacetimedb_client_api_messages::websocket::{ self as ws, BsatnFormat, FormatSwitch, JsonFormat, SubscribeMulti, SubscribeSingle, TableUpdate, Unsubscribe, UnsubscribeMulti, }; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::execution_context::{Workload, WorkloadType}; +use spacetimedb_datastore::locking_tx_datastore::datastore::TxMetrics; use spacetimedb_datastore::locking_tx_datastore::TxId; +use spacetimedb_datastore::traits::TxData; +use spacetimedb_durability::TxOffset; use spacetimedb_execution::pipelined::PipelinedProject; use spacetimedb_lib::identity::AuthCtx; use spacetimedb_lib::metrics::ExecutionMetrics; @@ -299,6 +304,7 @@ impl ModuleSubscriptions { let send_err_msg = |message| { self.broadcast_queue.send_client_message( sender.clone(), + None::, SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -316,10 +322,7 @@ impl ModuleSubscriptions { let hash = QueryHash::from_string(&sql, auth.caller, false); let hash_with_param = QueryHash::from_string(&sql, auth.caller, true); - let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| { - let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let tx = self.begin_tx(Workload::Subscribe); let existing_query = { let guard = self.subscriptions.read(); @@ -365,6 +368,7 @@ impl ModuleSubscriptions { // Holding a write lock on `self.subscriptions` would also be sufficient. let _ = self.broadcast_queue.send_client_message( sender.clone(), + Some(tx.tx_offset()), SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -390,6 +394,7 @@ impl ModuleSubscriptions { let send_err_msg = |message| { self.broadcast_queue.send_client_message( sender.clone(), + None::, SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -418,10 +423,7 @@ impl ModuleSubscriptions { return Ok(None); }; - let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| { - let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let tx = self.begin_tx(Workload::Unsubscribe); let auth = AuthCtx::new(self.owner_identity, sender.id.identity); let (table_rows, metrics) = return_on_err_with_sql!( self.evaluate_initial_subscription(sender.clone(), query.clone(), &tx, &auth, TableUpdateType::Unsubscribe), @@ -438,6 +440,7 @@ impl ModuleSubscriptions { // Holding a write lock on `self.subscriptions` would also be sufficient. let _ = self.broadcast_queue.send_client_message( sender.clone(), + Some(tx.tx_offset()), SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -464,6 +467,7 @@ impl ModuleSubscriptions { let send_err_msg = |message| { self.broadcast_queue.send_client_message( sender.clone(), + None::, SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -480,10 +484,7 @@ impl ModuleSubscriptions { let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Unsubscribe); // Always lock the db before the subscription lock to avoid deadlocks. - let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Unsubscribe), |tx| { - let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let tx = self.begin_tx(Workload::Unsubscribe); let removed_queries = { let _compile_timer = subscription_metrics.compilation_time.start_timer(); @@ -527,6 +528,7 @@ impl ModuleSubscriptions { // Holding a write lock on `self.subscriptions` would also be sufficient. let _ = self.broadcast_queue.send_client_message( sender, + Some(tx.tx_offset()), SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -576,10 +578,7 @@ impl ModuleSubscriptions { let auth = AuthCtx::new(self.owner_identity, sender); // We always get the db lock before the subscription lock to avoid deadlocks. - let tx = scopeguard::guard(self.relational_db.begin_tx(Workload::Subscribe), |tx| { - let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let tx = self.begin_tx(Workload::Subscribe); let compile_timer = metrics.compilation_time.start_timer(); @@ -631,9 +630,10 @@ impl ModuleSubscriptions { &self, recipient: Arc, message: impl Into, - _tx_id: &TxId, + (_tx, tx_offset): (&TxId, impl Into), ) -> Result<(), BroadcastError> { - self.broadcast_queue.send_client_message(recipient, message) + self.broadcast_queue + .send_client_message(recipient, Some(tx_offset.into()), message) } #[tracing::instrument(level = "trace", skip_all)] @@ -648,6 +648,7 @@ impl ModuleSubscriptions { let send_err_msg = |message| { let _ = self.broadcast_queue.send_client_message( sender.clone(), + None::, SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -678,10 +679,7 @@ impl ModuleSubscriptions { send_err_msg, None ); - let tx = scopeguard::guard(tx, |tx| { - let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let tx = self.guard_tx(tx, None, None); // We minimize locking so that other clients can add subscriptions concurrently. // We are protected from race conditions with broadcasts, because we have the db lock, @@ -738,6 +736,7 @@ impl ModuleSubscriptions { let _ = self.broadcast_queue.send_client_message( sender.clone(), + Some(tx.tx_offset()), SubscriptionMessage { request_id: Some(request.request_id), query_id: Some(request.query_id), @@ -772,10 +771,7 @@ impl ModuleSubscriptions { num_queries, &subscription_metrics, )?; - let tx = scopeguard::guard(tx, |tx| { - let (tx_metrics, reducer) = self.relational_db.release_tx(tx); - self.relational_db.report_read_tx_metrics(reducer, tx_metrics); - }); + let tx = self.guard_tx(tx, None, None); check_row_limit( &queries, @@ -830,6 +826,7 @@ impl ModuleSubscriptions { // Holding a write lock on `self.subscriptions` would also be sufficient. let _ = self.broadcast_queue.send_client_message( sender, + Some(tx.tx_offset()), SubscriptionUpdateMessage { database_update, request_id: Some(subscription.request_id), @@ -854,7 +851,7 @@ impl ModuleSubscriptions { caller: Option>, mut event: ModuleEvent, tx: MutTx, - ) -> Result, ExecutionMetrics), WriteConflict>, DBError> { + ) -> Result, ExecutionMetrics), WriteConflict>, DBError> { let database_identity = self.relational_db.database_identity(); let subscription_metrics = SubscriptionMetrics::new(&database_identity, &WorkloadType::Update); @@ -887,11 +884,7 @@ impl ModuleSubscriptions { let tx_data = tx_data.map(Arc::new); // When we're done with this method, release the tx and report metrics. - let mut read_tx = scopeguard::guard(read_tx, |tx| { - let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx); - self.relational_db - .report_tx_metrics(reducer, tx_data.clone(), Some(tx_metrics_mut), Some(tx_metrics_read)); - }); + let mut read_tx = self.guard_tx(read_tx, tx_data.clone(), Some(tx_metrics_mut)); // Create the delta transaction we'll use to eval updates against. let delta_read_tx = tx_data .as_ref() @@ -904,7 +897,8 @@ impl ModuleSubscriptions { match &event.status { EventStatus::Committed(_) => { - update_metrics = subscriptions.eval_updates_sequential(&delta_read_tx, event.clone(), caller); + update_metrics = + subscriptions.eval_updates_sequential((&delta_read_tx, read_tx.tx_offset()), event.clone(), caller); } EventStatus::Failed(_) => { if let Some(client) = caller { @@ -913,7 +907,9 @@ impl ModuleSubscriptions { database_update: SubscriptionUpdateMessage::default_for_protocol(client.config.protocol, None), }; - let _ = self.broadcast_queue.send_client_message(client, message); + let _ = self + .broadcast_queue + .send_client_message(client, Some(read_tx.tx_offset()), message); } else { log::trace!("Reducer failed but there is no client to send the failure to!") } @@ -924,7 +920,32 @@ impl ModuleSubscriptions { // Merge in the subscription evaluation metrics. read_tx.metrics.merge(update_metrics); - Ok(Ok((event, update_metrics))) + let tx_offset = self.release_tx(ScopeGuard::into_inner(read_tx), None, None); + Ok(Ok((tx_offset, event, update_metrics))) + } + + // NOTE: Do not `mem::forget` the `ScopeGuard`. + fn begin_tx(&self, workload: Workload) -> ScopeGuard { + self.guard_tx(self.relational_db.begin_tx(workload), None, None) + } + + // NOTE: Do not `mem::forget` the `ScopeGuard`. + fn guard_tx( + &self, + tx: TxId, + tx_data: Option>, + tx_metrics_mut: Option, + ) -> ScopeGuard { + scopeguard::guard(tx, |tx| { + self.release_tx(tx, tx_data, tx_metrics_mut); + }) + } + + fn release_tx(&self, tx: TxId, tx_data: Option>, tx_metrics_mut: Option) -> TxOffset { + let (tx_offset, tx_metrics, reducer) = self.relational_db.release_tx(tx); + self.relational_db + .report_tx_metrics(reducer, tx_data, tx_metrics_mut, Some(tx_metrics)); + tx_offset } } @@ -937,7 +958,9 @@ mod tests { SerializableMessage, SubscriptionData, SubscriptionError, SubscriptionMessage, SubscriptionResult, SubscriptionUpdateMessage, TransactionUpdateMessage, }; - use crate::client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName, MeteredReceiver, Protocol}; + use crate::client::{ + ClientActorId, ClientConfig, ClientConnectionSender, ClientName, ClientUpdate, MeteredReceiver, Protocol, + }; use crate::db::relational_db::tests_utils::{ begin_mut_tx, begin_tx, insert, with_auto_commit, with_read_only, TestDB, }; @@ -1077,22 +1100,21 @@ mod tests { fn client_connection_with_compression( client_id: ClientActorId, compression: Compression, - ) -> (Arc, MeteredReceiver) { + ) -> (Arc, MeteredReceiver) { let (sender, rx) = ClientConnectionSender::dummy_with_channel( client_id, ClientConfig { protocol: Protocol::Binary, compression, tx_update_full: true, + confirmed_reads: false, }, ); (Arc::new(sender), rx) } /// Instantiate a client connection - fn client_connection( - client_id: ClientActorId, - ) -> (Arc, MeteredReceiver) { + fn client_connection(client_id: ClientActorId) -> (Arc, MeteredReceiver) { client_connection_with_compression(client_id, Compression::None) } @@ -1166,21 +1188,25 @@ mod tests { /// Pull a message from receiver and assert that it is a `TxUpdate` with the expected rows async fn assert_tx_update_for_table( - rx: &mut MeteredReceiver, + rx: &mut MeteredReceiver, table_id: TableId, schema: &ProductType, inserts: impl IntoIterator, deletes: impl IntoIterator, ) { match rx.recv().await { - Some(SerializableMessage::TxUpdate(TransactionUpdateMessage { - database_update: - SubscriptionUpdateMessage { - database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { mut tables }), + Some(ClientUpdate { + tx_offset: _, + message: + SerializableMessage::TxUpdate(TransactionUpdateMessage { + database_update: + SubscriptionUpdateMessage { + database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { mut tables }), + .. + }, .. - }, - .. - })) => { + }), + }) => { // Assume an update for only one table assert_eq!(tables.len(), 1); @@ -1257,7 +1283,7 @@ mod tests { db.insert(&mut tx, table_id, &bsatn::to_vec(&row)?)?; } - let Ok(Ok((_, metrics))) = subs.commit_and_broadcast_event(None, module_event(), tx) else { + let Ok(Ok((_, _, metrics))) = subs.commit_and_broadcast_event(None, module_event(), tx) else { panic!("Encountered an error in `commit_and_broadcast_event`"); }; Ok(metrics) @@ -1302,11 +1328,15 @@ mod tests { Ok(()) } - fn check_subscription_err(sql: &str, result: Option) { - if let Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::Error(SubscriptionError { message, .. }), - .. - })) = result + fn check_subscription_err(sql: &str, result: Option) { + if let Some(ClientUpdate { + tx_offset: _, + message: + SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::Error(SubscriptionError { message, .. }), + .. + }), + }) = result { assert!( message.contains(sql), @@ -1387,10 +1417,13 @@ mod tests { // The initial subscription should succeed assert!(matches!( rx.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::Subscribe(..), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::Subscribe(..), + .. + }) + }) )); // Remove the index from `id` @@ -1443,10 +1476,13 @@ mod tests { // The initial subscription should succeed assert!(matches!( rx.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(..), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(..), + .. + }) + }) )); // Remove the index from `id` @@ -1492,10 +1528,13 @@ mod tests { // The initial subscription should succeed assert!(matches!( rx.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::Subscribe(..), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::Subscribe(..), + .. + }) + }) )); // Remove the index from `s` @@ -1561,11 +1600,17 @@ mod tests { // Wait for both subscriptions assert!(matches!( rx_for_a.recv().await, - Some(SerializableMessage::Subscription(_)) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(_) + }) )); assert!(matches!( rx_for_b.recv().await, - Some(SerializableMessage::Subscription(_)) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(_) + }) )); // Insert two identities - one for each caller - into the table @@ -1631,11 +1676,17 @@ mod tests { // Wait for both subscriptions assert!(matches!( rx_for_a.recv().await, - Some(SerializableMessage::Subscription(_)) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(_) + }) )); assert!(matches!( rx_for_b.recv().await, - Some(SerializableMessage::Subscription(_)) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(_) + }) )); // Insert a row into `u` for client "a". @@ -1685,17 +1736,23 @@ mod tests { // Wait for both subscriptions assert_matches!( rx_for_a.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + }) + }) ); assert_matches!( rx_for_b.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + }) + }) ); let schema = ProductType::from([AlgebraicType::identity()]); @@ -1754,7 +1811,13 @@ mod tests { subscribe_multi(&subs, &["select * from t where x = 0"], tx, &mut 0)?; // Wait to receive the initial subscription message - assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_)))); + assert!(matches!( + rx.recv().await, + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(_) + }) + )); // Insert a row that does not match the query let mut tx = begin_mut_tx(&db); @@ -1812,13 +1875,17 @@ mod tests { // Assert the table updates within this message are all be uncompressed match rx.recv().await { - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: - SubscriptionResult::SubscribeMulti(SubscriptionData { - data: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }), + Some(ClientUpdate { + tx_offset: _, + message: + SerializableMessage::Subscription(SubscriptionMessage { + result: + SubscriptionResult::SubscribeMulti(SubscriptionData { + data: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }), + }), + .. }), - .. - })) => { + }) => { assert!(tables.iter().all(|TableUpdate { updates, .. }| updates .iter() .all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_))))); @@ -1845,7 +1912,13 @@ mod tests { subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?; // Wait to receive the initial subscription message - assert_matches!(rx.recv().await, Some(SerializableMessage::Subscription(_))); + assert_matches!( + rx.recv().await, + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(_) + }) + ); let schema = ProductType::from([AlgebraicType::U8, AlgebraicType::U8]); @@ -1898,7 +1971,13 @@ mod tests { subscribe_multi(&subs, &["select * from t"], tx, &mut 0)?; // Wait to receive the initial subscription message - assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_)))); + assert!(matches!( + rx.recv().await, + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(_) + }) + )); // Insert a lot of rows into `t`. // We want to insert enough to cross any threshold there might be for compression. @@ -1906,14 +1985,18 @@ mod tests { // Assert the table updates within this message are all be uncompressed match rx.recv().await { - Some(SerializableMessage::TxUpdate(TransactionUpdateMessage { - database_update: - SubscriptionUpdateMessage { - database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }), + Some(ClientUpdate { + tx_offset: _, + message: + SerializableMessage::TxUpdate(TransactionUpdateMessage { + database_update: + SubscriptionUpdateMessage { + database_update: FormatSwitch::Bsatn(ws::DatabaseUpdate { tables }), + .. + }, .. - }, - .. - })) => { + }), + }) => { assert!(tables.iter().all(|TableUpdate { updates, .. }| updates .iter() .all(|query_update| matches!(query_update, CompressableQueryUpdate::Uncompressed(_))))); @@ -1948,7 +2031,13 @@ mod tests { subscribe_multi(&subs, queries, sender, &mut 0)?; - assert!(matches!(rx.recv().await, Some(SerializableMessage::Subscription(_)))); + assert!(matches!( + rx.recv().await, + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(_) + }) + )); // Insert two matching player rows commit_tx( @@ -2109,17 +2198,23 @@ mod tests { // Wait for both subscriptions assert!(matches!( rx_for_a.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + }) + }) )); assert!(matches!( rx_for_b.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + }) + }) )); // Modify a single row in `v` @@ -2246,10 +2341,13 @@ mod tests { assert_matches!( rx.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + }) + }) ); // Insert a new row into `u` that joins with `x = 1` @@ -2390,17 +2488,23 @@ mod tests { // Wait for both subscriptions assert_matches!( rx_for_a.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + }) + }) ); assert_matches!( rx_for_b.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + }) + }) ); // Insert a new row into `u` @@ -2482,27 +2586,36 @@ mod tests { // Wait for both subscriptions assert_matches!( rx_for_a.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + }) + }) ); assert_matches!( rx_for_b.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + }) + }) ); unsubscribe_multi(&subs, tx_for_b, query_ids)?; assert_matches!( rx_for_b.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::UnsubscribeMulti(_), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::UnsubscribeMulti(_), + .. + }) + }) ); // Insert a new row into `u` @@ -2570,10 +2683,13 @@ mod tests { assert_matches!( rx.recv().await, - Some(SerializableMessage::Subscription(SubscriptionMessage { - result: SubscriptionResult::SubscribeMulti(_), - .. - })) + Some(ClientUpdate { + tx_offset: _, + message: SerializableMessage::Subscription(SubscriptionMessage { + result: SubscriptionResult::SubscribeMulti(_), + .. + }) + }) ); assert_eq!(metrics.rows_scanned, 0); diff --git a/crates/core/src/subscription/module_subscription_manager.rs b/crates/core/src/subscription/module_subscription_manager.rs index 9820a739dc8..2545440fae7 100644 --- a/crates/core/src/subscription/module_subscription_manager.rs +++ b/crates/core/src/subscription/module_subscription_manager.rs @@ -11,7 +11,7 @@ use crate::messages::websocket::{self as ws, TableUpdate}; use crate::subscription::delta::eval_delta; use crate::subscription::websocket_building::BuildableWebsocketFormat; use crate::worker_metrics::WORKER_METRICS; -use core::mem; +use core::{fmt, mem}; use hashbrown::hash_map::OccupiedError; use hashbrown::{HashMap, HashSet}; use parking_lot::RwLock; @@ -20,13 +20,17 @@ use spacetimedb_client_api_messages::websocket::{ BsatnFormat, CompressableQueryUpdate, FormatSwitch, JsonFormat, QueryId, QueryUpdate, SingleQueryUpdate, }; use spacetimedb_data_structures::map::{Entry, IntMap}; +use spacetimedb_datastore::locking_tx_datastore::datastore::MutTxOffset; use spacetimedb_datastore::locking_tx_datastore::state_view::StateView; +use spacetimedb_durability::TxOffset; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_lib::{AlgebraicValue, ConnectionId, Identity, ProductValue}; use spacetimedb_primitives::{ColId, IndexId, TableId}; use spacetimedb_subscription::{JoinEdge, SubscriptionPlan, TableName}; use std::collections::BTreeMap; use std::fmt::Debug; +use std::future::{self, Future, IntoFuture}; +use std::pin::Pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use tokio::sync::mpsc; @@ -548,12 +552,79 @@ impl SenderWithGauge { } } +/// The offset of a transaction. +/// +/// Depending on the datastore implementation and the isolation level of the +/// transaction, the offset may be available immediately or in the future, when +/// the transaction commits. +pub enum TransactionOffset { + Now(TxOffset), + Later(Later), +} + +type Later = Pin> + Send + Sync + 'static>>; + +impl TransactionOffset { + /// Resolve the offset, waiting if it is not readily available. + /// + /// Returns `None` if the transaction aborted. + pub async fn get(self) -> Option { + match self { + Self::Now(x) => Some(x), + Self::Later(x) => x.await, + } + } +} + +impl fmt::Debug for TransactionOffset { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let mut debug = f.debug_tuple("TransactionOffset"); + match self { + Self::Now(x) => debug.field(&x), + Self::Later(_) => debug.field(&"<>"), + } + .finish() + } +} + +impl From for TransactionOffset { + fn from(value: TxOffset) -> Self { + Self::Now(value) + } +} + +impl From> for TransactionOffset { + fn from(ready: future::Ready) -> Self { + Self::from(ready.into_inner()) + } +} + +impl From for TransactionOffset { + fn from(fut: MutTxOffset) -> Self { + Self::Later(fut.into_future()) + } +} + /// Message sent by the [`SubscriptionManager`] to the [`SendWorker`]. +// +// NOTE: Variants that store an unresolved [`TransactionOffset`] do so as an +// artifact of the locking strategy, by which messages are sent while the +// transaction is still uncommitted. This is fine as the locking datastore +// will not abort transactions, i.e. all transactions are guaranteed to commit +// unless manually rolled back. +// In the future, we'll want to store the resolved [`TxOffset`], and not even +// submit results of aborted transactions to the queue. #[derive(Debug)] enum SendWorkerMessage { /// A transaction has completed and the [`SubscriptionManager`] has evaluated the incremental queries, /// so the [`SendWorker`] should broadcast them to clients. - Broadcast(ComputedQueries), + /// + /// The `tx_offset` of the transaction is used to control visibility of + /// the results if the client has requested confirmed reads. + Broadcast { + tx_offset: TransactionOffset, + queries: ComputedQueries, + }, /// A new client has been registered in the [`SubscriptionManager`], /// so the [`SendWorker`] should also record its existence. @@ -566,9 +637,14 @@ enum SendWorkerMessage { outbound_ref: Client, }, - // Send a message to a client. + /// Send a message to a client. + /// + /// In some cases, `message` may contain query results. In this case, + /// `tx_offset` is `Some`, and later used to control visibility of the + /// message if the the client has requested confirmed reads. SendMessage { recipient: Arc, + tx_offset: Option, message: SerializableMessage, }, @@ -1100,7 +1176,7 @@ impl SubscriptionManager { #[tracing::instrument(level = "trace", skip_all)] pub fn eval_updates_sequential( &self, - tx: &DeltaTx, + (tx, tx_offset): (&DeltaTx, impl Into), event: Arc, caller: Option>, ) -> ExecutionMetrics { @@ -1266,12 +1342,15 @@ impl SubscriptionManager { // then return ASAP in order to unlock the datastore and start running the next transaction. // See comment on the `send_worker_tx` field in [`SubscriptionManager`] for more motivation. self.send_worker_queue - .send(SendWorkerMessage::Broadcast(ComputedQueries { - updates, - errs, - event, - caller, - })) + .send(SendWorkerMessage::Broadcast { + tx_offset: tx_offset.into(), + queries: ComputedQueries { + updates, + errs, + event, + caller, + }, + }) .expect("send worker has panicked, or otherwise dropped its recv queue!"); drop(span); @@ -1370,10 +1449,12 @@ impl BroadcastQueue { pub fn send_client_message( &self, recipient: Arc, + tx_offset: Option>, message: impl Into, ) -> Result<(), BroadcastError> { self.0.send(SendWorkerMessage::SendMessage { recipient, + tx_offset: tx_offset.map(Into::into), message: message.into(), })?; Ok(()) @@ -1427,14 +1508,31 @@ impl SendWorker { self.clients .insert(client_id, SendWorkerClient { dropped, outbound_ref }); } - SendWorkerMessage::SendMessage { recipient, message } => { - let _ = recipient.send_message(message); - } + SendWorkerMessage::SendMessage { + recipient, + tx_offset, + message, + } => match tx_offset { + None => { + let _ = recipient.send_message(None, message); + } + Some(tx_offset) => { + if let Some(tx_offset) = tx_offset.get().await { + let _ = recipient.send_message(Some(tx_offset), message); + } + // Results of an aborted transaction should not appear + // on the channel, see commentary on [`SendWorkerMessage`]. + } + }, SendWorkerMessage::RemoveClient(client_id) => { self.clients.remove(&client_id); } - SendWorkerMessage::Broadcast(queries) => { - self.send_one_computed_queries(queries); + SendWorkerMessage::Broadcast { tx_offset, queries } => { + if let Some(tx_offset) = tx_offset.get().await { + self.send_one_computed_queries(tx_offset, queries); + } + // Results of an aborted transaction should not appear + // on the channel, see commentary on [`SendWorkerMessage`]. } } } @@ -1442,6 +1540,7 @@ impl SendWorker { fn send_one_computed_queries( &mut self, + tx_offset: TxOffset, ComputedQueries { updates, errs, @@ -1527,7 +1626,7 @@ impl SendWorker { event: Some(event.clone()), database_update, }; - send_to_client(&caller, message); + send_to_client(&caller, Some(tx_offset), message); } // Send all the other updates. @@ -1537,7 +1636,7 @@ impl SendWorker { // Conditionally send out a full update or a light one otherwise. let event = client.config.tx_update_full.then(|| event.clone()); let message = TransactionUpdateMessage { event, database_update }; - send_to_client(&client, message); + send_to_client(&client, Some(tx_offset), message); } // Put back the aggregation maps into the worker. @@ -1550,6 +1649,7 @@ impl SendWorker { client.dropped.store(true, Ordering::Release); send_to_client( &client.outbound_ref, + None, SubscriptionMessage { request_id: None, query_id: None, @@ -1565,8 +1665,12 @@ impl SendWorker { } } -fn send_to_client(client: &ClientConnectionSender, message: impl Into) { - if let Err(e) = client.send_message(message) { +fn send_to_client( + client: &ClientConnectionSender, + tx_offset: Option, + message: impl Into, +) { + if let Err(e) = client.send_message(tx_offset, message) { tracing::warn!(%client.id, "failed to send update message to client: {e}") } } @@ -1587,6 +1691,7 @@ mod tests { use crate::host::module_host::DatabaseTableUpdate; use crate::sql::ast::SchemaViewer; use crate::subscription::module_subscription_manager::ClientQueryId; + use crate::subscription::tx::DeltaTx; use crate::{ client::{ClientActorId, ClientConfig, ClientConnectionSender, ClientName}, db::relational_db::{tests_utils::TestDB, RelationalDB}, @@ -2443,7 +2548,9 @@ mod tests { }); db.with_read_only(Workload::Update, |tx| { - subscriptions.eval_updates_sequential(&(&*tx).into(), event, Some(Arc::new(client0))) + let tx_offset = tx.tx_offset(); + let delta_tx = DeltaTx::from(&*tx); + subscriptions.eval_updates_sequential((&delta_tx, tx_offset), event, Some(Arc::new(client0))) }); runtime.block_on(async move { diff --git a/crates/datastore/Cargo.toml b/crates/datastore/Cargo.toml index 0a46d472df6..b9d53c74617 100644 --- a/crates/datastore/Cargo.toml +++ b/crates/datastore/Cargo.toml @@ -35,6 +35,7 @@ smallvec.workspace = true strum.workspace = true thiserror.workspace = true thin-vec.workspace = true +tokio = { workspace = true, features = ["sync"] } [features] # Print a warning when doing an unindexed `iter_by_col_range` on a large table. @@ -50,4 +51,4 @@ spacetimedb-commitlog = { path = "../commitlog", features = ["test"] } # Also as dev-dependencies for use in _this_ crate's tests. proptest.workspace = true -pretty_assertions.workspace = true \ No newline at end of file +pretty_assertions.workspace = true diff --git a/crates/datastore/src/locking_tx_datastore/datastore.rs b/crates/datastore/src/locking_tx_datastore/datastore.rs index d9b2ac32fe2..3a69e228ce5 100644 --- a/crates/datastore/src/locking_tx_datastore/datastore.rs +++ b/crates/datastore/src/locking_tx_datastore/datastore.rs @@ -38,10 +38,17 @@ use spacetimedb_sats::{bsatn, buffer::BufReader, AlgebraicValue, ProductValue}; use spacetimedb_schema::schema::{ColumnSchema, IndexSchema, SequenceSchema, TableSchema}; use spacetimedb_snapshot::{ReconstructedSnapshot, SnapshotRepository}; use spacetimedb_table::{indexes::RowPointer, page_pool::PagePool, table::RowRef}; -use std::borrow::Cow; -use std::sync::Arc; -use std::time::{Duration, Instant}; +use std::{borrow::Cow, future}; +use std::{ + future::{Future, IntoFuture}, + sync::Arc, +}; +use std::{ + pin::Pin, + time::{Duration, Instant}, +}; use thiserror::Error; +use tokio::sync::watch; pub type Result = std::result::Result; @@ -326,6 +333,7 @@ impl DataRow for Locking { impl Tx for Locking { type Tx = TxId; + type TxOffset = future::Ready; /// Begins a read-only transaction under the given `workload`. /// @@ -357,9 +365,18 @@ impl Tx for Locking { /// Returns: /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran within this transaction. - fn release_tx(&self, tx: Self::Tx) -> (TxMetrics, String) { + fn release_tx(&self, tx: Self::Tx) -> (TxOffset, TxMetrics, String) { tx.release() } + + /// Obtain a promise to retrieve the smallest transaction offset visible to + /// this transaction. + /// + /// Since the locking datastore does not exhibit read anomalies, this is + /// always [`future::Ready`] at the offset the transaction started. + fn tx_offset(&self, tx: &Self::Tx) -> Self::TxOffset { + tx.tx_offset() + } } impl TxDatastore for Locking { @@ -838,8 +855,59 @@ impl TxMetrics { } } +/// Implementation of [`MutTx::TxOffset`]. +// +// NOTE: This uses a `watch::Receiver` for two reasons: +// +// 1. Multiple consumers are supported (unlike e.g. `oneshot::Receiver`) +// 2. We can detect if the transaction was dropped without properly releasing +// it (unlike e.g. `SetOnce`). +// +// The second is a programmer error that our API doesn't yet prevent. +// We may want to switch to `SetOnce` in the future. +pub struct MutTxOffset(watch::Receiver); + +#[derive(Clone, Copy)] +pub(crate) enum TxStatus { + Started, + Aborted, + Committed(TxOffset), +} + +impl IntoFuture for MutTxOffset { + type Output = Option; + type IntoFuture = Pin + Send + Sync + 'static>>; + + fn into_future(self) -> Self::IntoFuture { + let mut recv = self.0; + let fut = async move { + loop { + let state = *recv.borrow_and_update(); + match state { + TxStatus::Started => { + if recv.changed().await.is_err() { + return None; + } + } + TxStatus::Aborted => return None, + TxStatus::Committed(offset) => return Some(offset), + } + } + }; + + Box::pin(fut) + } +} + +impl From> for MutTxOffset { + fn from(recv: watch::Receiver) -> Self { + Self(recv) + } +} + impl MutTx for Locking { type MutTx = MutTxId; + type TxOffset = MutTxOffset; /// Begins a mutable transaction under the given `isolation_level` and `workload`. /// @@ -858,11 +926,13 @@ impl MutTx for Locking { let committed_state_write_lock = self.committed_state.write_arc(); let sequence_state_lock = self.sequence_state.lock_arc(); let lock_wait_time = timer.elapsed(); + let (tx_status, _) = watch::channel(TxStatus::Started); MutTxId { committed_state_write_lock, sequence_state_lock, tx_state: TxState::default(), + tx_status, lock_wait_time, timer, ctx, @@ -874,9 +944,13 @@ impl MutTx for Locking { tx.rollback() } - fn commit_mut_tx(&self, tx: Self::MutTx) -> Result> { + fn commit_mut_tx(&self, tx: Self::MutTx) -> Result> { Ok(Some(tx.commit())) } + + fn tx_offset(&self, tx: &Self::MutTx) -> Self::TxOffset { + tx.tx_offset() + } } impl Locking { @@ -914,7 +988,7 @@ pub struct Replay { } impl Replay { - fn using_visitor(&self, f: impl FnOnce(&mut ReplayVisitor) -> T) -> T { + fn using_visitor(&self, f: impl FnOnce(&mut ReplayVisitor<'_, F>) -> T) -> T { let mut committed_state = self.committed_state.write_arc(); let mut visitor = ReplayVisitor { database_identity: &self.database_identity, @@ -1425,7 +1499,8 @@ mod tests { } fn commit(datastore: &Locking, tx: MutTxId) -> ResultTest { - Ok(datastore.commit_mut_tx(tx)?.expect("commit should produce `TxData`").0) + let (_, tx_data, _, _) = datastore.commit_mut_tx(tx)?.expect("commit should produce `TxData`"); + Ok(tx_data) } #[rustfmt::skip] diff --git a/crates/datastore/src/locking_tx_datastore/mut_tx.rs b/crates/datastore/src/locking_tx_datastore/mut_tx.rs index e1c92e6d204..f20a76ac2ad 100644 --- a/crates/datastore/src/locking_tx_datastore/mut_tx.rs +++ b/crates/datastore/src/locking_tx_datastore/mut_tx.rs @@ -1,6 +1,6 @@ use super::{ committed_state::{CommitTableForInsertion, CommittedState}, - datastore::{Result, TxMetrics}, + datastore::{MutTxOffset, Result, TxMetrics, TxStatus}, delete_table::DeleteTable, sequence::{Sequence, SequencesState}, state_view::{IterByColEqMutTx, IterByColRangeMutTx, IterMutTx, ScanIterByColRangeMutTx, StateView}, @@ -25,6 +25,7 @@ use core::ops::RangeBounds; use core::{cell::RefCell, mem}; use core::{iter, ops::Bound}; use smallvec::SmallVec; +use spacetimedb_durability::TxOffset; use spacetimedb_execution::{dml::MutDatastore, Datastore, DeltaStore, Row}; use spacetimedb_lib::{db::raw_def::v9::RawSql, metrics::ExecutionMetrics}; use spacetimedb_lib::{ @@ -57,6 +58,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; +use tokio::sync::watch; type DecodeResult = core::result::Result; @@ -70,6 +72,7 @@ pub struct MutTxId { pub(super) committed_state_write_lock: SharedWriteGuard, pub(super) sequence_state_lock: SharedMutexGuard, pub(super) lock_wait_time: Duration, + pub(super) tx_status: watch::Sender, // TODO(cloutiertyler): The below were made `pub` for the datastore split. We should // make these private again. pub timer: Instant, @@ -1170,8 +1173,10 @@ impl MutTxId { /// - [`TxData`], the set of inserts and deletes performed by this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran during this transaction. - pub fn commit(mut self) -> (TxData, TxMetrics, String) { + pub fn commit(mut self) -> (TxOffset, TxData, TxMetrics, String) { + let tx_offset = self.committed_state_write_lock.next_tx_offset; let tx_data = self.committed_state_write_lock.merge(self.tx_state, &self.ctx); + self.tx_status.send_replace(TxStatus::Committed(tx_offset)); // Compute and keep enough info that we can // record metrics after the transaction has ended @@ -1188,7 +1193,7 @@ impl MutTxId { ); let reducer = self.ctx.into_reducer_name(); - (tx_data, tx_metrics, reducer) + (tx_offset, tx_data, tx_metrics, reducer) } /// Commits this transaction, applying its changes to the committed state. @@ -1200,7 +1205,9 @@ impl MutTxId { /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - [`TxId`], a read-only transaction with a shared lock on the committed state. pub fn commit_downgrade(mut self, workload: Workload) -> (TxData, TxMetrics, TxId) { + let tx_offset = self.committed_state_write_lock.next_tx_offset; let tx_data = self.committed_state_write_lock.merge(self.tx_state, &self.ctx); + self.tx_status.send_replace(TxStatus::Committed(tx_offset)); // Compute and keep enough info that we can // record metrics after the transaction has ended @@ -1236,6 +1243,7 @@ impl MutTxId { pub fn rollback(mut self) -> (TxMetrics, String) { self.committed_state_write_lock .rollback(&mut self.sequence_state_lock, self.tx_state); + self.tx_status.send_replace(TxStatus::Aborted); // Compute and keep enough info that we can // record metrics after the transaction has ended @@ -1264,6 +1272,7 @@ impl MutTxId { pub fn rollback_downgrade(mut self, workload: Workload) -> (TxMetrics, TxId) { self.committed_state_write_lock .rollback(&mut self.sequence_state_lock, self.tx_state); + self.tx_status.send_replace(TxStatus::Aborted); // Compute and keep enough info that we can // record metrics after the transaction has ended @@ -1291,6 +1300,10 @@ impl MutTxId { (tx_metrics, tx) } + + pub fn tx_offset(&self) -> MutTxOffset { + self.tx_status.subscribe().into() + } } /// Either a row just inserted to a table or a row that already existed in some table. diff --git a/crates/datastore/src/locking_tx_datastore/tx.rs b/crates/datastore/src/locking_tx_datastore/tx.rs index 1fadd3e94be..662bdbf4b7e 100644 --- a/crates/datastore/src/locking_tx_datastore/tx.rs +++ b/crates/datastore/src/locking_tx_datastore/tx.rs @@ -6,6 +6,7 @@ use super::{ }; use crate::execution_context::ExecutionContext; use crate::locking_tx_datastore::state_view::IterTx; +use spacetimedb_durability::TxOffset; use spacetimedb_execution::Datastore; use spacetimedb_lib::metrics::ExecutionMetrics; use spacetimedb_primitives::{ColList, TableId}; @@ -13,8 +14,8 @@ use spacetimedb_sats::AlgebraicValue; use spacetimedb_schema::schema::TableSchema; use spacetimedb_table::blob_store::BlobStore; use spacetimedb_table::table::Table; -use std::num::NonZeroU64; use std::sync::Arc; +use std::{future, num::NonZeroU64}; use std::{ ops::RangeBounds, time::{Duration, Instant}, @@ -91,7 +92,8 @@ impl TxId { /// Returns: /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran within this transaction. - pub(super) fn release(self) -> (TxMetrics, String) { + pub(super) fn release(self) -> (TxOffset, TxMetrics, String) { + let tx_offset = self.committed_state_shared_lock.next_tx_offset; let tx_metrics = TxMetrics::new( &self.ctx, self.timer, @@ -102,7 +104,7 @@ impl TxId { &self.committed_state_shared_lock, ); let reducer = self.ctx.into_reducer_name(); - (tx_metrics, reducer) + (tx_offset, tx_metrics, reducer) } /// The Number of Distinct Values (NDV) for a column or list of columns, @@ -120,4 +122,8 @@ impl TxId { let (_, index) = table.get_index_by_cols(cols)?; NonZeroU64::new(index.num_keys() as u64) } + + pub fn tx_offset(&self) -> future::Ready { + future::ready(self.committed_state_shared_lock.next_tx_offset) + } } diff --git a/crates/datastore/src/traits.rs b/crates/datastore/src/traits.rs index 9626f992bae..9624509e87c 100644 --- a/crates/datastore/src/traits.rs +++ b/crates/datastore/src/traits.rs @@ -1,6 +1,7 @@ use core::ops::Deref; use std::borrow::Cow; use std::collections::BTreeMap; +use std::future::IntoFuture; use std::{ops::RangeBounds, sync::Arc}; use super::locking_tx_datastore::datastore::TxMetrics; @@ -9,6 +10,7 @@ use super::Result; use crate::execution_context::{ReducerContext, Workload}; use crate::system_tables::ST_TABLE_ID; use spacetimedb_data_structures::map::IntMap; +use spacetimedb_durability::TxOffset; use spacetimedb_lib::{hash_bytes, Identity}; use spacetimedb_primitives::*; use spacetimedb_sats::hash::Hash; @@ -177,13 +179,13 @@ pub struct TxData { deletes: BTreeMap>, /// Map of all `TableId`s in both `inserts` and `deletes` to their /// corresponding table name. + // TODO: Store table name as ref counted string. tables: IntMap, /// Tx offset of the transaction which performed these operations. /// /// `None` implies that `inserts` and `deletes` are both empty, /// but `Some` does not necessarily imply that either is non-empty. tx_offset: Option, - // TODO: Store an `Arc` or equivalent instead. } impl TxData { @@ -320,6 +322,7 @@ pub trait DataRow: Send + Sync { pub trait Tx { type Tx; + type TxOffset: IntoFuture; /// Begins a read-only transaction under the given `workload`. fn begin_tx(&self, workload: Workload) -> Self::Tx; @@ -327,13 +330,27 @@ pub trait Tx { /// Release this read-only transaction. /// /// Returns: + /// - [`TxOffset`], the smallest transaction offset visible to this transaction. + /// See also the commentary on [`Self::tx_offset`]. /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran within this transaction. - fn release_tx(&self, tx: Self::Tx) -> (TxMetrics, String); + fn release_tx(&self, tx: Self::Tx) -> (TxOffset, TxMetrics, String); + + /// Obtain a promise to retrieve the smallest transactions offset visible to + /// this transaction, without requiring ownership of the transaction. + /// + /// Implementations must uphold that the offset includes all transactions + /// that were visible to this transaction until it was released. + /// + /// This is relevant to transactions executing under an isolation level + /// weaker than [`IsolationLevel::Snapshot`], where transactions that commit + /// after this transaction started can be visible. + fn tx_offset(&self, tx: &Self::Tx) -> Self::TxOffset; } pub trait MutTx { type MutTx; + type TxOffset: IntoFuture>; /// Begins a mutable transaction under the given `isolation_level` and `workload`. fn begin_mut_tx(&self, isolation_level: IsolationLevel, workload: Workload) -> Self::MutTx; @@ -341,10 +358,12 @@ pub trait MutTx { /// Commits `tx`, applying its changes to the committed state. /// /// Returns: + /// - [`TxOffset`], the offset this transaction was committed at. + /// See also the commentary on [`Self::tx_offset`]. /// - [`TxData`], the set of inserts and deletes performed by this transaction. /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran during this transaction. - fn commit_mut_tx(&self, tx: Self::MutTx) -> Result>; + fn commit_mut_tx(&self, tx: Self::MutTx) -> Result>; /// Rolls back this transaction, discarding its changes. /// @@ -352,6 +371,20 @@ pub trait MutTx { /// - [`TxMetrics`], various measurements of the work performed by this transaction. /// - `String`, the name of the reducer which ran within this transaction. fn rollback_mut_tx(&self, tx: Self::MutTx) -> (TxMetrics, String); + + /// Obtain a promise to retrieve the transaction's offset, without requiring + /// ownership of the transaction. + /// + /// The returned future resolves to `Some` if and when the transaction + /// committed, and to `None` if it aborted. + /// + /// Implementations must uphold that the offset of a committed transaction + /// includes all transactions that were visible to this transaction. + /// + /// This is relevant to transactions executing under an isolation level + /// weaker than [`IsolationLevel::Snapshot`], where transactions that commit + /// after this transaction started can be visible. + fn tx_offset(&self, tx: &Self::MutTx) -> Self::TxOffset; } /// Standard metadata associated with a database. diff --git a/crates/durability/Cargo.toml b/crates/durability/Cargo.toml index 34816a05437..3fc9eb19e6d 100644 --- a/crates/durability/Cargo.toml +++ b/crates/durability/Cargo.toml @@ -14,6 +14,7 @@ log.workspace = true spacetimedb-commitlog.workspace = true spacetimedb-paths.workspace = true spacetimedb-sats.workspace = true +thiserror.workspace = true tokio.workspace = true tracing.workspace = true diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 4a5c07feec5..1e2862aca92 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -3,10 +3,7 @@ use std::{ num::NonZeroU16, panic, sync::{ - atomic::{ - AtomicI64, AtomicU64, - Ordering::{Acquire, Relaxed, Release}, - }, + atomic::{AtomicU64, Ordering::Relaxed}, Arc, Weak, }, time::Duration, @@ -18,13 +15,13 @@ use log::{info, trace, warn}; use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction}; use spacetimedb_paths::server::CommitLogDir; use tokio::{ - sync::mpsc, + sync::{mpsc, watch}, task::{spawn_blocking, AbortHandle, JoinHandle}, time::{interval, MissedTickBehavior}, }; use tracing::instrument; -use crate::{Durability, History, TxOffset}; +use crate::{Durability, DurableOffset, History, TxOffset}; /// [`Local`] configuration. #[derive(Clone, Copy, Debug)] @@ -62,17 +59,7 @@ pub struct Local { clog: Arc>>, /// The durable transaction offset, as reported by the background /// [`FlushAndSyncTask`]. - /// - /// A negative number indicates that we haven't flushed yet, or that the - /// number overflowed. In either case, appending new transactions shall panic. - /// - /// The offset will be used by the datastore to squash durable transactions - /// into the committed state, thereby making them visible to durable-only - /// readers. - /// - /// We don't want to hang on to those transactions longer than needed, so - /// acquire / release or stronger should be used to prevent stale reads. - durable_offset: Arc, + durable_offset: watch::Receiver>, /// Backlog of transactions to be written to disk by the background /// [`PersisterTask`]. /// @@ -100,10 +87,7 @@ impl Local { let clog = Arc::new(Commitlog::open(root, opts.commitlog)?); let (queue, rx) = mpsc::unbounded_channel(); let queue_depth = Arc::new(AtomicU64::new(0)); - let offset = { - let offset = clog.max_committed_offset().map(|x| x as i64).unwrap_or(-1); - Arc::new(AtomicI64::new(offset)) - }; + let (durable_tx, durable_rx) = watch::channel(clog.max_committed_offset()); let persister_task = rt.spawn( PersisterTask { @@ -118,7 +102,7 @@ impl Local { FlushAndSyncTask { clog: Arc::downgrade(&clog), period: opts.sync_interval, - offset: offset.clone(), + offset: durable_tx, abort: persister_task.abort_handle(), } .run(), @@ -126,7 +110,7 @@ impl Local { Ok(Self { clog, - durable_offset: offset, + durable_offset: durable_rx, queue, queue_depth, persister_task, @@ -256,7 +240,7 @@ fn flush_error(e: io::Error) { struct FlushAndSyncTask { clog: Weak>>, period: Duration, - offset: Arc, + offset: watch::Sender>, /// Handle to abort the [`PersisterTask`] if fsync panics. abort: AbortHandle, } @@ -277,8 +261,7 @@ impl FlushAndSyncTask { }; // Skip if nothing changed. if let Some(committed) = clog.max_committed_offset() { - let durable = self.offset.load(Acquire); - if durable.is_positive() && committed == durable as _ { + if self.offset.borrow().is_some_and(|durable| durable == committed) { continue; } } @@ -297,8 +280,9 @@ impl FlushAndSyncTask { } Ok(Ok(Some(new_offset))) => { trace!("synced to offset {new_offset}"); - // NOTE: Overflow will make `durable_tx_offset` return `None` - self.offset.store(new_offset as i64, Release); + self.offset.send_modify(|val| { + val.replace(new_offset); + }); } // No data to flush. Ok(Ok(None)) => {} @@ -317,9 +301,8 @@ impl Durability for Local { self.queue_depth.fetch_add(1, Relaxed); } - fn durable_tx_offset(&self) -> Option { - let offset = self.durable_offset.load(Acquire); - (offset > -1).then_some(offset as u64) + fn durable_tx_offset(&self) -> DurableOffset { + self.durable_offset.clone().into() } } diff --git a/crates/durability/src/lib.rs b/crates/durability/src/lib.rs index 1ab5fac1bd4..6c7972d483a 100644 --- a/crates/durability/src/lib.rs +++ b/crates/durability/src/lib.rs @@ -1,5 +1,8 @@ use std::{iter, marker::PhantomData, sync::Arc}; +use thiserror::Error; +use tokio::sync::watch; + pub use spacetimedb_commitlog::{error, payload::Txdata, Decoder, Transaction}; mod imp; @@ -15,6 +18,69 @@ pub use imp::{local, Local}; /// of all offsets smaller than it. pub type TxOffset = u64; +#[derive(Debug, Error)] +#[error("the database's durability layer went away")] +pub struct DurabilityExited; + +/// Handle to the durable offset, obtained via [`Durability::durable_tx_offset`]. +/// +/// The handle can be used to read the current durable offset, or wait for a +/// provided offset to be reached. +/// +/// The handle is valid for as long as the [`Durability`] instance it was +/// obtained from is live, i.e. able to persist transactions. When the instance +/// shuts down or crashes, methods will return errors of type [`DurabilityExited`]. +pub struct DurableOffset { + // TODO: `watch::Receiver::wait_for` will hold a shared lock until all + // subscribers have seen the current value. Although it may skip entries, + // this may cause unacceptable contention. We may consider a custom watch + // channel that operates on an `AtomicU64` instead of an `RwLock`. + inner: watch::Receiver>, +} + +impl DurableOffset { + /// Get the current durable offset, or `None` if no transaction has been + /// made durable yet. + /// + /// Returns `Err` if the associated durablity is no longer live. + pub fn get(&self) -> Result, DurabilityExited> { + self.guard_closed().map(|()| self.inner.borrow().as_ref().copied()) + } + + /// Get the current durable offset, even if the associated durability is + /// no longer live. + pub fn last_seen(&self) -> Option { + self.inner.borrow().as_ref().copied() + } + + /// Wait for `offset` to become durable, i.e. + /// + /// self.get().unwrap().is_some_and(|durable| durable >= offset) + /// + /// Returns the actual durable offset at which above condition evaluated to + /// `true`, or an `Err` if the durability is no longer live. + /// + /// Returns immediately if the condition evaluates to `true` for the current + /// durable offset. + pub async fn wait_for(&mut self, offset: TxOffset) -> Result { + self.inner + .wait_for(|durable| durable.is_some_and(|val| val >= offset)) + .await + .map(|r| r.as_ref().copied().unwrap()) + .map_err(|_| DurabilityExited) + } + + fn guard_closed(&self) -> Result<(), DurabilityExited> { + self.inner.has_changed().map(drop).map_err(|_| DurabilityExited) + } +} + +impl From>> for DurableOffset { + fn from(inner: watch::Receiver>) -> Self { + Self { inner } + } +} + /// The durability API. /// /// NOTE: This is a preliminary definition, still under consideration. @@ -41,7 +107,7 @@ pub trait Durability: Send + Sync { /// A `None` return value indicates that the durable offset is not known, /// either because nothing has been persisted yet, or because the status /// cannot be retrieved. - fn durable_tx_offset(&self) -> Option; + fn durable_tx_offset(&self) -> DurableOffset; } /// Access to the durable history. diff --git a/crates/execution/src/lib.rs b/crates/execution/src/lib.rs index 8fb8e50e59b..ec083dbb4f5 100644 --- a/crates/execution/src/lib.rs +++ b/crates/execution/src/lib.rs @@ -255,7 +255,7 @@ impl<'a> Iterator for DeltaScanIter<'a> { /// Execute a query plan. /// The actual execution is driven by `f`. -pub fn execute_plan(plan: &ProjectPlan, tx: &T, f: impl Fn(PlanIter) -> R) -> Result +pub fn execute_plan(plan: &ProjectPlan, tx: &T, f: impl Fn(PlanIter<'_>) -> R) -> Result where T: Datastore + DeltaStore, { diff --git a/crates/sdk/src/db_connection.rs b/crates/sdk/src/db_connection.rs index 10bcb149320..7306f45160e 100644 --- a/crates/sdk/src/db_connection.rs +++ b/crates/sdk/src/db_connection.rs @@ -962,6 +962,24 @@ but you must call one of them, or else the connection will never progress. self } + /// Sets whether to use confirmed reads. + /// + /// When enabled, the server will send query results only after they are + /// confirmed to be durable. + /// + /// What durable means depends on the server configuration: a single node + /// server may consider a transaction durable once it is `fsync`'ed to disk, + /// a cluster after some number of replicas have acknowledged that they + /// have stored the transaction. + /// + /// Note that enabling confirmed reads will increase the latency between a + /// reducer call and the corresponding subscription update arriving at the + /// client. + pub fn with_confirmed_reads(mut self, confirmed: bool) -> Self { + self.params.confirmed = confirmed; + self + } + /// Register a callback to run when the connection is successfully initiated. /// /// The callback will receive three arguments: diff --git a/crates/sdk/src/websocket.rs b/crates/sdk/src/websocket.rs index 0595b3dfcc9..09fa7680633 100644 --- a/crates/sdk/src/websocket.rs +++ b/crates/sdk/src/websocket.rs @@ -107,6 +107,8 @@ fn parse_scheme(scheme: Option) -> Result { pub(crate) struct WsParams { pub compression: Compression, pub light: bool, + /// `true` to enable confirmed reads for the connection. + pub confirmed: bool, } fn make_uri(host: Uri, db_name: &str, connection_id: Option, params: WsParams) -> Result { @@ -152,6 +154,11 @@ fn make_uri(host: Uri, db_name: &str, connection_id: Option, param path.push_str("&light=true"); } + // Enable confirmed reads if requested. + if params.confirmed { + path.push_str("&confirmed=true"); + } + parts.path_and_query = Some(path.parse().map_err(|source: InvalidUri| UriError::InvalidUri { source: Arc::new(source), })?); diff --git a/smoketests/__init__.py b/smoketests/__init__.py index 290ca2bb1bc..caca9cce509 100644 --- a/smoketests/__init__.py +++ b/smoketests/__init__.py @@ -188,7 +188,7 @@ def log_records(self, n): logs = self.spacetime("logs", "--format=json", "-n", str(n), "--", self.database_identity) return list(map(json.loads, logs.splitlines())) - def publish_module(self, domain=None, *, clear=True, capture_stderr=True): + def publish_module(self, domain=None, *, clear=True, capture_stderr=True, num_replicas=None): print("publishing module", self.publish_module) publish_output = self.spacetime( "publish", @@ -199,10 +199,11 @@ def publish_module(self, domain=None, *, clear=True, capture_stderr=True): # because the server address is `node` which doesn't look like `localhost` or `127.0.0.1` # and so the publish step prompts for confirmation. "--yes", + *["--num-replicas", f"{num_replicas}"] if num_replicas is not None else [], capture_stderr=capture_stderr, ) self.resolved_identity = re.search(r"identity: ([0-9a-fA-F]+)", publish_output)[1] - self.database_identity = domain if domain is not None else self.resolved_identity + self.database_identity = self.resolved_identity @classmethod def reset_config(cls): diff --git a/smoketests/tests/replication.py b/smoketests/tests/replication.py index 168ebe49efe..1c2473207cb 100644 --- a/smoketests/tests/replication.py +++ b/smoketests/tests/replication.py @@ -1,9 +1,11 @@ -from .. import COMPOSE_FILE, Smoketest, requires_docker, spacetime -from ..docker import DockerManager - import time -from typing import Callable import unittest +from typing import Callable +import json + +from .. import COMPOSE_FILE, Smoketest, random_string, requires_docker, spacetime +from ..docker import DockerManager + def retry(func: Callable, max_retries: int = 3, retry_delay: int = 2): """Retry a function on failure with delay.""" @@ -124,6 +126,18 @@ def ensure_leader_health(self, id): # TODO: Replace with confirmed read. time.sleep(0.6) + def wait_counter_value(self, id, value, max_attempts=10, delay=1): + """Wait for the value for `id` in the counter table to reach `value`""" + + for _ in range(max_attempts): + rows = self.sql(f"select * from counter where id={id}") + if len(rows) >= 1 and int(rows[0]['value']) >= value: + return + else: + time.sleep(delay) + + raise ValueError(f"Counter {id} below {value}") + def fail_leader(self, action='kill'): """Force leader failure through either killing or network disconnect.""" @@ -250,6 +264,9 @@ def start(self, id: int, count: int): def collect_counter_rows(self): return int_vals(self.cluster.sql("select * from counter")) + def call_control(self, reducer, *args): + self.spacetime("call", "spacetime-control", reducer, *map(json.dumps, args)) + class LeaderElection(ReplicationTest): def test_leader_election_in_loop(self): @@ -403,3 +420,31 @@ def test_quorum_loss(self): with self.assertRaises(Exception): for i in range(1001): self.call("send_message", "terminal") + + +class EnableReplication(ReplicationTest): + AUTOPUBLISH = False + + def test_enable_replication(self): + """Tests enabling replication on an un-replicated database""" + + name = random_string() + + self.publish_module(name, num_replicas = 1) + leader = self.cluster.wait_for_leader_change(None) + + n1 = 1_000 + n2 = 100 + self.start(1, n1) + + self.cluster.wait_counter_value(1, n1, max_attempts=10, delay=10) + + self.call_control("enable_replication", {"Name": name}, 3) + + self.cluster.wait_for_leader_change(leader) + self.start(2, n2) + + self.cluster.wait_counter_value(2, n2) + + rows = self.collect_counter_rows() + self.assertEqual([{"id": 1, "value": n1}, {"id": 2, "value": n2}], rows)