2727
2828
2929class StreamChangeCache :
30- """Keeps track of the stream positions of the latest change in a set of entities.
30+ """
31+ Keeps track of the stream positions of the latest change in a set of entities.
32+
33+ The entity will is typically a room ID or user ID, but can be any string.
3134
32- Typically the entity will be a room or user id.
35+ Can be queried for whether a specific entity has changed after a stream position
36+ or for a list of changed entities after a stream position. See the individual
37+ methods for more information.
3338
34- Given a list of entities and a stream position, it will give a subset of
35- entities that may have changed since that position. If position key is too
36- old then the cache will simply return all given entities.
39+ Only tracks to a maximum cache size, any position earlier than the earliest
40+ known stream position must be treated as unknown.
3741 """
3842
3943 def __init__ (
@@ -45,16 +49,20 @@ def __init__(
4549 ) -> None :
4650 self ._original_max_size : int = max_size
4751 self ._max_size = math .floor (max_size )
48- self ._entity_to_key : Dict [EntityType , int ] = {}
4952
50- # map from stream id to the a set of entities which changed at that stream id.
53+ # map from stream id to the set of entities which changed at that stream id.
5154 self ._cache : SortedDict [int , Set [EntityType ]] = SortedDict ()
55+ # map from entity to the stream ID of the latest change for that entity.
56+ #
57+ # Must be kept in sync with _cache.
58+ self ._entity_to_key : Dict [EntityType , int ] = {}
5259
5360 # the earliest stream_pos for which we can reliably answer
5461 # get_all_entities_changed. In other words, one less than the earliest
5562 # stream_pos for which we know _cache is valid.
5663 #
5764 self ._earliest_known_stream_pos = current_stream_pos
65+
5866 self .name = name
5967 self .metrics = caches .register_cache (
6068 "cache" , self .name , self ._cache , resize_callback = self .set_cache_factor
@@ -82,38 +90,74 @@ def set_cache_factor(self, factor: float) -> bool:
8290 return False
8391
8492 def has_entity_changed (self , entity : EntityType , stream_pos : int ) -> bool :
85- """Returns True if the entity may have been updated since stream_pos"""
93+ """
94+ Returns True if the entity may have been updated after stream_pos.
95+
96+ Args:
97+ entity: The entity to check for changes.
98+ stream_pos: The stream position to check for changes after.
99+
100+ Return:
101+ True if the entity may have been updated, this happens if:
102+ * The given stream position is at or earlier than the earliest
103+ known stream position.
104+ * The given stream position is earlier than the latest change for
105+ the entity.
106+
107+ False otherwise:
108+ * The entity is unknown.
109+ * The given stream position is at or later than the latest change
110+ for the entity.
111+ """
86112 assert isinstance (stream_pos , int )
87113
88- if stream_pos < self ._earliest_known_stream_pos :
114+ # _cache is not valid at or before the earliest known stream position, so
115+ # return that the entity has changed.
116+ if stream_pos <= self ._earliest_known_stream_pos :
89117 self .metrics .inc_misses ()
90118 return True
91119
120+ # If the entity is unknown, it hasn't changed.
92121 latest_entity_change_pos = self ._entity_to_key .get (entity , None )
93122 if latest_entity_change_pos is None :
94123 self .metrics .inc_hits ()
95124 return False
96125
126+ # This is a known entity, return true if the stream position is earlier
127+ # than the last change.
97128 if stream_pos < latest_entity_change_pos :
98129 self .metrics .inc_misses ()
99130 return True
100131
132+ # Otherwise, the stream position is after the latest change: return false.
101133 self .metrics .inc_hits ()
102134 return False
103135
104136 def get_entities_changed (
105137 self , entities : Collection [EntityType ], stream_pos : int
106138 ) -> Union [Set [EntityType ], FrozenSet [EntityType ]]:
107139 """
108- Returns subset of entities that have had new things since the given
109- position. Entities unknown to the cache will be returned. If the
110- position is too old it will just return the given list.
140+ Returns the subset of the given entities that have had changes after the given position.
141+
142+ Entities unknown to the cache will be returned.
143+
144+ If the position is too old it will just return the given list.
145+
146+ Args:
147+ entities: Entities to check for changes.
148+ stream_pos: The stream position to check for changes after.
149+
150+ Return:
151+ A subset of entities which have changed after the given stream position.
152+
153+ This will be all entities if the given stream position is at or earlier
154+ than the earliest known stream position.
111155 """
112156 changed_entities = self .get_all_entities_changed (stream_pos )
113157 if changed_entities is not None :
114158 # We now do an intersection, trying to do so in the most efficient
115159 # way possible (some of these sets are *large*). First check in the
116- # given iterable is already set that we can reuse, otherwise we
160+ # given iterable is already a set that we can reuse, otherwise we
117161 # create a set of the *smallest* of the two iterables and call
118162 # `intersection(..)` on it (this can be twice as fast as the reverse).
119163 if isinstance (entities , (set , frozenset )):
@@ -130,29 +174,57 @@ def get_entities_changed(
130174 return result
131175
132176 def has_any_entity_changed (self , stream_pos : int ) -> bool :
133- """Returns if any entity has changed"""
134- assert type (stream_pos ) is int
177+ """
178+ Returns true if any entity has changed after the given stream position.
179+
180+ Args:
181+ stream_pos: The stream position to check for changes after.
182+
183+ Return:
184+ True if any entity has changed after the given stream position or
185+ if the given stream position is at or earlier than the earliest
186+ known stream position.
187+
188+ False otherwise.
189+ """
190+ assert isinstance (stream_pos , int )
135191
136192 if not self ._cache :
137193 # If the cache is empty, nothing can have changed.
138194 return False
139195
140- if stream_pos >= self ._earliest_known_stream_pos :
141- self .metrics .inc_hits ()
142- return self ._cache .bisect_right (stream_pos ) < len (self ._cache )
143- else :
196+ # _cache is not valid at or before the earliest known stream position, so
197+ # return that an entity has changed.
198+ if stream_pos <= self ._earliest_known_stream_pos :
144199 self .metrics .inc_misses ()
145200 return True
146201
202+ self .metrics .inc_hits ()
203+ return stream_pos < self ._cache .peekitem ()[0 ]
204+
147205 def get_all_entities_changed (self , stream_pos : int ) -> Optional [List [EntityType ]]:
148- """Returns all entities that have had new things since the given
149- position. If the position is too old it will return None.
206+ """
207+ Returns all entities that have had changes after the given position.
208+
209+ If the stream change cache does not go far enough back, i.e. the position
210+ is too old, it will return None.
150211
151212 Returns the entities in the order that they were changed.
213+
214+ Args:
215+ stream_pos: The stream position to check for changes after.
216+
217+ Return:
218+ Entities which have changed after the given stream position.
219+
220+ None if the given stream position is at or earlier than the earliest
221+ known stream position.
152222 """
153- assert type (stream_pos ) is int
223+ assert isinstance (stream_pos , int )
154224
155- if stream_pos < self ._earliest_known_stream_pos :
225+ # _cache is not valid at or before the earliest known stream position, so
226+ # return None to mark that it is unknown if an entity has changed.
227+ if stream_pos <= self ._earliest_known_stream_pos :
156228 return None
157229
158230 changed_entities : List [EntityType ] = []
@@ -162,11 +234,17 @@ def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]
162234 return changed_entities
163235
164236 def entity_has_changed (self , entity : EntityType , stream_pos : int ) -> None :
165- """Informs the cache that the entity has been changed at the given
166- position.
167237 """
168- assert type (stream_pos ) is int
238+ Informs the cache that the entity has been changed at the given position.
239+
240+ Args:
241+ entity: The entity to mark as changed.
242+ stream_pos: The stream position to update the entity to.
243+ """
244+ assert isinstance (stream_pos , int )
169245
246+ # For a change before _cache is valid (e.g. at or before the earliest known
247+ # stream position) there's nothing to do.
170248 if stream_pos <= self ._earliest_known_stream_pos :
171249 return
172250
@@ -189,6 +267,11 @@ def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
189267 self ._evict ()
190268
191269 def _evict (self ) -> None :
270+ """
271+ Ensure the cache has not exceeded the maximum size.
272+
273+ Evicts entries until it is at the maximum size.
274+ """
192275 # if the cache is too big, remove entries
193276 while len (self ._cache ) > self ._max_size :
194277 k , r = self ._cache .popitem (0 )
@@ -199,5 +282,12 @@ def _evict(self) -> None:
199282 def get_max_pos_of_last_change (self , entity : EntityType ) -> int :
200283 """Returns an upper bound of the stream id of the last change to an
201284 entity.
285+
286+ Args:
287+ entity: The entity to check.
288+
289+ Return:
290+ The stream position of the latest change for the given entity or
291+ the earliest known stream position if the entitiy is unknown.
202292 """
203293 return self ._entity_to_key .get (entity , self ._earliest_known_stream_pos )
0 commit comments