Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18756.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Update implementation of [MSC4306: Thread Subscriptions](https://github.com/matrix-org/matrix-doc/issues/4306) to include automatic subscription conflict prevention as introduced in later drafts.
6 changes: 6 additions & 0 deletions synapse/api/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@ class Codes(str, Enum):
# Part of MSC4155
INVITE_BLOCKED = "ORG.MATRIX.MSC4155.M_INVITE_BLOCKED"

# Part of MSC4306: Thread Subscriptions
MSC4306_CONFLICTING_UNSUBSCRIPTION = (
"IO.ELEMENT.MSC4306.M_CONFLICTING_UNSUBSCRIPTION"
)
MSC4306_NOT_IN_THREAD = "IO.ELEMENT.MSC4306.M_NOT_IN_THREAD"


class CodeMessageException(RuntimeError):
"""An exception with integer code, a message string attributes and optional headers.
Expand Down
67 changes: 55 additions & 12 deletions synapse/handlers/thread_subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import logging
from http import HTTPStatus
from typing import TYPE_CHECKING, Optional

from synapse.api.errors import AuthError, NotFoundError
from synapse.storage.databases.main.thread_subscriptions import ThreadSubscription
from synapse.types import UserID
from synapse.api.constants import RelationTypes
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.events import relation_from_event
from synapse.storage.databases.main.thread_subscriptions import (
AutomaticSubscriptionConflicted,
ThreadSubscription,
)
from synapse.types import EventOrderings, UserID

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -55,42 +61,79 @@ async def subscribe_user_to_thread(
room_id: str,
thread_root_event_id: str,
*,
automatic: bool,
automatic_event_id: Optional[str],
) -> Optional[int]:
"""Sets or updates a user's subscription settings for a specific thread root.

Args:
requester_user_id: The ID of the user whose settings are being updated.
thread_root_event_id: The event ID of the thread root.
automatic: whether the user was subscribed by an automatic decision by
their client.
automatic_event_id: if the user was subscribed by an automatic decision by
their client, the event ID that caused this.

Returns:
The stream ID for this update, if the update isn't no-opped.

Raises:
NotFoundError if the user cannot access the thread root event, or it isn't
known to this homeserver.
known to this homeserver. Ditto for the automatic cause event if supplied.

SynapseError(400, M_NOT_IN_THREAD): if client supplied an automatic cause event
but user cannot access the event.

SynapseError(409, M_SKIPPED): if client requested an automatic subscription
but it was skipped because the cause event is logically later than an unsubscription.
"""
# First check that the user can access the thread root event
# and that it exists
try:
event = await self.event_handler.get_event(
thread_root_event = await self.event_handler.get_event(
user_id, room_id, thread_root_event_id
)
if event is None:
if thread_root_event is None:
raise NotFoundError("No such thread root")
except AuthError:
logger.info("rejecting thread subscriptions change (thread not accessible)")
raise NotFoundError("No such thread root")

return await self.store.subscribe_user_to_thread(
if automatic_event_id:
autosub_cause_event = await self.event_handler.get_event(
user_id, room_id, automatic_event_id
)
if autosub_cause_event is None:
raise NotFoundError("Automatic subscription event not found")
relation = relation_from_event(autosub_cause_event)
if (
relation is None
or relation.rel_type != RelationTypes.THREAD
or relation.parent_id != thread_root_event_id
):
raise SynapseError(
HTTPStatus.BAD_REQUEST,
"Automatic subscription must use an event in the thread",
errcode=Codes.MSC4306_NOT_IN_THREAD,
)

automatic_event_orderings = EventOrderings.from_event(autosub_cause_event)
else:
automatic_event_orderings = None

outcome = await self.store.subscribe_user_to_thread(
user_id.to_string(),
event.room_id,
room_id,
thread_root_event_id,
automatic=automatic,
automatic_event_orderings=automatic_event_orderings,
)

if isinstance(outcome, AutomaticSubscriptionConflicted):
raise SynapseError(
HTTPStatus.CONFLICT,
"Automatic subscription obsoleted by an unsubscription request.",
errcode=Codes.MSC4306_CONFLICTING_UNSUBSCRIPTION,
)

return outcome

async def unsubscribe_user_from_thread(
self, user_id: UserID, room_id: str, thread_root_event_id: str
) -> Optional[int]:
Expand Down
4 changes: 2 additions & 2 deletions synapse/replication/tcp/streams/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,7 +739,7 @@ class ThreadSubscriptionsStreamRow:
NAME = "thread_subscriptions"
ROW_TYPE = ThreadSubscriptionsStreamRow

def __init__(self, hs: Any):
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastores().main
super().__init__(
hs.get_instance_name(),
Expand All @@ -751,7 +751,7 @@ async def _update_function(
self, instance_name: str, from_token: int, to_token: int, limit: int
) -> StreamUpdateResult:
updates = await self.store.get_updated_thread_subscriptions(
from_token, to_token, limit
from_id=from_token, to_id=to_token, limit=limit
)
rows = [
(
Expand Down
17 changes: 11 additions & 6 deletions synapse/rest/client/thread_subscriptions.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from http import HTTPStatus
from typing import TYPE_CHECKING, Tuple
from typing import TYPE_CHECKING, Optional, Tuple

from synapse._pydantic_compat import StrictBool
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.server import HttpServer
from synapse.http.servlet import (
Expand All @@ -12,6 +11,7 @@
from synapse.rest.client._base import client_patterns
from synapse.types import JsonDict, RoomID
from synapse.types.rest import RequestBodyModel
from synapse.util.pydantic_models import AnyEventId

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand All @@ -32,7 +32,12 @@ def __init__(self, hs: "HomeServer"):
self.handler = hs.get_thread_subscriptions_handler()

class PutBody(RequestBodyModel):
automatic: StrictBool
automatic: Optional[AnyEventId]
"""
If supplied, the event ID of an event giving rise to this automatic subscription.

If omitted, this subscription is a manual subscription.
"""

async def on_GET(
self, request: SynapseRequest, room_id: str, thread_root_id: str
Expand Down Expand Up @@ -63,15 +68,15 @@ async def on_PUT(
raise SynapseError(
HTTPStatus.BAD_REQUEST, "Invalid event ID", errcode=Codes.INVALID_PARAM
)
requester = await self.auth.get_user_by_req(request)

body = parse_and_validate_json_object_from_request(request, self.PutBody)

requester = await self.auth.get_user_by_req(request)

await self.handler.subscribe_user_to_thread(
requester.user,
room_id,
thread_root_id,
automatic=body.automatic,
automatic_event_id=body.automatic,
)

return HTTPStatus.OK, {}
Expand Down
Loading
Loading