Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 9d25a0a

Browse files
authored
Split presence out of master (#9820)
1 parent d924827 commit 9d25a0a

File tree

17 files changed

+245
-245
lines changed

17 files changed

+245
-245
lines changed

changelog.d/9820.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add experimental support for handling presence on a worker.

scripts/synapse_port_db

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -634,8 +634,11 @@ class Porter(object):
634634
"device_inbox_sequence", ("device_inbox", "device_federation_outbox")
635635
)
636636
await self._setup_sequence(
637-
"account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data"))
638-
await self._setup_sequence("receipts_sequence", ("receipts_linearized", ))
637+
"account_data_sequence",
638+
("room_account_data", "room_tags_revisions", "account_data"),
639+
)
640+
await self._setup_sequence("receipts_sequence", ("receipts_linearized",))
641+
await self._setup_sequence("presence_stream_sequence", ("presence_stream",))
639642
await self._setup_auth_chain_sequence()
640643

641644
# Step 3. Get tables.

synapse/app/generic_worker.py

Lines changed: 4 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
from synapse.replication.slave.storage.filtering import SlavedFilteringStore
5656
from synapse.replication.slave.storage.groups import SlavedGroupServerStore
5757
from synapse.replication.slave.storage.keys import SlavedKeyStore
58-
from synapse.replication.slave.storage.presence import SlavedPresenceStore
5958
from synapse.replication.slave.storage.profile import SlavedProfileStore
6059
from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
6160
from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -64,7 +63,7 @@
6463
from synapse.replication.slave.storage.room import RoomStore
6564
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
6665
from synapse.rest.admin import register_servlets_for_media_repo
67-
from synapse.rest.client.v1 import events, login, room
66+
from synapse.rest.client.v1 import events, login, presence, room
6867
from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
6968
from synapse.rest.client.v1.profile import (
7069
ProfileAvatarURLRestServlet,
@@ -110,6 +109,7 @@
110109
from synapse.storage.databases.main.monthly_active_users import (
111110
MonthlyActiveUsersWorkerStore,
112111
)
112+
from synapse.storage.databases.main.presence import PresenceStore
113113
from synapse.storage.databases.main.search import SearchWorkerStore
114114
from synapse.storage.databases.main.stats import StatsStore
115115
from synapse.storage.databases.main.transactions import TransactionWorkerStore
@@ -121,26 +121,6 @@
121121
logger = logging.getLogger("synapse.app.generic_worker")
122122

123123

124-
class PresenceStatusStubServlet(RestServlet):
125-
"""If presence is disabled this servlet can be used to stub out setting
126-
presence status.
127-
"""
128-
129-
PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status")
130-
131-
def __init__(self, hs):
132-
super().__init__()
133-
self.auth = hs.get_auth()
134-
135-
async def on_GET(self, request, user_id):
136-
await self.auth.get_user_by_req(request)
137-
return 200, {"presence": "offline"}
138-
139-
async def on_PUT(self, request, user_id):
140-
await self.auth.get_user_by_req(request)
141-
return 200, {}
142-
143-
144124
class KeyUploadServlet(RestServlet):
145125
"""An implementation of the `KeyUploadServlet` that responds to read only
146126
requests, but otherwise proxies through to the master instance.
@@ -241,6 +221,7 @@ class GenericWorkerSlavedStore(
241221
StatsStore,
242222
UIAuthWorkerStore,
243223
EndToEndRoomKeyStore,
224+
PresenceStore,
244225
SlavedDeviceInboxStore,
245226
SlavedDeviceStore,
246227
SlavedReceiptsStore,
@@ -259,7 +240,6 @@ class GenericWorkerSlavedStore(
259240
SlavedTransactionStore,
260241
SlavedProfileStore,
261242
SlavedClientIpStore,
262-
SlavedPresenceStore,
263243
SlavedFilteringStore,
264244
MonthlyActiveUsersWorkerStore,
265245
MediaRepositoryStore,
@@ -327,10 +307,7 @@ def _listen_http(self, listener_config: ListenerConfig):
327307

328308
user_directory.register_servlets(self, resource)
329309

330-
# If presence is disabled, use the stub servlet that does
331-
# not allow sending presence
332-
if not self.config.use_presence:
333-
PresenceStatusStubServlet(self).register(resource)
310+
presence.register_servlets(self, resource)
334311

335312
groups.register_servlets(self, resource)
336313

synapse/config/workers.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,14 @@ class WriterLocations:
6464
Attributes:
6565
events: The instances that write to the event and backfill streams.
6666
typing: The instance that writes to the typing stream.
67+
to_device: The instances that write to the to_device stream. Currently
68+
can only be a single instance.
69+
account_data: The instances that write to the account data streams. Currently
70+
can only be a single instance.
71+
receipts: The instances that write to the receipts stream. Currently
72+
can only be a single instance.
73+
presence: The instances that write to the presence stream. Currently
74+
can only be a single instance.
6775
"""
6876

6977
events = attr.ib(
@@ -85,6 +93,11 @@ class WriterLocations:
8593
type=List[str],
8694
converter=_instance_to_list_converter,
8795
)
96+
presence = attr.ib(
97+
default=["master"],
98+
type=List[str],
99+
converter=_instance_to_list_converter,
100+
)
88101

89102

90103
class WorkerConfig(Config):
@@ -188,7 +201,14 @@ def read_config(self, config, **kwargs):
188201

189202
# Check that the configured writers for events and typing also appears in
190203
# `instance_map`.
191-
for stream in ("events", "typing", "to_device", "account_data", "receipts"):
204+
for stream in (
205+
"events",
206+
"typing",
207+
"to_device",
208+
"account_data",
209+
"receipts",
210+
"presence",
211+
):
192212
instances = _instance_to_list_converter(getattr(self.writers, stream))
193213
for instance in instances:
194214
if instance != "master" and instance not in self.instance_map:
@@ -215,6 +235,11 @@ def read_config(self, config, **kwargs):
215235
if len(self.writers.events) == 0:
216236
raise ConfigError("Must specify at least one instance to handle `events`.")
217237

238+
if len(self.writers.presence) != 1:
239+
raise ConfigError(
240+
"Must only specify one instance to handle `presence` messages."
241+
)
242+
218243
self.events_shard_config = RoutableShardedWorkerHandlingConfig(
219244
self.writers.events
220245
)

synapse/handlers/presence.py

Lines changed: 36 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,8 @@
122122

123123

124124
class BasePresenceHandler(abc.ABC):
125-
"""Parts of the PresenceHandler that are shared between workers and master"""
125+
"""Parts of the PresenceHandler that are shared between workers and presence
126+
writer"""
126127

127128
def __init__(self, hs: "HomeServer"):
128129
self.clock = hs.get_clock()
@@ -309,17 +310,25 @@ def __init__(self, hs):
309310
super().__init__(hs)
310311
self.hs = hs
311312

313+
self._presence_writer_instance = hs.config.worker.writers.presence[0]
314+
312315
self._presence_enabled = hs.config.use_presence
313316

317+
# Route presence EDUs to the right worker
318+
hs.get_federation_registry().register_instances_for_edu(
319+
"m.presence",
320+
hs.config.worker.writers.presence,
321+
)
322+
314323
# The number of ongoing syncs on this process, by user id.
315324
# Empty if _presence_enabled is false.
316325
self._user_to_num_current_syncs = {} # type: Dict[str, int]
317326

318327
self.notifier = hs.get_notifier()
319328
self.instance_id = hs.get_instance_id()
320329

321-
# user_id -> last_sync_ms. Lists the users that have stopped syncing
322-
# but we haven't notified the master of that yet
330+
# user_id -> last_sync_ms. Lists the users that have stopped syncing but
331+
# we haven't notified the presence writer of that yet
323332
self.users_going_offline = {}
324333

325334
self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
@@ -352,31 +361,32 @@ def send_user_sync(self, user_id, is_syncing, last_sync_ms):
352361
)
353362

