Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
2961006
Update simplified sliding sync docstring
reivilibre Jun 17, 2025
875dbf7
spelling
reivilibre Jul 10, 2025
09f8633
Add models for Thread Subscriptions extension to Sliding Sync
reivilibre Jul 18, 2025
f1f5657
Add overload for `gather_optional_coroutines`/6
reivilibre Jul 18, 2025
748316c
Add thread subscriptions position to `StreamToken`
reivilibre Jul 18, 2025
4dcd12b
Add `subscribed` and `automatic` to `get_updated_thread_subscriptions…
reivilibre Jul 18, 2025
0ce5dce
Fix thread_subscriptions stream sequence
reivilibre Jul 21, 2025
0c310b9
Add comment to MultiWriterIdGenerator about cursed sequence semantics
reivilibre Aug 20, 2025
18881b1
Add overload for `parse_integer_from_args`
reivilibre Aug 20, 2025
4a34641
Implement sliding sync extension part of MSC4308
reivilibre Aug 20, 2025
e72d6cd
Add companion endpoint for backpagination of thread subscriptions
reivilibre Aug 20, 2025
f4cd180
Newsfile
reivilibre Aug 20, 2025
921cd53
Update tests/rest/client/sliding_sync/test_extension_thread_subscript…
reivilibre Sep 2, 2025
168b67b
Update synapse/handlers/sliding_sync/extensions.py
reivilibre Sep 2, 2025
1374895
Update synapse/handlers/sliding_sync/extensions.py
reivilibre Sep 2, 2025
0cf178a
Add notifier hooks for sliding sync
reivilibre Sep 2, 2025
924c1bf
Use copy_and_replace in get_current_token_for_pagination
reivilibre Sep 3, 2025
fa8e3b6
Simplify if
reivilibre Sep 3, 2025
80679a7
Comment on why we still check limit
reivilibre Sep 9, 2025
00cb14e
Merge branch 'develop' into rei/ssext_threadsubs
reivilibre Sep 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 3 additions & 12 deletions synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
generate_pagination_where_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, MultiWriterStreamToken, StreamKeyType, StreamToken
from synapse.types import JsonDict, StreamKeyType, StreamToken
from synapse.util.caches.descriptors import cached, cachedList

if TYPE_CHECKING:
Expand Down Expand Up @@ -316,17 +316,8 @@ def _get_recent_references_for_event_txn(
StreamKeyType.ROOM, next_key
)
else:
next_token = StreamToken(
room_key=next_key,
presence_key=0,
typing_key=0,
receipt_key=MultiWriterStreamToken(stream=0),
account_data_key=0,
push_rules_key=0,
to_device_key=0,
device_list_key=MultiWriterStreamToken(stream=0),
groups_key=0,
un_partial_stated_rooms_key=0,
next_token = StreamToken.START.copy_and_replace(
StreamKeyType.ROOM, next_key
)

return events[:limit], next_token
Expand Down
3 changes: 3 additions & 0 deletions synapse/storage/databases/main/thread_subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ def get_max_thread_subscriptions_stream_id(self) -> int:
"""
return self._thread_subscriptions_id_gen.get_current_token()

def get_thread_subscriptions_stream_id_generator(self) -> MultiWriterIdGenerator:
return self._thread_subscriptions_id_gen

async def get_updated_thread_subscriptions(
self, *, from_id: int, to_id: int, limit: int
) -> List[Tuple[int, str, str, str]]:
Expand Down
4 changes: 4 additions & 0 deletions synapse/streams/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def get_current_token(self) -> StreamToken:
un_partial_stated_rooms_key = self.store.get_un_partial_stated_rooms_token(
self._instance_name
)
thread_subscriptions_key = self.store.get_max_thread_subscriptions_stream_id()

token = StreamToken(
room_key=self.sources.room.get_current_key(),
Expand All @@ -97,6 +98,7 @@ def get_current_token(self) -> StreamToken:
# Groups key is unused.
groups_key=0,
un_partial_stated_rooms_key=un_partial_stated_rooms_key,
thread_subscriptions_key=thread_subscriptions_key,
)
return token

Expand All @@ -123,6 +125,7 @@ async def bound_future_token(self, token: StreamToken) -> StreamToken:
StreamKeyType.TO_DEVICE: self.store.get_to_device_id_generator(),
StreamKeyType.DEVICE_LIST: self.store.get_device_stream_id_generator(),
StreamKeyType.UN_PARTIAL_STATED_ROOMS: self.store.get_un_partial_stated_rooms_id_generator(),
StreamKeyType.THREAD_SUBSCRIPTIONS: self.store.get_thread_subscriptions_stream_id_generator(),
}

for _, key in StreamKeyType.__members__.items():
Expand Down Expand Up @@ -206,5 +209,6 @@ async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
device_list_key=MultiWriterStreamToken(stream=0),
groups_key=0,
un_partial_stated_rooms_key=0,
thread_subscriptions_key=0,
)
return token
13 changes: 11 additions & 2 deletions synapse/types/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -996,14 +996,15 @@ class StreamKeyType(Enum):
TO_DEVICE = "to_device_key"
DEVICE_LIST = "device_list_key"
UN_PARTIAL_STATED_ROOMS = "un_partial_stated_rooms_key"
THREAD_SUBSCRIPTIONS = "thread_subscriptions_key"


@attr.s(slots=True, frozen=True, auto_attribs=True)
class StreamToken:
"""A collection of keys joined together by underscores in the following
order and which represent the position in their respective streams.

ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379`
ex. `s2633508_17_338_6732159_1082514_541479_274711_265584_1_379_4242`
1. `room_key`: `s2633508` which is a `RoomStreamToken`
- `RoomStreamToken`'s can also look like `t426-2633508` or `m56~2.58~3.59`
- See the docstring for `RoomStreamToken` for more details.
Expand All @@ -1016,6 +1017,7 @@ class StreamToken:
8. `device_list_key`: `265584`
9. `groups_key`: `1` (note that this key is now unused)
10. `un_partial_stated_rooms_key`: `379`
11. `thread_subscriptions_key`: 4242

