Skip to content
Draft
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18873.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement experimental [MSC3871](https://github.com/matrix-org/matrix-spec-proposals/pull/3871) to indicate `gaps` in the `/messages` timeline.
24 changes: 23 additions & 1 deletion synapse/handlers/pagination.py
Original file line number Diff line number Diff line change
Expand Up @@ -414,12 +414,14 @@ async def purge_room(
@trace
async def get_messages(
self,
*,
requester: Requester,
room_id: str,
pagin_config: PaginationConfig,
as_client_event: bool = True,
event_filter: Optional[Filter] = None,
use_admin_priviledge: bool = False,
backfill: bool = True,
) -> JsonDict:
"""Get messages in a room.

Expand All @@ -432,6 +434,8 @@ async def get_messages(
use_admin_priviledge: if `True`, return all events, regardless
of whether `user` has access to them. To be used **ONLY**
from the admin API.
backfill: If false, we skip backfill altogether. When true, we backfill as a
best effort.

Returns:
Pagination API results
Expand Down Expand Up @@ -522,7 +526,7 @@ async def get_messages(
event_filter=event_filter,
)

if pagin_config.direction == Direction.BACKWARDS:
if backfill and pagin_config.direction == Direction.BACKWARDS:
# We use a `Set` because there can be multiple events at a given depth
# and we only care about looking at the unique continuum of depths to
# find gaps.
Expand Down Expand Up @@ -622,6 +626,7 @@ async def get_messages(
if not events:
return {
"chunk": [],
"gaps": [],
"start": await from_token.to_string(self.store),
}

Expand All @@ -641,6 +646,7 @@ async def get_messages(
if not events:
return {
"chunk": [],
"gaps": [],
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}
Expand All @@ -666,6 +672,10 @@ async def get_messages(
events, user_id
)

gaps = await self.store.get_events_next_to_gaps(
events=events, direction=pagin_config.direction
)

time_now = self.clock.time_msec()

serialize_options = SerializeEventConfig(
Expand All @@ -681,6 +691,18 @@ async def get_messages(
bundle_aggregations=aggregations,
)
),
"gaps": [
{
"prev_pagination_token": await from_token.copy_and_replace(
StreamKeyType.ROOM, gap.prev_token
).to_string(self.store),
"event_id": gap.event_id,
"next_pagination_token": await from_token.copy_and_replace(
StreamKeyType.ROOM, gap.next_token
).to_string(self.store),
}
for gap in gaps
],
"start": await from_token.to_string(self.store),
"end": await next_token.to_string(self.store),
}
Expand Down
14 changes: 14 additions & 0 deletions synapse/rest/client/room.py
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,17 @@ def __init__(self, hs: "HomeServer"):
async def on_GET(
self, request: SynapseRequest, room_id: str
) -> Tuple[int, JsonDict]:
"""
Query parameters:
dir
from
to
limit
filter
backfill: If false, we skip backfill altogether. When true, we backfill as a
best effort.
"""

processing_start_time = self.clock.time_msec()
# Fire off and hope that we get a result by the end.
#
Expand Down Expand Up @@ -840,12 +851,15 @@ async def on_GET(
):
as_client_event = False

backfill = parse_boolean(request, "backfill", default=True)

msgs = await self.pagination_handler.get_messages(
room_id=room_id,
requester=requester,
pagin_config=pagination_config,
as_client_event=as_client_event,
event_filter=event_filter,
backfill=backfill,
)

processing_end_time = self.clock.time_msec()
Expand Down
172 changes: 136 additions & 36 deletions synapse/storage/databases/main/events_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
Mapping,
MutableMapping,
Optional,
Sequence,
Set,
Tuple,
cast,
Expand All @@ -42,6 +43,7 @@

import attr
from prometheus_client import Gauge
from typing_extensions import assert_never

from twisted.internet import defer

Expand Down Expand Up @@ -83,13 +85,17 @@
LoggingTransaction,
make_tuple_in_list_sql_clause,
)

# from synapse.storage.databases.main.stream import (
# generate_next_token,
# )
Comment on lines +89 to +91
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# from synapse.storage.databases.main.stream import (
# generate_next_token,
# )

from synapse.storage.types import Cursor
from synapse.storage.util.id_generators import (
AbstractStreamIdGenerator,
MultiWriterIdGenerator,
)
from synapse.storage.util.sequence import build_sequence_generator
from synapse.types import JsonDict, get_domain_from_id
from synapse.types import JsonDict, RoomStreamToken, get_domain_from_id
from synapse.types.state import StateFilter
from synapse.types.storage import _BackgroundUpdates
from synapse.util import unwrapFirstError
Expand All @@ -100,6 +106,7 @@
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
from synapse.util.metrics import Measure
from synapse.util.tokens import generate_next_token

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down Expand Up @@ -214,6 +221,34 @@ class EventRedactBehaviour(Enum):
block = auto()


@attr.s(slots=True, frozen=True, auto_attribs=True)
class EventGapEntry:
    """
    A single gap in a room's timeline, as described by experimental MSC3871
    ("Gappy timeline").

    Remember that stream tokens are positions *between* events, so
    `prev_token` and `next_token` bracket the target event on either side.

    Attributes:
        event_id: The target event ID which we see a gap before or after.
        prev_token: The token position just before the target `event_id`.
        next_token: The token position just after the target `event_id`.
    """

    event_id: str
    prev_token: RoomStreamToken
    next_token: RoomStreamToken


class EventsWorkerStore(SQLBaseStore):
# Whether to use dedicated DB threads for event fetching. This is only used
# if there are multiple DB threads available. When used will lock the DB
Expand Down Expand Up @@ -2315,15 +2350,24 @@ def is_event_next_to_backward_gap_txn(txn: LoggingTransaction) -> bool:
is_event_next_to_backward_gap_txn,
)

async def is_event_next_to_forward_gap(self, event: EventBase) -> bool:
"""Check if the given event is next to a forward gap of missing events.
The gap in front of the latest events is not considered a gap.
async def is_event_next_to_forward_gap(
self, event: EventBase, *, ignore_gap_after_latest: bool = True
) -> bool:
"""
Check if the given event is next to a forward gap of missing events.

By default when `ignore_gap_after_latest = True`, the gap in front of the
latest events is not considered a gap.

<latest messages> A(False)--->B(False)--->C(False)---> <gap, unknown events> <oldest messages>
<latest messages> A(False)--->B(False)---> <gap, unknown events> --->D(True)--->E(False) <oldest messages>

When `ignore_gap_after_latest = False`, `A` would be considered next to a gap.

Args:
room_id: room where the event lives
event: event to check (can't be an `outlier`)
ignore_gap_after_latest: Whether the gap after the latest events (forward
extremities) in the room should be considered as an actual gap.
Comment on lines +2369 to +2370
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per matrix-org/matrix-spec-proposals#3871 (comment), we should revert the ignore_gap_after_latest change.

The default of "omit the gap after the latest messages in the room" is the correct choice


Returns:
Boolean indicating whether it's an extremity
Expand All @@ -2335,38 +2379,39 @@ async def is_event_next_to_forward_gap(self, event: EventBase) -> bool:
)

def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool:
# If the event in question is a forward extremity, we will just
# consider any potential forward gap as not a gap since it's one of
# the latest events in the room.
#
# `event_forward_extremities` does not include backfilled or outlier
# events so we can't rely on it to find forward gaps. We can only
# use it to determine whether a message is the latest in the room.
#
# We can't combine this query with the `forward_edge_query` below
# because if the event in question has no forward edges (isn't
# referenced by any other event's prev_events) but is in
# `event_forward_extremities`, we don't want to return 0 rows and
# say it's next to a gap.
forward_extremity_query = """
SELECT 1 FROM event_forward_extremities
WHERE
room_id = ?
AND event_id = ?
LIMIT 1
"""
if ignore_gap_after_latest:
# If the event in question is a forward extremity, we will just
# consider any potential forward gap as not a gap since it's one of
# the latest events in the room.
#
# `event_forward_extremities` does not include backfilled or outlier
# events so we can't rely on it to find forward gaps. We can only
# use it to determine whether a message is the latest in the room.
#
# We can't combine this query with the `forward_edge_query` below
# because if the event in question has no forward edges (isn't
# referenced by any other event's prev_events) but is in
# `event_forward_extremities`, we don't want to return 0 rows and
# say it's next to a gap.
forward_extremity_query = """
SELECT 1 FROM event_forward_extremities
WHERE
room_id = ?
AND event_id = ?
LIMIT 1
"""

# We consider any forward extremity as the latest in the room and
# not a forward gap.
#
# To expand, even though there is technically a gap at the front of
# the room where the forward extremities are, we consider those the
# latest messages in the room so asking other homeservers for more
# is useless. The new latest messages will just be federated as
# usual.
txn.execute(forward_extremity_query, (event.room_id, event.event_id))
if txn.fetchone():
return False
# We consider any forward extremity as the latest in the room and
# not a forward gap.
#
# To expand, even though there is technically a gap at the front of
# the room where the forward extremities are, we consider those the
# latest messages in the room so asking other homeservers for more
# is useless. The new latest messages will just be federated as
# usual.
txn.execute(forward_extremity_query, (event.room_id, event.event_id))
if txn.fetchone():
return False

# Check to see whether the event in question is already referenced
# by another event. If we don't see any edges, we're next to a
Expand Down Expand Up @@ -2398,6 +2443,61 @@ def is_event_next_to_gap_txn(txn: LoggingTransaction) -> bool:
is_event_next_to_gap_txn,
)

async def get_events_next_to_gaps(
    self, events: Sequence[EventBase], direction: Direction
) -> Sequence[EventGapEntry]:
    """
    Find all of the events that have gaps next to them.

    When going backwards, we look for backward gaps (i.e. missing
    prev_events). When going forwards, we look for forward gaps (i.e. events
    that aren't referenced by any other events).

    Args:
        events: topological ordered list of events
        direction: which side of the events to check for gaps. This should
            match the direction we're paginating in.

    Returns:
        One `EventGapEntry` per event that sits next to a gap, with tokens
        bracketing that event on either side.
    """

    results = []
    for ev in events:
        # FIXME: We should use a bulk look-up instead of N+1 queries.
        if direction == Direction.BACKWARDS:
            next_to_gap = await self.is_event_next_to_backward_gap(ev)
        elif direction == Direction.FORWARDS:
            next_to_gap = await self.is_event_next_to_forward_gap(
                ev, ignore_gap_after_latest=False
            )
        else:
            assert_never(direction)

        if next_to_gap:
            stream_pos = ev.internal_metadata.stream_ordering
            assert stream_pos is not None, (
                "persisted events should have stream_ordering"
            )

            # Tokens are positions between events, so we generate one just
            # before and one just after this event's (depth, stream) position.
            results.append(
                EventGapEntry(
                    event_id=ev.event_id,
                    prev_token=generate_next_token(
                        direction=Direction.BACKWARDS,
                        last_topo_ordering=ev.depth,
                        last_stream_ordering=stream_pos,
                    ),
                    next_token=generate_next_token(
                        direction=Direction.FORWARDS,
                        last_topo_ordering=ev.depth,
                        last_stream_ordering=stream_pos,
                    ),
                )
            )

    return results

async def get_event_id_for_timestamp(
self, room_id: str, timestamp: int, direction: Direction
) -> Optional[str]:
Expand Down
2 changes: 1 addition & 1 deletion synapse/storage/databases/main/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,13 @@
make_in_list_sql_clause,
)
from synapse.storage.databases.main.stream import (
generate_next_token,
generate_pagination_bounds,
generate_pagination_where_clause,
)
from synapse.storage.engines import PostgresEngine
from synapse.types import JsonDict, MultiWriterStreamToken, StreamKeyType, StreamToken
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.tokens import generate_next_token

if TYPE_CHECKING:
from synapse.server import HomeServer
Expand Down
Loading
Loading