354363
def mark_as_coming_online(self, user_id):
355-
"""A user has started syncing. Send a UserSync to the master, unless they
356-
had recently stopped syncing.
364+
"""A user has started syncing. Send a UserSync to the presence writer,
365+
unless they had recently stopped syncing.
357366
358367
Args:
359368
user_id (str)
360369
"""
361370
going_offline = self.users_going_offline.pop(user_id, None)
362371
if not going_offline:
363-
# Safe to skip because we haven't yet told the master they were offline
372+
# Safe to skip because we haven't yet told the presence writer they
373+
# were offline
364374
self.send_user_sync(user_id, True, self.clock.time_msec())
365375

366376
def mark_as_going_offline(self, user_id):
367-
"""A user has stopped syncing. We wait before notifying the master as
368-
its likely they'll come back soon. This allows us to avoid sending
369-
a stopped syncing immediately followed by a started syncing notification
370-
to the master
377+
"""A user has stopped syncing. We wait before notifying the presence
378+
writer as its likely they'll come back soon. This allows us to avoid
379+
sending a stopped syncing immediately followed by a started syncing
380+
notification to the presence writer
371381
372382
Args:
373383
user_id (str)
374384
"""
375385
self.users_going_offline[user_id] = self.clock.time_msec()
376386

377387
def send_stop_syncing(self):
378-
"""Check if there are any users who have stopped syncing a while ago
379-
and haven't come back yet. If there are poke the master about them.
388+
"""Check if there are any users who have stopped syncing a while ago and
389+
haven't come back yet. If there are poke the presence writer about them.
380390
"""
381391
now = self.clock.time_msec()
382392
for user_id, last_sync_ms in list(self.users_going_offline.items()):
@@ -492,9 +502,12 @@ async def set_state(self, target_user, state, ignore_status_msg=False):
492502
if not self.hs.config.use_presence:
493503
return
494504

495-
# Proxy request to master
505+
# Proxy request to instance that writes presence
496506
await self._set_state_client(
497-
user_id=user_id, state=state, ignore_status_msg=ignore_status_msg
507+
instance_name=self._presence_writer_instance,
508+
user_id=user_id,
509+
state=state,
510+
ignore_status_msg=ignore_status_msg,
498511
)
499512

500513
async def bump_presence_active_time(self, user):
@@ -505,9 +518,11 @@ async def bump_presence_active_time(self, user):
505518
if not self.hs.config.use_presence:
506519
return
507520

508-
# Proxy request to master
521+
# Proxy request to instance that writes presence
509522
user_id = user.to_string()
510-
await self._bump_active_client(user_id=user_id)
523+
await self._bump_active_client(
524+
instance_name=self._presence_writer_instance, user_id=user_id
525+
)
511526

512527

513528
class PresenceHandler(BasePresenceHandler):
@@ -1909,7 +1924,7 @@ def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler):
19091924
self._queue_presence_updates = True
19101925

19111926
# Whether this instance is a presence writer.
1912-
self._presence_writer = hs.config.worker.worker_app is None
1927+
self._presence_writer = self._instance_name in hs.config.worker.writers.presence
19131928

19141929
# The FederationSender instance, if this process sends federation traffic directly.
19151930
self._federation = None
@@ -1957,7 +1972,7 @@ def send_presence_to_destinations(
19571972
Will forward to the local federation sender (if there is one) and queue
19581973
to send over replication (if there are other federation sender instances.).
19591974
1960-
Must only be called on the master process.
1975+
Must only be called on the presence writer process.
19611976
"""
19621977

