@@ -147,6 +147,8 @@ class Store(base.SerializedStore):
147147
148148 _dbs : MutableMapping [int , DB ]
149149 _key_index : LRUCache [bytes , int ]
150+ rebalance_ack : bool
151+ db_lock : asyncio .Lock
150152
151153 def __init__ (
152154 self ,
@@ -178,6 +180,8 @@ def __init__(
178180 self .key_index_size = key_index_size
179181 self ._dbs = {}
180182 self ._key_index = LRUCache (limit = self .key_index_size )
183+ self .db_lock = asyncio .Lock ()
184+ self .rebalance_ack = False
181185
182186 def persisted_offset (self , tp : TP ) -> Optional [int ]:
183187 """Return the last persisted offset.
@@ -338,8 +342,10 @@ async def on_rebalance(
338342 newly_assigned: Set of newly assigned topic partitions,
339343 for which we were not assigned the last time.
340344 """
341- self .revoke_partitions (table , revoked )
342- await self .assign_partitions (table , newly_assigned )
345+ self .rebalance_ack = False
346+ async with self .db_lock :
347+ self .revoke_partitions (table , revoked )
348+ await self .assign_partitions (table , newly_assigned )
343349
344350 async def stop (self ) -> None :
345351 for db in self ._dbs .values ():
@@ -366,16 +372,16 @@ async def assign_partitions(self, table: CollectionT, tps: Set[TP]) -> None:
366372 table: The table that we store data for.
367373 tps: Set of topic partitions we have been assigned.
368374 """
375+ self .rebalance_ack = True
369376 standby_tps = self .app .assignor .assigned_standbys ()
370377 my_topics = table .changelog_topic .topics
371-
372378 for tp in tps :
373- if tp .topic in my_topics and tp not in standby_tps :
379+ if tp .topic in my_topics and tp not in standby_tps and self . rebalance_ack :
374380 await self ._try_open_db_for_partition (tp .partition )
375381 await asyncio .sleep (0 )
376382
377383 async def _try_open_db_for_partition (
378- self , partition : int , max_retries : int = 60 , retry_delay : float = 1.0
384+ self , partition : int , max_retries : int = 30 , retry_delay : float = 1.0
379385 ) -> DB :
380386 for i in range (max_retries ):
381387 try :
@@ -389,6 +395,9 @@ async def _try_open_db_for_partition(
389395 self .log .info (
390396 "DB for partition %r is locked! Retry in 1s..." , partition
391397 )
398+ if not self .rebalance_ack :
399+ self .log .info ("Rebalanced again giving up partition" , partition )
400+ return
392401 await self .sleep (retry_delay )
393402 else : # pragma: no cover
394403 ...
0 commit comments