feat: implement msc4308 + automatic background catchup of thread subscriptions #5604
Conversation
Codecov Report

❌ Patch coverage is …

Additional details and impacted files:

```diff
@@            Coverage Diff             @@
##             main    #5604      +/-   ##
==========================================
- Coverage   88.72%   88.71%   -0.01%
==========================================
  Files         345      346       +1
  Lines       97880    98146     +266
==========================================
+ Hits        86847    87075     +228
- Misses       6754     6775      +21
- Partials    4279     4296      +17
```

☔ View full report in Codecov by Sentry.
CodSpeed Performance Report

Merging #5604 will not alter performance.
Alright, a couple of changes needed here.
```rust
/// These tokens are created when the client receives some thread subscriptions
/// from sync, but the sync indicates that there are more thread subscriptions
/// available on the server. In this case, it's expected that the client will
/// call the MSC4308 companion endpoint to catch up (back-paginate) on previous
```
Linky please on the MSC.
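For instance, a minimal sketch of the requested change (assuming the usual matrix-spec-proposals URL convention for MSC links):

```rust
/// These tokens are created when the client receives some thread subscriptions
/// from sync, but the sync indicates that there are more thread subscriptions
/// available on the server. In this case, it's expected that the client will
/// call the [MSC4308](https://github.com/matrix-org/matrix-spec-proposals/pull/4308)
/// companion endpoint to catch up (back-paginate) on previous
```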
```rust
// Take into account the new unsubscriptions.
for (room_id, room_map) in unsubscribed {
    for (event_id, thread_sub) in room_map {
        client
            .state_store()
            .upsert_thread_subscription(
                &room_id,
                &event_id,
                StoredThreadSubscription {
                    status: ThreadSubscriptionStatus::Unsubscribed,
                    bump_stamp: Some(thread_sub.bump_stamp.into()),
                },
            )
            .await?;
    }
}

// Take into account the new subscriptions.
for (room_id, room_map) in subscribed {
    for (event_id, thread_sub) in room_map {
        client
            .state_store()
            .upsert_thread_subscription(
                &room_id,
                &event_id,
                StoredThreadSubscription {
                    status: ThreadSubscriptionStatus::Subscribed {
                        automatic: thread_sub.automatic,
                    },
                    bump_stamp: Some(thread_sub.bump_stamp.into()),
                },
            )
            .await?;
    }
}
```
Not for now, but I think it'll make sense to turn this into a bulk API, so we don't take a transaction per iteration.
Let's put this into an issue or a separate PR.
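A sketch of what such a bulk API could look like; the wrapper type, `begin_transaction`, and the transaction's own `upsert_thread_subscription`/`commit` are hypothetical names illustrating the one-transaction-per-batch idea, not the SDK's real store API:

```rust
impl ThreadSubscriptionStore {
    /// Hypothetical bulk variant: upsert a whole batch of thread
    /// subscriptions within a single store transaction, instead of one
    /// transaction per (room, event) pair.
    pub async fn upsert_thread_subscriptions(
        &self,
        entries: impl IntoIterator<Item = (OwnedRoomId, OwnedEventId, StoredThreadSubscription)>,
    ) -> Result<()> {
        // Open one transaction for the whole batch…
        let txn = self.begin_transaction().await?;
        for (room_id, event_id, sub) in entries {
            txn.upsert_thread_subscription(&room_id, &event_id, sub).await?;
        }
        // …and commit once at the end.
        txn.commit().await
    }
}
```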
```rust
async fn lock(&self) -> Option<GuardedStoreAccess> {
    let client = self.client.get()?;
    let mutex_guard = self.uniq_mutex.clone().lock_owned().await;
    Some(GuardedStoreAccess {
        _mutex: mutex_guard,
        client,
        is_outdated: self.is_outdated.clone(),
    })
}
```
This lock seems useful, but we're not using it in the part of the code that stores the subscriptions.
Probably not that important, since there shouldn't be thousands of subscriptions flying in and out, but it might be prudent to utilize the lock in more places.
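For instance, the subscription writes from the earlier snippet could go through the guard, so that concurrent sync responses can't interleave their store updates (a sketch reusing the fields shown in `lock()` above):

```rust
if let Some(guard) = self.lock().await {
    for (room_id, room_map) in subscribed {
        for (event_id, thread_sub) in room_map {
            guard
                .client
                .state_store()
                .upsert_thread_subscription(
                    &room_id,
                    &event_id,
                    StoredThreadSubscription {
                        status: ThreadSubscriptionStatus::Subscribed {
                            automatic: thread_sub.automatic,
                        },
                        bump_stamp: Some(thread_sub.bump_stamp.into()),
                    },
                )
                .await?;
        }
    }
} // the owned mutex guard is dropped (and the lock released) here
```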
```rust
let has_tokens = !tokens.is_empty();

guard.save_catchup_tokens(tokens).await?;
```
Could we return the value of `has_tokens` from `save_catchup_tokens()`?
As it is now, we're computing this flag twice: once in `save_catchup_tokens()` and once here.
If we ever change this, we'd need to remember to modify both places.
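i.e., something along these lines (a sketch; the token type and the actual store write are placeholders):

```rust
/// Persist the catch-up tokens, and report whether there were any to save,
/// so the caller doesn't have to recompute the flag.
async fn save_catchup_tokens(&self, tokens: Vec<CatchupToken>) -> Result<bool> {
    let has_tokens = !tokens.is_empty();
    if has_tokens {
        // Placeholder for the actual persistence logic.
        self.store_tokens(tokens).await?;
    }
    Ok(has_tokens)
}
```

The call site then becomes `let has_tokens = guard.save_catchup_tokens(tokens).await?;`.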
```rust
}

/// The background task listening to new catchup tokens, and using them to
/// catch up the thread subscriptions via the msc4308 companion
```
Linky to the MSC.
```rust
    || !self.inner.lists.read().await.is_empty()
}

/// Sned a single sliding sync request, and returns the response summary.
```
Send*
Well that was fast.
Force-pushed from 89e4062 to 8e6b0d5.
This adds support for:

- a `prev_batch` token that indicates if there are more subscriptions to catch up with the companion endpoint.
- `Room::load_or_fetch_thread_subscriptions()`, which uses the network if we haven't caught up yet, or the local store only if we have.

I've tested this in isolation, with a few sliding sync tests (using the `MatrixMockServer`; this adds quite a bit of code to it, expanding the PR's size), as well as against the Synapse impl from element-hq/synapse#18695, and it works great in both cases.

Fixes #5038.
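A hypothetical usage sketch of `Room::load_or_fetch_thread_subscriptions()` (return type and error handling elided):

```rust
// Uses the network if the MSC4308 catch-up hasn't completed yet, and the
// local store only once it has.
let subscriptions = room.load_or_fetch_thread_subscriptions().await?;
```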