|
13 | 13 | # limitations under the License. |
14 | 14 |
|
15 | 15 | import logging |
16 | | -from collections import namedtuple |
17 | 16 | from typing import ( |
18 | 17 | Awaitable, |
19 | 18 | Callable, |
|
44 | 43 | from synapse.logging.utils import log_function |
45 | 44 | from synapse.metrics import LaterGauge |
46 | 45 | from synapse.streams.config import PaginationConfig |
47 | | -from synapse.types import PersistedEventPosition, RoomStreamToken, StreamToken, UserID |
| 46 | +from synapse.types import ( |
| 47 | + JsonDict, |
| 48 | + PersistedEventPosition, |
| 49 | + RoomStreamToken, |
| 50 | + StreamToken, |
| 51 | + UserID, |
| 52 | +) |
48 | 53 | from synapse.util.async_helpers import ObservableDeferred, timeout_deferred |
49 | 54 | from synapse.util.metrics import Measure |
50 | 55 | from synapse.visibility import filter_events_for_client |
@@ -178,7 +183,12 @@ def new_listener(self, token: StreamToken) -> _NotificationListener: |
178 | 183 | return _NotificationListener(self.notify_deferred.observe()) |
179 | 184 |
|
180 | 185 |
|
181 | | -class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))): |
| 186 | +@attr.s(slots=True, frozen=True, auto_attribs=True) |
| 187 | +class EventStreamResult: |
| 188 | + events: List[Union[JsonDict, EventBase]] |
| 189 | + start_token: StreamToken |
| 190 | + end_token: StreamToken |
| 191 | + |
182 | 192 | def __bool__(self): |
183 | 193 | return bool(self.events) |
184 | 194 |
|
@@ -582,9 +592,12 @@ async def check_for_updates( |
582 | 592 | before_token: StreamToken, after_token: StreamToken |
583 | 593 | ) -> EventStreamResult: |
584 | 594 | if after_token == before_token: |
585 | | - return EventStreamResult([], (from_token, from_token)) |
| 595 | + return EventStreamResult([], from_token, from_token) |
586 | 596 |
|
587 | | - events: List[EventBase] = [] |
| 597 | + # The events fetched from each source are a JsonDict, EventBase, or |
| 598 | + # UserPresenceState, but see below for UserPresenceState being |
| 599 | + # converted to JsonDict. |
| 600 | + events: List[Union[JsonDict, EventBase]] = [] |
588 | 601 | end_token = from_token |
589 | 602 |
|
590 | 603 | for name, source in self.event_sources.sources.get_sources(): |
@@ -623,7 +636,7 @@ async def check_for_updates( |
623 | 636 | events.extend(new_events) |
624 | 637 | end_token = end_token.copy_and_replace(keyname, new_key) |
625 | 638 |
|
626 | | - return EventStreamResult(events, (from_token, end_token)) |
| 639 | + return EventStreamResult(events, from_token, end_token) |
627 | 640 |
|
628 | 641 | user_id_for_stream = user.to_string() |
629 | 642 | if is_peeking: |
|
0 commit comments