You can see how many of these keys correspond to the various
fields in a "/sync" response:
Expand Down Expand Up @@ -1074,6 +1076,7 @@ class StreamToken:
# Note that the groups key is no longer used and may have bogus values.
groups_key: int
un_partial_stated_rooms_key: int
thread_subscriptions_key: int

_SEPARATOR = "_"
START: ClassVar["StreamToken"]
Expand Down Expand Up @@ -1101,6 +1104,7 @@ async def from_string(cls, store: "DataStore", string: str) -> "StreamToken":
device_list_key,
groups_key,
un_partial_stated_rooms_key,
thread_subscriptions_key,
) = keys

return cls(
Expand All @@ -1116,6 +1120,7 @@ async def from_string(cls, store: "DataStore", string: str) -> "StreamToken":
),
groups_key=int(groups_key),
un_partial_stated_rooms_key=int(un_partial_stated_rooms_key),
thread_subscriptions_key=int(thread_subscriptions_key),
)
except CancelledError:
raise
Expand All @@ -1138,6 +1143,7 @@ async def to_string(self, store: "DataStore") -> str:
# if additional tokens are added.
str(self.groups_key),
str(self.un_partial_stated_rooms_key),
str(self.thread_subscriptions_key),
]
)

Expand Down Expand Up @@ -1202,6 +1208,7 @@ def get_field(
StreamKeyType.TO_DEVICE,
StreamKeyType.TYPING,
StreamKeyType.UN_PARTIAL_STATED_ROOMS,
StreamKeyType.THREAD_SUBSCRIPTIONS,
],
) -> int: ...

Expand Down Expand Up @@ -1257,7 +1264,8 @@ def __str__(self) -> str:
f"typing: {self.typing_key}, receipt: {self.receipt_key}, "
f"account_data: {self.account_data_key}, push_rules: {self.push_rules_key}, "
f"to_device: {self.to_device_key}, device_list: {self.device_list_key}, "
f"groups: {self.groups_key}, un_partial_stated_rooms: {self.un_partial_stated_rooms_key})"
f"groups: {self.groups_key}, un_partial_stated_rooms: {self.un_partial_stated_rooms_key},"
f"thread_subscriptions: {self.thread_subscriptions_key})"
)


Expand All @@ -1272,6 +1280,7 @@ def __str__(self) -> str:
device_list_key=MultiWriterStreamToken(stream=0),
groups_key=0,
un_partial_stated_rooms_key=0,
thread_subscriptions_key=0,
)


Expand Down
4 changes: 2 additions & 2 deletions tests/rest/admin/test_room.py
Original file line number Diff line number Diff line change
Expand Up @@ -2244,7 +2244,7 @@ def test_timestamp_to_event(self) -> None:

def test_topo_token_is_accepted(self) -> None:
"""Test Topo Token is accepted."""
token = "t1-0_0_0_0_0_0_0_0_0_0"
token = "t1-0_0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
Expand All @@ -2258,7 +2258,7 @@ def test_topo_token_is_accepted(self) -> None:

def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
"""Test that stream token is accepted for forward pagination."""
token = "s0_0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET",
"/_synapse/admin/v1/rooms/%s/messages?from=%s" % (self.room_id, token),
Expand Down
4 changes: 2 additions & 2 deletions tests/rest/client/test_rooms.py
Original file line number Diff line number Diff line change
Expand Up @@ -2245,7 +2245,7 @@ def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.room_id = self.helper.create_room_as(self.user_id)

def test_topo_token_is_accepted(self) -> None:
token = "t1-0_0_0_0_0_0_0_0_0_0"
token = "t1-0_0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
)
Expand All @@ -2256,7 +2256,7 @@ def test_topo_token_is_accepted(self) -> None:
self.assertTrue("end" in channel.json_body)

def test_stream_token_is_accepted_for_fwd_pagianation(self) -> None:
token = "s0_0_0_0_0_0_0_0_0_0"
token = "s0_0_0_0_0_0_0_0_0_0_0"
channel = self.make_request(
"GET", "/rooms/%s/messages?access_token=x&from=%s" % (self.room_id, token)
)
Expand Down