Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19038.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add more support for MSC4140, namely the ability to inspect sent, cancelled, or failed delayed events, aka "finalised" delayed events.
15 changes: 15 additions & 0 deletions synapse/config/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,21 @@ def read_config(
# MSC4133: Custom profile fields
self.msc4133_enabled: bool = experimental.get("msc4133_enabled", False)

# MSC4140: How many delayed events a user is allowed to have scheduled at a time.
self.msc4140_max_delayed_events_per_user = experimental.get(
"msc4140_max_delayed_events_per_user", 100
)

# MSC4140: How long to keep finalised delayed events in the database before deleting them.
self.msc4140_finalised_retention_period = self.parse_duration(
config.get("msc4140_finalised_retention_period", "7d")
)

# MSC4140: How many finalised delayed events to keep per user before deleting them.
self.msc4140_finalised_retention_limit = experimental.get(
"msc4140_finalised_retention_limit", 1000
)

# MSC4143: Matrix RTC Transport using Livekit Backend
self.msc4143_enabled: bool = experimental.get("msc4143_enabled", False)

Expand Down
124 changes: 95 additions & 29 deletions synapse/handlers/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
#

import logging
from typing import TYPE_CHECKING, List, Optional, Set, Tuple
from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple

from twisted.internet.interfaces import IDelayedCall

from synapse.api.constants import EventTypes
from synapse.api.errors import ShadowBanError, SynapseError
from synapse.api.errors import ShadowBanError, SynapseError, cs_error
from synapse.api.ratelimiting import Ratelimiter
from synapse.config.workers import MAIN_PROCESS_INSTANCE_NAME
from synapse.logging.context import make_deferred_yieldable
from synapse.logging.opentracing import set_tag
from synapse.metrics import SERVER_NAME_LABEL, event_processing_positions
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.delayed_events import (
ReplicationAddedDelayedEventRestServlet,
)
Expand All @@ -43,6 +44,7 @@
UserID,
create_requester,
)
from synapse.util.constants import MILLISECONDS_PER_SECOND, ONE_MINUTE_SECONDS
from synapse.util.events import generate_fake_event_id
from synapse.util.metrics import Measure
from synapse.util.sentinel import Sentinel
Expand Down Expand Up @@ -125,14 +127,20 @@ async def _schedule_db_events() -> None:
else:
self._repl_client = ReplicationAddedDelayedEventRestServlet.make_client(hs)

if hs.config.worker.run_background_tasks:
self._clock.looping_call(
self._prune_finalised_events,
5 * ONE_MINUTE_SECONDS * MILLISECONDS_PER_SECOND,
)

@property
def _is_master(self) -> bool:
return self._repl_client is None

def notify_new_event(self) -> None:
"""
Called when there may be more state event deltas to process,
which should cancel pending delayed events for the same state.
which should cancel scheduled delayed events for the same state.
"""
if self._event_processing:
return
Expand All @@ -156,8 +164,7 @@ async def _unsafe_process_new_event(self) -> None:
room_max_stream_ordering = self._store.get_room_max_stream_ordering()

# Check that there are actually any delayed events to process. If not, bail early.
delayed_events_count = await self._store.get_count_of_delayed_events()
if delayed_events_count == 0:
if not await self._store.has_scheduled_delayed_events():
# There are no delayed events to process. Update the
# `delayed_events_stream_pos` to the latest `events` stream pos and
# exit early.
Expand Down Expand Up @@ -228,7 +235,7 @@ async def _unsafe_process_new_event(self) -> None:

async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
"""
Process current state deltas to cancel other users' pending delayed events
Process current state deltas to cancel other users' scheduled delayed events
that target the same state.
"""
# Get the senders of each delta's state event (as sender information is
Expand Down Expand Up @@ -316,11 +323,20 @@ async def _handle_state_deltas(self, deltas: List[StateDelta]) -> None:
if sender.domain == self._config.server.server_name
else ""
),
finalised_ts=self._get_current_ts(),
)

if self._next_send_ts_changed(next_send_ts):
self._schedule_next_at_or_none(next_send_ts)