19631978
# This should only be called on a presence writer.
@@ -2003,10 +2018,11 @@ async def get_replication_rows(
20032018
We return rows in the form of `(destination, user_id)` to keep the size
20042019
of each row bounded (rather than returning the sets in a row).
20052020
2006-
On workers this will query the master process via HTTP replication.
2021+
On workers this will query the presence writer process via HTTP replication.
20072022
"""
20082023
if instance_name != self._instance_name:
2009-
# If not local we query over http replication from the master
2024+
# If not local we query over http replication from the presence
2025+
# writer
20102026
result = await self._repl_client(
20112027
instance_name=instance_name,
20122028
stream_name=PresenceFederationStream.NAME,

synapse/replication/http/_base.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,10 @@ async def _handle_request(self, request, **kwargs):
158158
def make_client(cls, hs):
159159
"""Create a client that makes requests.
160160
161-
Returns a callable that accepts the same parameters as `_serialize_payload`.
161+
Returns a callable that accepts the same parameters as
162+
`_serialize_payload`, and also accepts an optional `instance_name`
163+
parameter to specify which instance to hit (the instance must be in
164+
the `instance_map` config).
162165
"""
163166
clock = hs.get_clock()
164167
client = hs.get_simple_http_client()

synapse/replication/slave/storage/presence.py

Lines changed: 0 additions & 50 deletions
This file was deleted.

0 commit comments

Comments
 (0)