Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
1456ad1
WIP: Watch durable offset
kim Aug 5, 2025
1264e2d
fixup! WIP: Watch durable offset
kim Aug 6, 2025
1fe9894
fixup! fixup! WIP: Watch durable offset
kim Aug 7, 2025
2c1ec37
Add confirmed parameter to Rust SDK
kim Aug 7, 2025
599e2a1
Add confirmed param to CLI subscribe command
kim Aug 7, 2025
6c64a3f
Dabble with when to obtain the tx offset
kim Aug 8, 2025
1a93771
Future-proof transaction offset API, documentation
kim Aug 11, 2025
521ffed
Make `DurableOffset` methods return an error when the durability is no
kim Aug 11, 2025
4d87798
Fixup the TxOffset future for mutable txs to return None if aborted +
kim Aug 12, 2025
ac92c6e
Simplify to oneshot channel where needed.
kim Aug 13, 2025
7aa3ba7
Merge remote-tracking branch 'origin/master' into kim/confirmed-reads
kim Aug 13, 2025
d871264
Lints
kim Sep 2, 2025
9dd521f
Merge remote-tracking branch 'origin/master' into kim/confirmed-reads
kim Sep 2, 2025
345e79b
Use module watch channel directly
kim Sep 2, 2025
42d8b39
Doc typo
kim Sep 2, 2025
8bf3c6e
Document / simplify tx guard
kim Sep 2, 2025
851da1f
Logging + docs when tx offset sender is dropped
kim Sep 2, 2025
1213b9c
Testing
kim Sep 2, 2025
9cda6ff
Fix CLI docs CI
kim Sep 2, 2025
9177ad8
Fix seemingly unrelated test failures
kim Sep 3, 2025
9b4af51
Sleep-less `assert_after_durable`
kim Sep 3, 2025
ea12bc6
Fix doctest
kim Sep 3, 2025
740f777
Enfore proper `ClientConnectionReceiver` is used everywhere
kim Sep 3, 2025
76b30ea
Add SQL insert to test
kim Sep 3, 2025
e1c13bc
Commentary
kim Sep 3, 2025
cad8846
Regen CLI reference again
kim Sep 3, 2025
9a70c03
Include whether the client has confirmed reads turned on when we kick it
kim Sep 3, 2025
568c7a9
Add support for confirmed reads to `sql` command
kim Sep 3, 2025
82b2eba
Add some rather lame smoketests
kim Sep 3, 2025
d671816
fixup! Add some rather lame smoketests
kim Sep 3, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::process::ExitCode;
use clap::{Arg, Command};
use spacetimedb_cli::*;
use spacetimedb_paths::cli::CliTomlPath;
use spacetimedb_paths::{RootDir, SpacetimePaths};
use spacetimedb_paths::RootDir;

// Note that the standalone server is invoked through standaline/src/main.rs, so you will
// also want to set the allocator there.
Expand All @@ -24,6 +24,8 @@ static GLOBAL: MiMalloc = MiMalloc;
#[cfg(not(feature = "markdown-docs"))]
#[tokio::main]
async fn main() -> anyhow::Result<ExitCode> {
use spacetimedb_paths::SpacetimePaths;

// Compute matches before loading the config, because `Config` has an observable `drop` method
// (which deletes a lockfile),
// and Clap calls `exit` on parse failure rather than panicking, so destructors never run.
Expand Down
15 changes: 13 additions & 2 deletions crates/cli/src/subcommands/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,13 @@ pub fn cli() -> clap::Command {
.conflicts_with("query")
.help("Instead of using a query, run an interactive command prompt for `SQL` expressions"),
)
.arg(
Arg::new("confirmed")
.required(false)
.long("confirmed")
.action(ArgAction::SetTrue)
.help("Instruct the server to deliver only updates of confirmed transactions"),
)
.arg(common_args::anonymous())
.arg(common_args::server().help("The nickname, host name or URL of the server hosting the database"))
.arg(common_args::yes())
Expand Down Expand Up @@ -178,11 +185,15 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
crate::repl::exec(con).await?;
} else {
let query = args.get_one::<String>("query").unwrap();
let confirmed = args.get_flag("confirmed");

let con = parse_req(config, args).await?;
let api = ClientApi::new(con);
let mut api = ClientApi::new(con).sql();
if confirmed {
api = api.query(&[("confirmed", "true")]);
}

run_sql(api.sql(), query, false).await?;
run_sql(api, query, false).await?;
}
Ok(())
}
Expand Down
31 changes: 30 additions & 1 deletion crates/cli/src/subcommands/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use anyhow::Context;
use clap::{value_parser, Arg, ArgAction, ArgMatches};
use futures::{Sink, SinkExt, TryStream, TryStreamExt};
use http::header;
use http::uri::Scheme;
use http::uri::{PathAndQuery, Scheme};
use serde_json::Value;
use spacetimedb_client_api_messages::websocket::{self as ws, JsonFormat};
use spacetimedb_data_structures::map::HashMap;
Expand Down Expand Up @@ -65,6 +65,13 @@ pub fn cli() -> clap::Command {
.action(ArgAction::SetTrue)
.help("Print the initial update for the queries."),
)
.arg(
Arg::new("confirmed")
.required(false)
.long("confirmed")
.action(ArgAction::SetTrue)
.help("Instruct the server to deliver only updates of confirmed transactions"),
)
.arg(common_args::anonymous())
.arg(common_args::yes())
.arg(common_args::server().help("The nickname, host name or URL of the server hosting the database"))
Expand Down Expand Up @@ -130,6 +137,7 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
let num = args.get_one::<u32>("num-updates").copied();
let timeout = args.get_one::<u32>("timeout").copied();
let print_initial_update = args.get_flag("print_initial_update");
let confirmed = args.get_flag("confirmed");

