Skip to content
This repository was archived by the owner on Mar 26, 2024. It is now read-only.

Commit 15ce6be

Browse files
committed
Update device inbox stream change caches before advancing ID generator
1 parent f2bbc2e commit 15ce6be

File tree

2 files changed

+11
-1
lines changed

2 files changed

+11
-1
lines changed

synapse/storage/databases/main/deviceinbox.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,16 +138,23 @@ def process_replication_rows(
138138
if stream_name == ToDeviceStream.NAME:
139139
# If replication is happening than postgres must be being used.
140140
assert isinstance(self._device_inbox_id_gen, MultiWriterIdGenerator)
141-
self._device_inbox_id_gen.advance(instance_name, token)
142141
for row in rows:
142+
# NOTE: here we always tell both stream change caches, either about
143+
# the entity or just the known position.
143144
if row.entity.startswith("@"):
144145
self._device_inbox_stream_cache.entity_has_changed(
145146
row.entity, token
146147
)
148+
self._device_federation_outbox_stream_cache.have_seen_position(
149+
token
150+
)
147151
else:
152+
self._device_inbox_stream_cache.have_seen_position(token)
148153
self._device_federation_outbox_stream_cache.entity_has_changed(
149154
row.entity, token
150155
)
156+
# Important that the ID gen advances after stream change caches
157+
self._device_inbox_id_gen.advance(instance_name, token)
151158
return super().process_replication_rows(stream_name, instance_name, token, rows)
152159

153160
def get_to_device_stream_token(self) -> int:

synapse/util/caches/stream_change_cache.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,9 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
215215
self._entity_to_key[entity] = stream_pos
216216
self._evict()
217217

218+
self.have_seen_position(stream_pos)
219+
220+
def have_seen_position(self, stream_pos: int) -> None:
218221
if stream_pos > self.max_stream_pos:
219222
self.max_stream_pos = stream_pos
220223

0 commit comments

Comments
 (0)