Skip to content
Draft
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions crates/client-api/src/auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,15 @@ impl SpacetimeCreds {
pub struct SpacetimeAuth {
pub creds: SpacetimeCreds,
pub claims: SpacetimeIdentityClaims,
// The decoded JWT payload.
pub raw_payload: String,
/// The JWT payload as a json string (after base64 decoding).
pub jwt_payload: String,
}

impl From<SpacetimeAuth> for ConnectionAuthCtx {
fn from(auth: SpacetimeAuth) -> Self {
ConnectionAuthCtx {
claims: auth.claims,
jwt_payload: auth.raw_payload.clone(),
jwt_payload: auth.jwt_payload.clone(),
}
}
}
Expand Down Expand Up @@ -131,6 +131,9 @@ impl TokenClaims {
Identity::from_claims(&self.issuer, &self.subject)
}

/// Encode the claims into a JWT token and sign it with the provided signer.
/// This also adds claims for expiry and issued at time.
/// Returns an object representing the claims and the signed token.
pub fn encode_and_sign_with_expiry(
&self,
signer: &impl TokenSigner,
Expand All @@ -150,6 +153,9 @@ impl TokenClaims {
Ok((claims, token))
}

/// Encode the claims into a JWT token and sign it with the provided signer.
/// This also adds a claim for issued at time.
/// Returns an object representing the claims and the signed token.
pub fn encode_and_sign(&self, signer: &impl TokenSigner) -> Result<(SpacetimeIdentityClaims, String), JwtError> {
self.encode_and_sign_with_expiry(signer, None)
}
Expand Down Expand Up @@ -177,7 +183,7 @@ impl SpacetimeAuth {
Ok(Self {
creds,
claims,
raw_payload: payload,
jwt_payload: payload,
})
}

Expand Down Expand Up @@ -351,7 +357,7 @@ impl<S: NodeDelegate + Send + Sync> axum::extract::FromRequestParts<S> for Space
let auth = SpacetimeAuth {
creds,
claims,
raw_payload: payload,
jwt_payload: payload,
};
Ok(Self { auth: Some(auth) })
}
Expand Down
29 changes: 19 additions & 10 deletions crates/client-api/src/routes/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,18 +177,19 @@ where
None => format!("unknown ip with Identity {identity} and ConnectionId {connection_id}"),
};