@wrap_as_background_process("_prune_finalised_events")
async def _prune_finalised_events(self) -> None:
await self._store.prune_finalised_delayed_events(
self._get_current_ts(),
self.hs.config.experimental.msc4140_finalised_retention_period,
self.hs.config.experimental.msc4140_finalised_retention_limit,
)

async def add(
self,
requester: Requester,
Expand Down Expand Up @@ -380,6 +396,7 @@ async def add(
origin_server_ts=origin_server_ts,
content=content,
delay=delay,
limit=self.hs.config.experimental.msc4140_max_delayed_events_per_user,
)

if self._repl_client is not None:
Expand Down Expand Up @@ -420,6 +437,7 @@ async def cancel(self, requester: Requester, delay_id: str) -> None:
next_send_ts = await self._store.cancel_delayed_event(
delay_id=delay_id,
user_localpart=requester.user.localpart,
finalised_ts=self._get_current_ts(),
)

if self._next_send_ts_changed(next_send_ts):
Expand Down Expand Up @@ -477,18 +495,19 @@ async def send(self, requester: Requester, delay_id: str) -> None:
if self._next_send_ts_changed(next_send_ts):
self._schedule_next_at_or_none(next_send_ts)

await self._send_event(
DelayedEventDetails(
delay_id=DelayID(delay_id),
user_localpart=UserLocalpart(requester.user.localpart),
room_id=event.room_id,
type=event.type,
state_key=event.state_key,
origin_server_ts=event.origin_server_ts,
content=event.content,
device_id=event.device_id,
if event:
await self._send_event(
DelayedEventDetails(
delay_id=DelayID(delay_id),
user_localpart=UserLocalpart(requester.user.localpart),
room_id=event.room_id,
type=event.type,
state_key=event.state_key,
origin_server_ts=event.origin_server_ts,
content=event.content,
device_id=event.device_id,
)
)
)

async def _send_on_timeout(self) -> None:
self._next_delayed_event_call = None
Expand All @@ -513,17 +532,19 @@ async def _send_events(self, events: List[DelayedEventDetails]) -> None:
state_info = None
try:
# TODO: send in background if message event or non-conflicting state event
await self._send_event(event)
finalised_ts = await self._send_event(event)
if state_info is not None:
sent_state.add(state_info)
except Exception:
logger.exception("Failed to send delayed event")
finalised_ts = self._get_current_ts()

for room_id, event_type, state_key in sent_state:
await self._store.delete_processed_delayed_state_events(
await self._store.finalise_processed_delayed_state_events(
room_id=str(room_id),
event_type=event_type,
state_key=state_key,
finalised_ts=finalised_ts,
)

def _schedule_next_at_or_none(self, next_send_ts: Optional[Timestamp]) -> None:
Expand All @@ -547,21 +568,49 @@ def _schedule_next_at(self, next_send_ts: Timestamp) -> None:
else:
self._next_delayed_event_call.reset(delay_sec)

async def get_all_for_user(self, requester: Requester) -> List[JsonDict]:
"""Return all pending delayed events requested by the given user."""
async def get_delayed_events_for_user(
self,
requester: Requester,
delay_ids: Optional[List[str]],
get_scheduled: bool,
get_finalised: bool,
) -> Dict[str, List[JsonDict]]:
"""
Return all scheduled delayed events for the given user.

Args:
requester: The user whose delayed events to get.
delay_ids: The IDs of the delayed events to get, or None to get all of them.
get_scheduled: Whether to look up scheduled delayed events.
get_finalised: Whether to look up finalised delayed events.
"""
await self._delayed_event_mgmt_ratelimiter.ratelimit(
requester,
(requester.user.to_string(), requester.device_id),
)
return await self._store.get_all_delayed_events_for_user(
requester.user.localpart
)

# TODO: Support Pagination stream API
ret = {}
if get_scheduled:
ret["scheduled"] = await self._store.get_scheduled_delayed_events_for_user(
requester.user.localpart,
delay_ids,
)
if get_finalised:
ret["finalised"] = await self._store.get_finalised_delayed_events_for_user(
requester.user.localpart,
delay_ids,
self._get_current_ts(),
self.hs.config.experimental.msc4140_finalised_retention_period,
self.hs.config.experimental.msc4140_finalised_retention_limit,
)
return ret

async def _send_event(
self,
event: DelayedEventDetails,
txn_id: Optional[str] = None,
) -> None:
) -> Timestamp:
user_id = UserID(event.user_localpart, self._config.server.server_name)
user_id_str = user_id.to_string()
# Create a new requester from what data is currently available
Expand All @@ -571,6 +620,7 @@ async def _send_event(
device_id=event.device_id,
)

finalised_ts = None
try:
if event.state_key is not None and event.type == EventTypes.Member:
membership = event.content.get("membership")
Expand Down Expand Up @@ -606,18 +656,34 @@ async def _send_event(
txn_id=txn_id,
)
event_id = sent_event.event_id
if event.origin_server_ts is None:
finalised_ts = Timestamp(sent_event.origin_server_ts)
except ShadowBanError:
event_id = generate_fake_event_id()
send_error = None
except SynapseError as e:
send_error = e.error_dict(None)
except Exception:
send_error = cs_error("Internal server error")
else:
send_error = None
finally:
# TODO: If this is a temporary error, retry. Otherwise, consider notifying clients of the failure
if finalised_ts is None:
finalised_ts = self._get_current_ts()
try:
await self._store.delete_processed_delayed_event(
event.delay_id, event.user_localpart
await self._store.finalise_processed_delayed_event(
event.delay_id,
event.user_localpart,
send_error or event_id,
finalised_ts,
)
except Exception:
logger.exception("Failed to delete processed delayed event")
logger.exception("Failed to finalise processed delayed event")

set_tag("event_id", event_id)
if send_error is None:
set_tag("event_id", event_id)
return finalised_ts

def _get_current_ts(self) -> Timestamp:
return Timestamp(self._clock.time_msec())
Expand Down
35 changes: 31 additions & 4 deletions synapse/rest/client/delayed_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,12 @@

from synapse.api.errors import Codes, SynapseError
from synapse.http.server import HttpServer
from synapse.http.servlet import RestServlet, parse_json_object_from_request
from synapse.http.servlet import (
RestServlet,
parse_json_object_from_request,
parse_string_from_args,
parse_strings_from_args,
)
from synapse.http.site import SynapseRequest
from synapse.rest.client._base import client_patterns
from synapse.types import JsonDict
Expand All @@ -38,6 +43,11 @@ class _UpdateDelayedEventAction(Enum):
SEND = "send"


class _DelayedEventStatus(Enum):
SCHEDULED = "scheduled"
FINALISED = "finalised"


class UpdateDelayedEventServlet(RestServlet):
PATTERNS = client_patterns(
r"/org\.matrix\.msc4140/delayed_events/(?P<delay_id>[^/]+)$",
Expand Down Expand Up @@ -97,10 +107,27 @@ def __init__(self, hs: "HomeServer"):

async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
requester = await self.auth.get_user_by_req(request)
# TODO: Support Pagination stream API ("from" query parameter)
delayed_events = await self.delayed_events_handler.get_all_for_user(requester)

ret = {"delayed_events": delayed_events}
# twisted.web.server.Request.args is incorrectly defined as Optional[Any]
args: dict[bytes, list[bytes]] = request.args # type: ignore
statuses = parse_strings_from_args(
args,
"status",
allowed_values=tuple(s.value for s in _DelayedEventStatus),
)
delay_ids = parse_strings_from_args(args, "delay_id")
# TODO: Support Pagination stream API
_from_token = parse_string_from_args(args, "from")

ret = await self.delayed_events_handler.get_delayed_events_for_user(
requester,
delay_ids,
statuses is None or _DelayedEventStatus.SCHEDULED.value in statuses,
statuses is None or _DelayedEventStatus.FINALISED.value in statuses,
)
# TODO: This is here for backwards compatibility. Remove eventually
if statuses is None:
ret["delayed_events"] = ret[_DelayedEventStatus.SCHEDULED.value]
return 200, ret


Expand Down
Loading
Loading