let conn = parse_req(config, args).await?;
let api = ClientApi::new(conn);
Expand All @@ -146,6 +154,9 @@ pub async fn exec(config: Config, args: &ArgMatches) -> Result<(), anyhow::Error
s
}
});
if confirmed {
append_query_param(&mut uri, ("confirmed", "true"));
}

// Create the websocket request.
let mut req = http::Uri::from_parts(uri)?.into_client_request()?;
Expand Down Expand Up @@ -334,3 +345,21 @@ fn format_output_json(msg: &ws::DatabaseUpdate<JsonFormat>, schema: &RawModuleDe

Ok(output)
}

fn append_query_param(uri: &mut http::uri::Parts, (k, v): (&str, &str)) {
let (mut path, query) = uri
.path_and_query
.as_ref()
.map(|pq| (pq.path().to_owned(), pq.query()))
.unwrap_or_default();
path.push('?');
if let Some(query) = query {
path.push_str(query);
path.push('&');
}
path.push_str(k);
path.push('=');
path.push_str(v);

uri.path_and_query = Some(PathAndQuery::from_maybe_shared(path).unwrap());
}
26 changes: 19 additions & 7 deletions crates/client-api/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,15 @@ impl Host {
&self,
auth: AuthCtx,
database: Database,
confirmed_read: bool,
body: String,
) -> axum::response::Result<Vec<SqlStmtResult<ProductValue>>> {
let module_host = self
.module()
.await
.map_err(|_| (StatusCode::NOT_FOUND, "module not found".to_string()))?;

let json = self
let (tx_offset, durable_offset, json) = self
.host_controller
.using_database(
database,
Expand Down Expand Up @@ -115,17 +116,28 @@ impl Host {
.map(|(col_name, col_type)| ProductTypeElement::new(col_type, Some(col_name)))
.collect();

Ok(vec![SqlStmtResult {
schema,
rows: result.rows,
total_duration_micros: total_duration.as_micros() as u64,
stats: SqlStmtStats::from_metrics(&result.metrics),
}])
Ok((
result.tx_offset,
db.durable_tx_offset(),
vec![SqlStmtResult {
schema,
rows: result.rows,
total_duration_micros: total_duration.as_micros() as u64,
stats: SqlStmtStats::from_metrics(&result.metrics),
}],
))
},
)
.await
.map_err(log_and_500)??;

if confirmed_read {
if let Some(mut durable_offset) = durable_offset {
let tx_offset = tx_offset.await.map_err(|_| log_and_500("transaction aborted"))?;
durable_offset.wait_for(tx_offset).await.map_err(log_and_500)?;
}
}

Ok(json)
}

Expand Down
11 changes: 8 additions & 3 deletions crates/client-api/src/routes/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,12 +382,17 @@ pub struct SqlParams {
}

#[derive(Deserialize)]
pub struct SqlQueryParams {}
pub struct SqlQueryParams {
/// If `true`, return the query result only after its transaction offset
/// is confirmed to be durable.
#[serde(default)]
confirmed: bool,
}

pub async fn sql<S>(
State(worker_ctx): State<S>,
Path(SqlParams { name_or_identity }): Path<SqlParams>,
Query(SqlQueryParams {}): Query<SqlQueryParams>,
Query(SqlQueryParams { confirmed }): Query<SqlQueryParams>,
Extension(auth): Extension<SpacetimeAuth>,
body: String,
) -> axum::response::Result<impl IntoResponse>
Expand All @@ -410,7 +415,7 @@ where
.await
.map_err(log_and_500)?
.ok_or(StatusCode::NOT_FOUND)?;
let json = host.exec_sql(auth, database, body).await?;
let json = host.exec_sql(auth, database, confirmed, body).await?;

let total_duration = json.iter().fold(0, |acc, x| acc + x.total_duration_micros);

Expand Down
Loading
Loading