Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2961006
Update simplified sliding sync docstring
reivilibre Jun 17, 2025
875dbf7
spelling
reivilibre Jul 10, 2025
09f8633
Add models for Thread Subscriptions extension to Sliding Sync
reivilibre Jul 18, 2025
f1f5657
Add overload for `gather_optional_coroutines`/6
reivilibre Jul 18, 2025
748316c
Add thread subscriptions position to `StreamToken`
reivilibre Jul 18, 2025
4dcd12b
Add `subscribed` and `automatic` to `get_updated_thread_subscriptions…
reivilibre Jul 18, 2025
0ce5dce
Fix thread_subscriptions stream sequence
reivilibre Jul 21, 2025
0c310b9
Add comment to MultiWriterIdGenerator about cursed sequence semantics
reivilibre Aug 20, 2025
18881b1
Add overload for `parse_integer_from_args`
reivilibre Aug 20, 2025
4a34641
Implement sliding sync extension part of MSC4308
reivilibre Aug 20, 2025
e72d6cd
Add companion endpoint for backpagination of thread subscriptions
reivilibre Aug 20, 2025
f4cd180
Newsfile
reivilibre Aug 20, 2025
921cd53
Update tests/rest/client/sliding_sync/test_extension_thread_subscript…
reivilibre Sep 2, 2025
168b67b
Update synapse/handlers/sliding_sync/extensions.py
reivilibre Sep 2, 2025
1374895
Update synapse/handlers/sliding_sync/extensions.py
reivilibre Sep 2, 2025
0cf178a
Add notifier hooks for sliding sync
reivilibre Sep 2, 2025
924c1bf
Use copy_and_replace in get_current_token_for_pagination
reivilibre Sep 3, 2025
fa8e3b6
Simplify if
reivilibre Sep 3, 2025
80679a7
Comment on why we still check limit
reivilibre Sep 9, 2025
00cb14e
Merge branch 'develop' into rei/ssext_threadsubs
reivilibre Sep 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -590,5 +590,5 @@ def read_config(
self.msc4293_enabled: bool = experimental.get("msc4293_enabled", False)

# MSC4306: Thread Subscriptions
# (and MSC4308: sliding sync extension for thread subscriptions)
# (and MSC4308: Thread Subscriptions extension to Sliding Sync)
self.msc4306_enabled: bool = experimental.get("msc4306_enabled", False)
2 changes: 1 addition & 1 deletion synapse/handlers/sliding_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ async def current_sync_for_user(

Args:
sync_config: Sync configuration
to_token: The point in the stream to sync up to.
to_token: The latest point in the stream to sync up to.
from_token: The point in the stream to sync from. Token of the end of the
previous batch. May be `None` if this is the initial sync request.
"""
Expand Down
97 changes: 95 additions & 2 deletions synapse/handlers/sliding_sync/extensions.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
cast,
)

from typing_extensions import assert_never
from typing_extensions import TypeAlias, assert_never

from synapse.api.constants import AccountDataTypes, EduTypes
from synapse.handlers.receipts import ReceiptEventSource
Expand All @@ -40,6 +40,7 @@
SlidingSyncStreamToken,
StrCollection,
StreamToken,
ThreadSubscriptionsToken,
)
from synapse.types.handlers.sliding_sync import (
HaveSentRoomFlag,
Expand All @@ -54,6 +55,13 @@
gather_optional_coroutines,
)

_ThreadSubscription: TypeAlias = (
SlidingSyncResult.Extensions.ThreadSubscriptionsExtension.ThreadSubscription
) #
_ThreadUnsubscription: TypeAlias = (
SlidingSyncResult.Extensions.ThreadSubscriptionsExtension.ThreadUnsubscription
)

if TYPE_CHECKING:
from synapse.server import HomeServer

Expand All @@ -68,6 +76,7 @@ def __init__(self, hs: "HomeServer"):
self.event_sources = hs.get_event_sources()
self.device_handler = hs.get_device_handler()
self.push_rules_handler = hs.get_push_rules_handler()
self._enable_thread_subscriptions = hs.config.experimental.msc4306_enabled

@trace
async def get_extensions_response(
Expand All @@ -93,7 +102,7 @@ async def get_extensions_response(
actual_room_ids: The actual room IDs in the the Sliding Sync response.
actual_room_response_map: A map of room ID to room results in the the
Sliding Sync response.
to_token: The point in the stream to sync up to.
to_token: The latest point in the stream to sync up to.
from_token: The point in the stream to sync from.
"""

Expand Down Expand Up @@ -156,18 +165,32 @@ async def get_extensions_response(
from_token=from_token,
)

thread_subs_coro = None
if (
sync_config.extensions.thread_subscriptions is not None
and self._enable_thread_subscriptions
):
thread_subs_coro = self.get_thread_subscriptions_extension_response(
sync_config=sync_config,
thread_subscriptions_request=sync_config.extensions.thread_subscriptions,
to_token=to_token,
from_token=from_token,
)

(
to_device_response,
e2ee_response,
account_data_response,
receipts_response,
typing_response,
thread_subs_response,
) = await gather_optional_coroutines(
to_device_coro,
e2ee_coro,
account_data_coro,
receipts_coro,
typing_coro,
thread_subs_coro,
)

return SlidingSyncResult.Extensions(
Expand All @@ -176,6 +199,7 @@ async def get_extensions_response(
account_data=account_data_response,
receipts=receipts_response,
typing=typing_response,
thread_subscriptions=thread_subs_response,
)

def find_relevant_room_ids_for_extension(
Expand Down Expand Up @@ -877,3 +901,72 @@ async def get_typing_extension_response(
return SlidingSyncResult.Extensions.TypingExtension(
room_id_to_typing_map=room_id_to_typing_map,
)

async def get_thread_subscriptions_extension_response(
self,
sync_config: SlidingSyncConfig,
thread_subscriptions_request: SlidingSyncConfig.Extensions.ThreadSubscriptionsExtension,
to_token: StreamToken,
from_token: Optional[SlidingSyncStreamToken],
) -> Optional[SlidingSyncResult.Extensions.ThreadSubscriptionsExtension]:
"""Handle Thread Subscriptions extension (MSC4308)

Args:
sync_config: Sync configuration
thread_subscriptions_request: The thread_subscriptions extension from the request
to_token: The point in the stream to sync up to.
from_token: The point in the stream to sync from.

Returns:
the response (or None if empty)
"""
if not thread_subscriptions_request.enabled:
return None

limit = thread_subscriptions_request.limit

if from_token:
from_stream_id = from_token.stream_token.thread_subscriptions_key
else:
from_stream_id = StreamToken.START.thread_subscriptions_key

to_stream_id = to_token.thread_subscriptions_key

updates = await self.store.get_updated_thread_subscriptions_for_user(
user_id=sync_config.user.to_string(),
from_id=from_stream_id,
to_id=to_stream_id,
limit=limit,
)

if len(updates) == 0:
return None

subscribed_threads: Dict[str, Dict[str, _ThreadSubscription]] = {}
unsubscribed_threads: Dict[str, Dict[str, _ThreadUnsubscription]] = {}
for stream_id, room_id, thread_root_id, subscribed, automatic in updates:
if subscribed:
subscribed_threads.setdefault(room_id, {})[thread_root_id] = (
_ThreadSubscription(
automatic=automatic,
bump_stamp=stream_id,
)
)
else:
unsubscribed_threads.setdefault(room_id, {})[thread_root_id] = (
_ThreadUnsubscription(bump_stamp=stream_id)
)

prev_batch = None
if len(updates) == limit:
# Tell the client about a potential gap where there may be more
# thread subscriptions for it to backpaginate.
# We subtract one because the 'later in the stream' bound is inclusive,
# and we already saw the element at index 0.
prev_batch = ThreadSubscriptionsToken(updates[0][0] - 1)

return SlidingSyncResult.Extensions.ThreadSubscriptionsExtension(
subscribed=subscribed_threads,
unsubscribed=unsubscribed_threads,
prev_batch=prev_batch,
)
46 changes: 38 additions & 8 deletions synapse/rest/client/sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from collections import defaultdict
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple, Union

import attrs
import attr

from synapse.api.constants import AccountDataTypes, EduTypes, Membership, PresenceState
from synapse.api.errors import Codes, StoreError, SynapseError
Expand Down Expand Up @@ -1260,18 +1260,48 @@ async def encode_extensions(

if (
extensions.thread_subscriptions is not None
and extensions.thread_subscriptions.changed is not None
and extensions.thread_subscriptions
):
serialized_extensions["thread_subscriptions"] = {
"changes": [
attrs.asdict(change, filter=lambda _attr, v: v is not None)
for change in extensions.thread_subscriptions.changed
]
}
serialized_extensions["io.element.msc4308.thread_subscriptions"] = (
_serialise_thread_subscriptions(extensions.thread_subscriptions)
)

return serialized_extensions


def _serialise_thread_subscriptions(
thread_subscriptions: SlidingSyncResult.Extensions.ThreadSubscriptionsExtension,
) -> JsonDict:
out: JsonDict = {}

if thread_subscriptions.subscribed:
out["subscribed"] = {
room_id: {
thread_root_id: attr.asdict(
change, filter=lambda _attr, v: v is not None
)
for thread_root_id, change in room_threads.items()
}
for room_id, room_threads in thread_subscriptions.subscribed.items()
}

if thread_subscriptions.unsubscribed:
out["unsubscribed"] = {
room_id: {
thread_root_id: attr.asdict(
change, filter=lambda _attr, v: v is not None
)
for thread_root_id, change in room_threads.items()
}
for room_id, room_threads in thread_subscriptions.unsubscribed.items()
}

if thread_subscriptions.prev_batch:
out["prev_batch"] = thread_subscriptions.prev_batch.to_string()

return out


def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
SyncRestServlet(hs).register(http_server)

Expand Down
21 changes: 21 additions & 0 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1327,6 +1327,27 @@ async def to_string(self, store: "DataStore") -> str:
return f"{self.connection_position}/{stream_token_str}"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class ThreadSubscriptionsToken:
"""
Token for a position in the thread subscriptions stream.

Format: `ts<stream_id>`
"""

stream_id: int

@staticmethod
def from_string(s: str) -> "ThreadSubscriptionsToken":
if not s.startswith("ts"):
raise ValueError("thread subscription token must start with `ts`")

return ThreadSubscriptionsToken(stream_id=int(s[2:]))

def to_string(self) -> str:
return f"ts{self.stream_id}"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class PersistedPosition:
"""Position of a newly persisted row with instance that persisted it."""
Expand Down
33 changes: 25 additions & 8 deletions synapse/types/handlers/sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
SlidingSyncStreamToken,
StrCollection,
StreamToken,
ThreadSubscriptionsToken,
UserID,
)
from synapse.types.rest.client import SlidingSyncBody
Expand Down Expand Up @@ -362,22 +363,38 @@ class ThreadSubscriptionsExtension:
"""The Thread Subscriptions extension (MSC4308)

Attributes:
changes: list of changes to thread subscriptions
subscribed: map (room_id -> thread_root_id -> info) of new or changed subscriptions
unsubscribed: map (room_id -> thread_root_id -> info) of new unsubscriptions
prev_batch: if present, there is a gap and the client can use this token to backpaginate
"""

@attr.s(slots=True, frozen=True, auto_attribs=True)
class ThreadSubscriptionChange:
room_id: str
root_event_id: str
subscribed: bool

class ThreadSubscription:
# always present when `subscribed`
automatic: Optional[bool]

changed: Optional[List[ThreadSubscriptionChange]]
# the same as our stream_id; useful for clients to resolve
# race conditions locally
bump_stamp: int

@attr.s(slots=True, frozen=True, auto_attribs=True)
class ThreadUnsubscription:
# the same as our stream_id; useful for clients to resolve
# race conditions locally
bump_stamp: int

# room_id -> event_id (of thread root) -> the subscription change
subscribed: Optional[Mapping[str, Mapping[str, ThreadSubscription]]]
# room_id -> event_id (of thread root) -> the unsubscription
unsubscribed: Optional[Mapping[str, Mapping[str, ThreadUnsubscription]]]
prev_batch: Optional[ThreadSubscriptionsToken]

def __bool__(self) -> bool:
return bool(self.changed)
return (
bool(self.subscribed)
or bool(self.unsubscribed)
or bool(self.prev_batch)
)

to_device: Optional[ToDeviceExtension] = None
e2ee: Optional[E2eeExtension] = None
Expand Down
5 changes: 4 additions & 1 deletion synapse/types/rest/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

from synapse._pydantic_compat import (
Extra,
Field,
StrictBool,
StrictInt,
StrictStr,
Expand Down Expand Up @@ -380,7 +381,9 @@ class ThreadSubscriptionsExtension(RequestBodyModel):
account_data: Optional[AccountDataExtension] = None
receipts: Optional[ReceiptsExtension] = None
typing: Optional[TypingExtension] = None
thread_subscriptions: Optional[ThreadSubscriptionsExtension] = None
thread_subscriptions: Optional[ThreadSubscriptionsExtension] = Field(
alias="io.element.msc4308.thread_subscriptions"
)

conn_id: Optional[StrictStr]

Expand Down
Loading