|
62 | 62 | ) |
63 | 63 | from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker |
64 | 64 | from synapse.replication.tcp.streams import BackfillStream |
65 | | -from synapse.replication.tcp.streams.events import EventsStream |
| 65 | +from synapse.replication.tcp.streams.events import ( |
| 66 | + EventsStream, |
| 67 | + EventsStreamCurrentStateRow, |
| 68 | + EventsStreamEventRow, |
| 69 | +) |
66 | 70 | from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause |
67 | 71 | from synapse.storage.database import ( |
68 | 72 | DatabasePool, |
@@ -334,7 +338,23 @@ def process_replication_rows( |
334 | 338 | token: int, |
335 | 339 | rows: Iterable[Any], |
336 | 340 | ) -> None: |
| 341 | + # Process event stream replication rows, handling both the ID generators from the events |
| 342 | + # worker store and the stream change caches in this store as the two are interlinked. |
337 | 343 | if stream_name == EventsStream.NAME: |
| 344 | + for row in rows: |
| 345 | + if row.type == EventsStreamEventRow.TypeId: |
| 346 | + self._events_stream_cache.entity_has_changed( |
| 347 | + row.data.room_id, token |
| 348 | + ) |
| 349 | + if row.data.type == EventTypes.Member: |
| 350 | + self._membership_stream_cache.entity_has_changed( |
| 351 | + row.data.state_key, token |
| 352 | + ) |
| 353 | + if row.type == EventsStreamCurrentStateRow.TypeId: |
| 354 | + self._curr_state_delta_stream_cache.entity_has_changed( |
| 355 | + row.data.room_id, token |
| 356 | + ) |
| 357 | + # Important that the ID gen advances after stream change caches |
338 | 358 | self._stream_id_gen.advance(instance_name, token) |
339 | 359 | elif stream_name == BackfillStream.NAME: |
340 | 360 | self._backfill_id_gen.advance(instance_name, -token) |
|
0 commit comments