Skip to content

Commit bc6024c

Browse files
nipunn1313dowski
authored andcommitted
convex-rs PR 10: Fix for a panic that could occur in a subscribe call (#41227)
If a query was subscribed to multiple times by the same client, an unsubscribe followed by a subsequent subscribe (if in the right/wrong order) would trigger bad internal state and a panic. This was due to a using a value that was incremented and decremented as part of the SubscriberId. That could lead to a SubscriberId for a new subscription matching a previous one, which is invalid state. Co-authored-by: Christian Wyglendowski <[email protected]> GitOrigin-RevId: 04150d72ffba18410d332086d32d18ce33a7c67b
1 parent 699e706 commit bc6024c

File tree

2 files changed

+44
-1
lines changed

2 files changed

+44
-1
lines changed

crates/convex/src/base_client/mod.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,11 @@ struct LocalQuery {
6363
canonicalized_udf_path: CanonicalizedUdfPath,
6464
args: BTreeMap<String, Value>,
6565
num_subscribers: usize, // TODO: remove
66+
/// A unique index value for each subscription to this query.
67+
///
68+
/// Must be incremented each time a new subscription is added, and never
69+
/// decremented.
70+
subscription_index: usize,
6671
}
6772

6873
#[derive(Clone, Debug)]
@@ -115,9 +120,11 @@ impl LocalSyncState {
115120
let query_token = serialize_path_and_args(udf_path.clone(), args.clone());
116121

117122
if let Some(existing_entry) = self.query_set.get_mut(&query_token) {
123+
// This is a new subscription to an existing query.
118124
existing_entry.num_subscribers += 1;
125+
existing_entry.subscription_index += 1;
119126
let query_id = existing_entry.id;
120-
let subscription = SubscriberId(query_id, existing_entry.num_subscribers - 1);
127+
let subscription = SubscriberId(query_id, existing_entry.subscription_index);
121128
let prev = self.latest_results.subscribers.insert(subscription);
122129
assert!(prev.is_none(), "INTERNAL BUG: Subscriber ID already taken.");
123130
return (None, subscription);
@@ -147,6 +154,7 @@ impl LocalSyncState {
147154
canonicalized_udf_path,
148155
args,
149156
num_subscribers: 1,
157+
subscription_index: 0,
150158
};
151159

152160
self.query_set.insert(query_token.clone(), query);

crates/convex/src/client/mod.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,7 @@ pub mod tests {
484484
SyncProtocol,
485485
},
486486
value::Value,
487+
QuerySubscription,
487488
};
488489

489490
impl ConvexClient {
@@ -769,6 +770,40 @@ pub mod tests {
769770
Ok(())
770771
}
771772

773+
#[tokio::test]
774+
async fn test_client_subscribe_unsubscribe_subscribe() -> anyhow::Result<()> {
775+
let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;
776+
let subscription1b: QuerySubscription;
777+
{
778+
// This subscription goes out of scope and unsubscribes at the end of this
779+
// block. The internal num_subscribers value gets decremented.
780+
let _ignored = client.subscribe("getValue1", btreemap! {}).await?;
781+
subscription1b = client.subscribe("getValue1", btreemap! {}).await?;
782+
}
783+
// In the buggy scenario, this subscription gets an ID via num_subscribers ID
784+
// that matches subscription1b. That triggers a panic.
785+
let subscription1c = client.subscribe("getValue1", btreemap! {}).await?;
786+
test_protocol.take_sent().await;
787+
let mut watch = client.watch_all();
788+
789+
test_protocol
790+
.fake_server_response(
791+
fake_transition(StateVersion::initial(), vec![(QueryId::new(0), 10.into())]).0,
792+
)
793+
.await?;
794+
795+
let results = watch.next().await.expect("Watch should have results");
796+
assert_eq!(
797+
results.get(&subscription1b),
798+
Some(&FunctionResult::Value(10.into()))
799+
);
800+
assert_eq!(
801+
results.get(&subscription1c),
802+
Some(&FunctionResult::Value(10.into()))
803+
);
804+
Ok(())
805+
}
806+
772807
#[tokio::test]
773808
async fn test_client_consistent_view_watch() -> anyhow::Result<()> {
774809
let (mut client, mut test_protocol) = ConvexClient::with_test_protocol().await?;

0 commit comments

Comments
 (0)