|
70 | 70 | from synapse.storage.engines import PostgresEngine |
71 | 71 | from synapse.storage.types import Cursor |
72 | 72 | from synapse.storage.util.id_generators import ( |
| 73 | + AbstractStreamIdGenerator, |
73 | 74 | AbstractStreamIdTracker, |
74 | 75 | MultiWriterIdGenerator, |
75 | 76 | StreamIdGenerator, |
@@ -292,6 +293,93 @@ def get_chain_id_txn(txn: Cursor) -> int: |
292 | 293 | id_column="chain_id", |
293 | 294 | ) |
294 | 295 |
|
| 296 | + self._un_partial_stated_events_stream_id_gen: AbstractStreamIdGenerator |
| 297 | + |
| 298 | + if isinstance(database.engine, PostgresEngine): |
| 299 | + self._un_partial_stated_events_stream_id_gen = MultiWriterIdGenerator( |
| 300 | + db_conn=db_conn, |
| 301 | + db=database, |
| 302 | + stream_name="un_partial_stated_event_stream", |
| 303 | + instance_name=hs.get_instance_name(), |
| 304 | + tables=[ |
| 305 | + ("un_partial_stated_event_stream", "instance_name", "stream_id") |
| 306 | + ], |
| 307 | + sequence_name="un_partial_stated_event_stream_sequence", |
| 308 | + # TODO(faster_joins, multiple writers) Support multiple writers. |
| 309 | + writers=["master"], |
| 310 | + ) |
| 311 | + else: |
| 312 | + self._un_partial_stated_events_stream_id_gen = StreamIdGenerator( |
| 313 | + db_conn, "un_partial_stated_event_stream", "stream_id" |
| 314 | + ) |
| 315 | + |
| 316 | + def get_un_partial_stated_events_token(self) -> int: |
| 317 | + # TODO(faster_joins, multiple writers): This is inappropriate if there are multiple |
| 318 | + # writers because workers that don't write often will hold all |
| 319 | + # readers up. |
| 320 | + return self._un_partial_stated_events_stream_id_gen.get_current_token() |
| 321 | + |
| 322 | + async def get_un_partial_stated_events_from_stream( |
| 323 | + self, instance_name: str, last_id: int, current_id: int, limit: int |
| 324 | + ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]: |
| 325 | + """Get updates for the un-partial-stated events replication stream. |
| 326 | +
|
| 327 | + Args: |
| 328 | + instance_name: The writer we want to fetch updates from. Unused |
| 329 | + here since there is only ever one writer. |
| 330 | + last_id: The token to fetch updates from. Exclusive. |
| 331 | + current_id: The token to fetch updates up to. Inclusive. |
| 332 | + limit: The requested limit for the number of rows to return. The |
| 333 | + function may return more or fewer rows. |
| 334 | +
|
| 335 | + Returns: |
| 336 | + A tuple consisting of: the updates, a token to use to fetch |
| 337 | + subsequent updates, and whether we returned fewer rows than exists |
| 338 | + between the requested tokens due to the limit. |
| 339 | +
|
| 340 | + The token returned can be used in a subsequent call to this |
| 341 | + function to get further updatees. |
| 342 | +
|
| 343 | + The updates are a list of 2-tuples of stream ID and the row data |
| 344 | + """ |
| 345 | + |
| 346 | + if last_id == current_id: |
| 347 | + return [], current_id, False |
| 348 | + |
| 349 | + def get_un_partial_stated_events_from_stream_txn( |
| 350 | + txn: LoggingTransaction, |
| 351 | + ) -> Tuple[List[Tuple[int, Tuple[str, bool]]], int, bool]: |
| 352 | + sql = """ |
| 353 | + SELECT stream_id, event_id, rejection_status_changed |
| 354 | + FROM un_partial_stated_event_stream |
| 355 | + WHERE ? < stream_id AND stream_id <= ? AND instance_name = ? |
| 356 | + ORDER BY stream_id ASC |
| 357 | + LIMIT ? |
| 358 | + """ |
| 359 | + txn.execute(sql, (last_id, current_id, instance_name, limit)) |
| 360 | + updates = [ |
| 361 | + ( |
| 362 | + row[0], |
| 363 | + ( |
| 364 | + row[1], |
| 365 | + bool(row[2]), |
| 366 | + ), |
| 367 | + ) |
| 368 | + for row in txn |
| 369 | + ] |
| 370 | + limited = False |
| 371 | + upto_token = current_id |
| 372 | + if len(updates) >= limit: |
| 373 | + upto_token = updates[-1][0] |
| 374 | + limited = True |
| 375 | + |
| 376 | + return updates, upto_token, limited |
| 377 | + |
| 378 | + return await self.db_pool.runInteraction( |
| 379 | + "get_un_partial_stated_events_from_stream", |
| 380 | + get_un_partial_stated_events_from_stream_txn, |
| 381 | + ) |
| 382 | + |
295 | 383 | def process_replication_rows( |
296 | 384 | self, |
297 | 385 | stream_name: str, |
|
0 commit comments