Skip to content
This repository was archived by the owner on Apr 12, 2024. It is now read-only.

Commit b1433bf

Browse files
Don't table scan events on worker startup (#8419)
* Fix table scan of events on worker startup. This happened because we assumed "new" writers had an initial stream position of 0, so the replication code tried to fetch all events written by the instance between 0 and the current position. Instead, set the initial position of new writers to the current persisted up to position, on the assumption that new writers won't have written anything before that point. * Consider old writers coming back as "new". Otherwise we'd try and fetch entries between the old stale token and the current position, even though it won't have written any rows. Co-authored-by: Andrew Morgan <[email protected]> Co-authored-by: Andrew Morgan <[email protected]>
1 parent 2649d54 commit b1433bf

File tree

3 files changed

+44
-1
lines changed

3 files changed

+44
-1
lines changed

changelog.d/8419.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add experimental support for sharding event persister.

synapse/storage/util/id_generators.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,19 @@ def _load_current_ids(
273273

274274
# Load the current positions of all writers for the stream.
275275
if self._writers:
276+
# We delete any stale entries in the positions table. This is
277+
# important if we add back a writer after a long time; we want to
278+
# consider that a "new" writer, rather than using the old stale
279+
# entry here.
280+
sql = """
281+
DELETE FROM stream_positions
282+
WHERE
283+
stream_name = ?
284+
AND instance_name != ALL(?)
285+
"""
286+
sql = self._db.engine.convert_param_style(sql)
287+
cur.execute(sql, (self._stream_name, self._writers))
288+
276289
sql = """
277290
SELECT instance_name, stream_id FROM stream_positions
278291
WHERE stream_name = ?
@@ -453,11 +466,22 @@ def get_current_token_for_writer(self, instance_name: str) -> int:
453466
"""Returns the position of the given writer.
454467
"""
455468

469+
# If we don't have an entry for the given instance name, we assume it's a
470+
# new writer.
471+
#
472+
# For new writers we assume their initial position to be the current
473+
# persisted up to position. This stops Synapse from doing a full table
474+
# scan when a new writer announces itself over replication.
456475
with self._lock:
457-
return self._return_factor * self._current_positions.get(instance_name, 0)
476+
return self._return_factor * self._current_positions.get(
477+
instance_name, self._persisted_upto_position
478+
)
458479

459480
def get_positions(self) -> Dict[str, int]:
460481
"""Get a copy of the current positon map.
482+
483+
Note that this won't necessarily include all configured writers if some
484+
writers haven't written anything yet.
461485
"""
462486

463487
with self._lock:

tests/storage/test_id_generators.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,17 +390,28 @@ def test_writer_config_change(self):
390390
# Initial config has two writers
391391
id_gen = self._create_id_generator("first", writers=["first", "second"])
392392
self.assertEqual(id_gen.get_persisted_upto_position(), 3)
393+
self.assertEqual(id_gen.get_current_token_for_writer("first"), 3)
394+
self.assertEqual(id_gen.get_current_token_for_writer("second"), 5)
393395

394396
# New config removes one of the configs. Note that if the writer is
395397
# removed from config we assume that it has been shut down and has
396398
# finished persisting, hence why the persisted upto position is 5.
397399
id_gen_2 = self._create_id_generator("second", writers=["second"])
398400
self.assertEqual(id_gen_2.get_persisted_upto_position(), 5)
401+
self.assertEqual(id_gen_2.get_current_token_for_writer("second"), 5)
399402

400403
# This config points to a single, previously unused writer.
401404
id_gen_3 = self._create_id_generator("third", writers=["third"])
402405
self.assertEqual(id_gen_3.get_persisted_upto_position(), 5)
403406

407+
# For new writers we assume their initial position to be the current
408+
# persisted up to position. This stops Synapse from doing a full table
409+
# scan when a new writer comes along.
410+
self.assertEqual(id_gen_3.get_current_token_for_writer("third"), 5)
411+
412+
id_gen_4 = self._create_id_generator("fourth", writers=["third"])
413+
self.assertEqual(id_gen_4.get_current_token_for_writer("third"), 5)
414+
404415
# Check that we get a sane next stream ID with this new config.
405416

406417
async def _get_next_async():
@@ -410,6 +421,13 @@ async def _get_next_async():
410421
self.get_success(_get_next_async())
411422
self.assertEqual(id_gen_3.get_persisted_upto_position(), 6)
412423

424+
# If we add back the old "first" then we shouldn't see the persisted up
425+
# to position revert back to 3.
426+
id_gen_5 = self._create_id_generator("five", writers=["first", "third"])
427+
self.assertEqual(id_gen_5.get_persisted_upto_position(), 6)
428+
self.assertEqual(id_gen_5.get_current_token_for_writer("first"), 6)
429+
self.assertEqual(id_gen_5.get_current_token_for_writer("third"), 6)
430+
413431
def test_sequence_consistency(self):
414432
"""Test that we error out if the table and sequence diverges.
415433
"""

0 commit comments

Comments
 (0)