Skip to content

Commit b445620

Browse files
authored
Fix module hotswapping for connected clients (#3159)
The `Clone` impl for `ClientConnection` would create an independent instance that could not observe module hotswapping. This would cause methods called on a replaced `ModuleHost` to fail, because that host had already exited.

Fix by reading the `ModuleHost` from the watch channel directly, instead of maintaining a redundant copy.

Also fix `watch_module_host` to properly mark the current module host as seen.

# Expected complexity level and risk

2

# Testing

- [x] test suite passes
- [x] ran @bfops repro script
1 parent f6f0909 commit b445620

File tree

4 files changed

+53
-28
lines changed

4 files changed

+53
-28
lines changed

crates/bench/src/spacetime_module.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,11 @@ impl<L: ModuleLanguage> BenchDatabase for SpacetimeModule<L> {
6969
L::get_module().load_module(config, Some(&path)).await
7070
});
7171

72-
for table in module.client.module.info.module_def.tables() {
72+
let module_info = module.client.module().info;
73+
for table in module_info.module_def.tables() {
7374
log::trace!("SPACETIME_MODULE: LOADED TABLE: {table:?}");
7475
}
75-
for reducer in module.client.module.info.module_def.reducers() {
76+
for reducer in module_info.module_def.reducers() {
7677
log::trace!("SPACETIME_MODULE: LOADED REDUCER: {reducer:?}");
7778
}
7879
Ok(SpacetimeModule {
@@ -102,7 +103,7 @@ impl<L: ModuleLanguage> BenchDatabase for SpacetimeModule<L> {
102103
module.call_reducer_binary(&name, ProductValue::new(&[])).await?;
103104
*/
104105
// workaround for now
105-
module.client.module.clear_table(&table_id.pascal_case)?;
106+
module.client.module().clear_table(&table_id.pascal_case)?;
106107
Ok(())
107108
})
108109
}

crates/client-api/src/routes/subscribe.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ async fn ws_client_actor_inner(
359359
ws: WebSocketStream,
360360
sendrx: MeteredReceiver<SerializableMessage>,
361361
) {
362-
let database = client.module.info().database_identity;
362+
let database = client.module().info().database_identity;
363363
let client_id = client.id;
364364
let client_closed_metric = WORKER_METRICS.ws_clients_closed_connection.with_label_values(&database);
365365
let state = Arc::new(ActorState::new(database, client_id, config));

crates/core/src/client/client_connection.rs

Lines changed: 45 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,6 @@ impl ClientConnectionSender {
213213
pub struct ClientConnection {
214214
sender: Arc<ClientConnectionSender>,
215215
pub replica_id: u64,
216-
pub module: ModuleHost,
217216
module_rx: watch::Receiver<ModuleHost>,
218217
}
219218

@@ -480,7 +479,6 @@ impl ClientConnection {
480479
let this = Self {
481480
sender,
482481
replica_id,
483-
module,
484482
module_rx,
485483
};
486484

@@ -495,13 +493,11 @@ impl ClientConnection {
495493
id: ClientActorId,
496494
config: ClientConfig,
497495
replica_id: u64,
498-
mut module_rx: watch::Receiver<ModuleHost>,
496+
module_rx: watch::Receiver<ModuleHost>,
499497
) -> Self {
500-
let module = module_rx.borrow_and_update().clone();
501498
Self {
502499
sender: Arc::new(ClientConnectionSender::dummy(id, config)),
503500
replica_id,
504-
module,
505501
module_rx,
506502
}
507503
}
@@ -510,6 +506,20 @@ impl ClientConnection {
510506
self.sender.clone()
511507
}
512508

509+
/// Get the [`ModuleHost`] for this connection.
510+
///
511+
/// Note that modules can be hotswapped, in which case the returned handle
512+
/// becomes invalid (i.e. all calls on it will result in an error).
513+
/// Callers should thus drop the value as soon as they are done, and obtain
514+
/// a fresh one when needed.
515+
///
516+
/// While this [`ClientConnection`] is active, [`Self::watch_module_host`]
517+
/// should be polled in the background, and the connection closed if and
518+
/// when it returns an error.
519+
pub fn module(&self) -> ModuleHost {
520+
self.module_rx.borrow().clone()
521+
}
522+
513523
#[inline]
514524
pub fn handle_message(
515525
&self,
@@ -519,13 +529,26 @@ impl ClientConnection {
519529
message_handlers::handle(self, message.into(), timer)
520530
}
521531

532+
/// Waits until the [`ModuleHost`] of this [`ClientConnection`] instance
533+
/// exits, in which case `Err` containing [`NoSuchModule`] is returned.
534+
///
535+
/// Should be polled while this [`ClientConnection`] is active, so as to be
536+
/// able to shut down the connection gracefully if and when the module
537+
/// exits.
538+
///
539+
/// Note that this borrows `self` mutably, so may require cloning the
540+
/// [`ClientConnection`] instance. The module is shared, however, so all
541+
/// clones will observe a swapped module.
522542
pub async fn watch_module_host(&mut self) -> Result<(), NoSuchModule> {
523-
match self.module_rx.changed().await {
524-
Ok(()) => {
525-
self.module = self.module_rx.borrow_and_update().clone();
526-
Ok(())
543+
loop {
544+
// First check if the module exited between creating the client
545+
// connection and calling `watch_module_host`...
546+
if self.module_rx.changed().await.is_err() {
547+
return Err(NoSuchModule);
527548
}
528-
Err(_) => Err(NoSuchModule),
549+
// ...then mark the current module as seen, so the next iteration
550+
// of the loop waits until the module changes or exits.
551+
self.module_rx.mark_unchanged();
529552
}
530553
}
531554

@@ -544,7 +567,7 @@ impl ClientConnection {
544567
CallReducerFlags::NoSuccessNotify => None,
545568
};
546569

547-
self.module
570+
self.module()
548571
.call_reducer(
549572
self.id.identity,
550573
Some(self.id.connection_id),
@@ -563,9 +586,9 @@ impl ClientConnection {
563586
timer: Instant,
564587
) -> Result<Option<ExecutionMetrics>, DBError> {
565588
let me = self.clone();
566-
self.module
589+
self.module()
567590
.on_module_thread("subscribe_single", move || {
568-
me.module
591+
me.module()
569592
.subscriptions()
570593
.add_single_subscription(me.sender, subscription, timer, None)
571594
})
@@ -575,7 +598,7 @@ impl ClientConnection {
575598
pub async fn unsubscribe(&self, request: Unsubscribe, timer: Instant) -> Result<Option<ExecutionMetrics>, DBError> {
576599
let me = self.clone();
577600
asyncify(move || {
578-
me.module
601+
me.module()
579602
.subscriptions()
580603
.remove_single_subscription(me.sender, request, timer)
581604
})
@@ -588,9 +611,9 @@ impl ClientConnection {
588611
timer: Instant,
589612
) -> Result<Option<ExecutionMetrics>, DBError> {
590613
let me = self.clone();
591-
self.module
614+
self.module()
592615
.on_module_thread("subscribe_multi", move || {
593-
me.module
616+
me.module()
594617
.subscriptions()
595618
.add_multi_subscription(me.sender, request, timer, None)
596619
})
@@ -603,9 +626,9 @@ impl ClientConnection {
603626
timer: Instant,
604627
) -> Result<Option<ExecutionMetrics>, DBError> {
605628
let me = self.clone();
606-
self.module
629+
self.module()
607630
.on_module_thread("unsubscribe_multi", move || {
608-
me.module
631+
me.module()
609632
.subscriptions()
610633
.remove_multi_subscription(me.sender, request, timer)
611634
})
@@ -615,7 +638,7 @@ impl ClientConnection {
615638
pub async fn subscribe(&self, subscription: Subscribe, timer: Instant) -> Result<ExecutionMetrics, DBError> {
616639
let me = self.clone();
617640
asyncify(move || {
618-
me.module
641+
me.module()
619642
.subscriptions()
620643
.add_legacy_subscriber(me.sender, subscription, timer, None)
621644
})
@@ -628,7 +651,7 @@ impl ClientConnection {
628651
message_id: &[u8],
629652
timer: Instant,
630653
) -> Result<(), anyhow::Error> {
631-
self.module
654+
self.module()
632655
.one_off_query::<JsonFormat>(
633656
self.id.identity,
634657
query.to_owned(),
@@ -646,7 +669,7 @@ impl ClientConnection {
646669
message_id: &[u8],
647670
timer: Instant,
648671
) -> Result<(), anyhow::Error> {
649-
self.module
672+
self.module()
650673
.one_off_query::<BsatnFormat>(
651674
self.id.identity,
652675
query.to_owned(),
@@ -659,6 +682,6 @@ impl ClientConnection {
659682
}
660683

661684
pub async fn disconnect(self) {
662-
self.module.disconnect_client(self.id).await
685+
self.module().disconnect_client(self.id).await
663686
}
664687
}

crates/core/src/client/message_handlers.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,11 @@ pub async fn handle(client: &ClientConnection, message: DataMessage, timer: Inst
4646
.map_args(|b| ReducerArgs::Bsatn(message_buf.slice_ref(b))),
4747
};
4848

49-
let mod_info = client.module.info();
49+
let module = client.module();
50+
let mod_info = module.info();
5051
let mod_metrics = &mod_info.metrics;
5152
let database_identity = mod_info.database_identity;
52-
let db = &client.module.replica_ctx().relational_db;
53+
let db = &module.replica_ctx().relational_db;
5354
let record_metrics = |wl| {
5455
move |metrics| {
5556
if let Some(metrics) = metrics {

0 commit comments

Comments
 (0)