let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
let client = match ClientConnection::spawn(
log::debug!("websocket: New client connected from {client_log_string}");

let connected = match ClientConnection::call_client_connected_maybe_reject(
&mut module_rx,
client_id,
auth.into(),
client_config,
leader.replica_id,
module_rx,
actor,
auth.clone().into(),
)
.await
{
Ok(s) => s,
Ok(connected) => {
log::debug!("websocket: client_connected returned Ok for {client_log_string}");
connected
}
Err(e @ (ClientConnectedError::Rejected(_) | ClientConnectedError::OutOfEnergy)) => {
log::info!(
"websocket: Rejecting connection for {client_log_string} due to error from client_connected reducer: {e}"
Expand All @@ -206,8 +207,16 @@ where
);

let actor = |client, sendrx| ws_client_actor(ws_opts, client, ws, sendrx);
let client =
ClientConnection::spawn(client_id, client_config, leader.replica_id, module_rx, actor, connected).await;
let client = ClientConnection::spawn(
client_id,
auth.into(),
client_config,
leader.replica_id,
module_rx,
actor,
connected,
)
.await;

// Send the client their identity token message as the first message
// NOTE: We're adding this to the protocol because some client libraries are
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/client/client_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,9 +426,10 @@ impl ClientConnection {
pub async fn call_client_connected_maybe_reject(
module_rx: &mut watch::Receiver<ModuleHost>,
id: ClientActorId,
auth: ConnectionAuthCtx,
) -> Result<Connected, ClientConnectedError> {
let module = module_rx.borrow_and_update().clone();
module.call_identity_connected(id.identity, id.connection_id).await?;
module.call_identity_connected(auth, id.connection_id).await?;
Ok(Connected { _private: () })
}

Expand All @@ -455,7 +456,6 @@ impl ClientConnection {
// logically subscribed to the database, not any particular replica. We should handle failover for
// them and stuff. Not right now though.
let module = module_rx.borrow_and_update().clone();
module.call_identity_connected(auth.clone(), id.connection_id).await?;

let (sendtx, sendrx) = mpsc::channel::<SerializableMessage>(CLIENT_CHANNEL_CAPACITY);

Expand Down
1 change: 0 additions & 1 deletion crates/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use spacetimedb_vm::expr::Crud;

pub use spacetimedb_datastore::error::{DatastoreError, IndexError, SequenceError, TableError};

// #[derive(Error, Debug, PartialEq, Eq)]
#[derive(Error, Debug)]
pub enum ClientError {
#[error("Client not found: {0}")]
Expand Down
73 changes: 41 additions & 32 deletions crates/core/src/host/module_host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ use derive_more::From;
use indexmap::IndexSet;
use itertools::Itertools;
use prometheus::{Histogram, IntGauge};
use scopeguard::ScopeGuard;
use spacetimedb_auth::identity::ConnectionAuthCtx;
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate, WebsocketFormat};
use spacetimedb_client_api_messages::websocket::{ByteListLen, Compression, OneOffTable, QueryUpdate};
use spacetimedb_data_structures::error_stream::ErrorStream;
use spacetimedb_data_structures::map::{HashCollectionExt as _, IntMap};
use spacetimedb_datastore::execution_context::{ExecutionContext, ReducerContext, Workload, WorkloadType};
Expand Down Expand Up @@ -690,13 +691,38 @@ impl ModuleHost {
let me = self.clone();
self.call("call_identity_connected", move |inst| {
let reducer_lookup = me.info.module_def.lifecycle_reducer(Lifecycle::OnConnect);
let stdb = &me.module.replica_ctx().relational_db;
let workload = Workload::Reducer(ReducerContext {
name: "call_identity_connected".to_owned(),
caller_identity: caller_auth.claims.identity,
caller_connection_id,
timestamp: Timestamp::now(),
arg_bsatn: Bytes::new(),
});
let mut_tx = stdb.begin_mut_tx(IsolationLevel::Serializable, workload);
let mut mut_tx = scopeguard::guard(mut_tx, |mut_tx| {
// If we crash before committing, we need to ensure that the transaction is rolled back.
// This is necessary to avoid leaving the database in an inconsistent state.
log::debug!("call_identity_connected: rolling back transaction");
let (metrics, reducer_name) = mut_tx.rollback();
stdb.report_mut_tx_metrics(reducer_name, metrics, None);
});

mut_tx
.insert_st_client(
caller_auth.claims.identity,
caller_connection_id,
&caller_auth.jwt_payload,
)
.map_err(DBError::from)?;

if let Some((reducer_id, reducer_def)) = reducer_lookup {
// The module defined a lifecycle reducer to handle new connections.
// Call this reducer.
// If the call fails (as in, something unexpectedly goes wrong with WASM execution),
// abort the connection: we can't really recover.
let reducer_outcome = me.call_reducer_inner_with_inst(
Some(ScopeGuard::into_inner(mut_tx)),
caller_auth.claims.identity,
Some(caller_connection_id),
None,
Expand Down Expand Up @@ -728,38 +754,19 @@ impl ModuleHost {
}
} else {
// The module doesn't define a client_connected reducer.
// Commit a transaction to update `st_clients`
// and to ensure we always have those events paired in the commitlog.
// We need to commit the transaction to update st_clients and st_connection_credentials.
//
// This is necessary to be able to disconnect clients after a server crash.
let reducer_name = reducer_lookup
.as_ref()
.map(|(_, def)| &*def.name)
.unwrap_or("__identity_connected__");

let workload = Workload::Reducer(ReducerContext {
name: reducer_name.to_owned(),
caller_identity: caller_auth.claims.identity,
caller_connection_id,
timestamp: Timestamp::now(),
arg_bsatn: Bytes::new(),
});

let stdb = me.module.replica_ctx().relational_db.clone();
stdb.with_auto_commit(workload, |mut_tx| {
mut_tx
.insert_st_client(caller_auth.claims.identity, caller_connection_id)
.map_err(DBError::from)?;
mut_tx
.insert_st_client_credentials(caller_connection_id, &caller_auth.jwt_payload)
.map_err(DBError::from)
})
.inspect_err(|e| {
log::error!(
"`call_identity_connected`: fallback transaction to insert into `st_client` failed: {e:#?}"
)
})
.map_err(Into::into)

// TODO: Is this being broadcast? Does it need to be, or are st_client table subscriptions
// not allowed?
// I don't think it was being broadcast previously.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// I don't think it was being broadcast previously.
// I (jsdt 2025-08-06) don't think it was being broadcast previously.

This sounds like the kind of bug you were talking about the other day, yeah. st_client subscriptions are not disallowed, but also will not work in any of the SDKs due to missing bindings, and aren't an advertised feature. Maybe we should just make some or all of the system tables private and not have to think about it any more. Or maybe we should fix the bug and broadcast this TX.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created #3130 .

stdb.finish_tx(ScopeGuard::into_inner(mut_tx), Ok(()))
.map_err(|e: DBError| {
log::error!("`call_identity_connected`: finish transaction failed: {e:#?}");
ClientConnectedError::DBError(e)
})?;
Ok(())
}
})
.await
Expand Down Expand Up @@ -813,6 +820,7 @@ impl ModuleHost {
// If it succeeds, `WasmModuleInstance::call_reducer_with_tx` has already ensured
// that `st_client` is updated appropriately.
let result = me.call_reducer_inner_with_inst(
None,
caller_identity,
Some(caller_connection_id),
None,
Expand Down Expand Up @@ -917,6 +925,7 @@ impl ModuleHost {
}
fn call_reducer_inner_with_inst(
&self,
tx: Option<MutTxId>,
caller_identity: Identity,
caller_connection_id: Option<ConnectionId>,
client: Option<Arc<ClientConnectionSender>>,
Expand All @@ -932,7 +941,7 @@ impl ModuleHost {
let caller_connection_id = caller_connection_id.unwrap_or(ConnectionId::ZERO);

Ok(module_instance.call_reducer(
None,
tx,
CallReducerParams {
timestamp: Timestamp::now(),
caller_identity,
Expand Down
31 changes: 2 additions & 29 deletions crates/core/src/host/wasm_common/module_host_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,36 +363,9 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
.with_label_values(&database_identity, reducer_name);

let workload = Workload::Reducer(ReducerContext::from(op.clone()));
let mut tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload));
let tx = tx.unwrap_or_else(|| stdb.begin_mut_tx(IsolationLevel::Serializable, workload));
let _guard = metric_reducer_plus_query_duration.with_timer(tx.timer);

// For OnConnect, we insert the credentials before the reducer, so we can look them up
// inside that reducer.
// If the connection is rejected, this should get rolled back.
if let Some(Lifecycle::OnConnect) = reducer_def.lifecycle {
let client_clone = match client.clone() {
Some(client) => client,
None => {
log::error!("OnConnect reducer called without a client");
return ReducerCallResult {
outcome: ReducerOutcome::Failed("OnConnect reducer called without a client".into()),
energy_used: EnergyQuanta::ZERO,
execution_duration: Duration::ZERO,
};
}
};
if let Some(err) = tx
.insert_st_client_credentials(caller_connection_id, &client_clone.auth.jwt_payload)
.err()
{
return ReducerCallResult {
outcome: ReducerOutcome::Failed(format!("Error inserting client credentials: {err}")),
energy_used: EnergyQuanta::ZERO,
execution_duration: Duration::ZERO,
};
}
};

let mut tx_slot = self.instance.instance_env().tx.clone();

let reducer_span = tracing::trace_span!(
Expand Down Expand Up @@ -484,7 +457,7 @@ impl<T: WasmInstance> WasmModuleInstance<T> {
// and conversely removing from `st_clients` on disconnect.
Ok(Ok(())) => {
let res = match reducer_def.lifecycle {
Some(Lifecycle::OnConnect) => tx.insert_st_client(caller_identity, caller_connection_id),
Some(Lifecycle::OnConnect) => Ok(()),
Some(Lifecycle::OnDisconnect) => {
tx.delete_st_client(caller_identity, caller_connection_id, database_identity)
}
Expand Down
55 changes: 25 additions & 30 deletions crates/core/src/subscription/module_subscription_actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -876,37 +876,16 @@ impl ModuleSubscriptions {
return Ok(Err(WriteConflict));
};
*db_update = DatabaseUpdate::from_writes(&tx_data);
(read_tx, Some(tx_data), tx_metrics)
(read_tx, Arc::new(tx_data), tx_metrics)
}
EventStatus::Failed(_) | EventStatus::OutOfEnergy => {
let (tx_metrics, tx) = stdb.rollback_mut_tx_downgrade(tx, Workload::Update);
(tx, None, tx_metrics)
}
};

let tx_data = tx_data.map(Arc::new);

// When we're done with this method, release the tx and report metrics.
let mut read_tx = scopeguard::guard(read_tx, |tx| {
let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx);
self.relational_db
.report_tx_metrics(reducer, tx_data.clone(), Some(tx_metrics_mut), Some(tx_metrics_read));
});
// Create the delta transaction we'll use to eval updates against.
let delta_read_tx = tx_data
.as_ref()
.as_ref()
.map(|tx_data| DeltaTx::new(&read_tx, tx_data, subscriptions.index_ids_for_subscriptions()))
.unwrap_or_else(|| DeltaTx::from(&*read_tx));
// If the transaction failed, we need to rollback the mutable tx.
// We don't need to do any subscription updates in this case, so we will exit early.

let event = Arc::new(event);
let mut update_metrics: ExecutionMetrics = ExecutionMetrics::default();

match &event.status {
EventStatus::Committed(_) => {
update_metrics = subscriptions.eval_updates_sequential(&delta_read_tx, event.clone(), caller);
}
EventStatus::Failed(_) => {
let event = Arc::new(event);
let (tx_metrics, reducer) = stdb.rollback_mut_tx(tx);
self.relational_db
.report_tx_metrics(reducer, None, Some(tx_metrics), None);
if let Some(client) = caller {
let message = TransactionUpdateMessage {
event: Some(event.clone()),
Expand All @@ -917,9 +896,25 @@ impl ModuleSubscriptions {
} else {
log::trace!("Reducer failed but there is no client to send the failure to!")
}
return Ok(Ok((event, ExecutionMetrics::default())));
}
EventStatus::OutOfEnergy => {} // ?
}
};
let event = Arc::new(event);

// When we're done with this method, release the tx and report metrics.
let mut read_tx = scopeguard::guard(read_tx, |tx| {
let (tx_metrics_read, reducer) = self.relational_db.release_tx(tx);
self.relational_db.report_tx_metrics(
reducer,
Some(tx_data.clone()),
Some(tx_metrics_mut),
Some(tx_metrics_read),
);
});
// Create the delta transaction we'll use to eval updates against.
let delta_read_tx = DeltaTx::new(&read_tx, tx_data.as_ref(), subscriptions.index_ids_for_subscriptions());

let update_metrics = subscriptions.eval_updates_sequential(&delta_read_tx, event.clone(), caller);

// Merge in the subscription evaluation metrics.
read_tx.metrics.merge(update_metrics);
Expand Down
Loading
Loading