Skip to content

Commit 91c3794

Browse files
authored
allow rocksdb db to be shared amongst workers (#96)
1 parent 05c0ff9 commit 91c3794

File tree

18 files changed

+294
-162
lines changed

18 files changed

+294
-162
lines changed

faust/app/base.py

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1561,6 +1561,7 @@ async def _stop_consumer(self) -> None:
15611561
self.flow_control.clear()
15621562
await self._stop_fetcher()
15631563
await self._consumer_wait_empty(consumer, self.log)
1564+
await self.tables.stop()
15641565

15651566
async def _consumer_wait_empty(self, consumer: ConsumerT, logger: Any) -> None:
15661567
if self.conf.stream_wait_empty:
@@ -1604,7 +1605,7 @@ def on_rebalance_end(self) -> None:
16041605
try:
16051606
if not sensor_state:
16061607
self.log.warning(
1607-
"Missing sensor state for rebalance #%s", self.rebalancing_count
1608+
"Missing sensor state for rebalance end #%s", self.rebalancing_count
16081609
)
16091610
else:
16101611
self.sensors.on_rebalance_end(self, sensor_state)
@@ -1671,7 +1672,9 @@ async def _stop_fetcher(self) -> None:
16711672
def _on_rebalance_when_stopped(self) -> None:
16721673
self.consumer.close()
16731674

1674-
async def _on_partitions_assigned(self, assigned: Set[TP]) -> None:
1675+
async def _on_partitions_assigned(
1676+
self, assigned: Set[TP], generation_id: int = 0
1677+
) -> None:
16751678
"""Handle new topic partition assignment.
16761679
16771680
This is called during a rebalance after :meth:`on_partitions_revoked`.
@@ -1687,10 +1690,9 @@ async def _on_partitions_assigned(self, assigned: Set[TP]) -> None:
16871690
# (Kafka does not send error, it just logs)
16881691
session_timeout = self.conf.broker_session_timeout * 0.95
16891692
self.unassigned = not assigned
1690-
1693+
logger.info("Executing _on_partitions_assigned")
16911694
revoked, newly_assigned = self._update_assignment(assigned)
16921695
await asyncio.sleep(0)
1693-
16941696
with flight_recorder(self.log, timeout=session_timeout) as on_timeout:
16951697
consumer = self.consumer
16961698
try:
@@ -1715,7 +1717,9 @@ async def _on_partitions_assigned(self, assigned: Set[TP]) -> None:
17151717
)
17161718
on_timeout.info("tables.on_rebalance()")
17171719
await asyncio.sleep(0)
1718-
await T(self.tables.on_rebalance)(assigned, revoked, newly_assigned)
1720+
await T(self.tables.on_rebalance)(
1721+
assigned, revoked, newly_assigned, generation_id
1722+
)
17191723
on_timeout.info("+send signal: on_partitions_assigned")
17201724
await T(self.on_partitions_assigned.send)(assigned)
17211725
on_timeout.info("-send signal: on_partitions_assigned")

faust/stores/rocksdb.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""RocksDB storage."""
22
import asyncio
3+
import gc
34
import math
45
import shutil
56
import typing
@@ -332,6 +333,7 @@ async def on_rebalance(
332333
assigned: Set[TP],
333334
revoked: Set[TP],
334335
newly_assigned: Set[TP],
336+
generation_id: int = 0,
335337
) -> None:
336338
"""Rebalance occurred.
337339
@@ -345,11 +347,14 @@ async def on_rebalance(
345347
self.rebalance_ack = False
346348
async with self.db_lock:
347349
self.revoke_partitions(table, revoked)
348-
await self.assign_partitions(table, newly_assigned)
350+
await self.assign_partitions(table, newly_assigned, generation_id)
349351

350352
async def stop(self) -> None:
351-
for db in self._dbs.values():
352-
db.close()
353+
self.logger.info("Closing rocksdb on stop")
354+
# for db in self._dbs.values():
355+
# db.close()
356+
self._dbs.clear()
357+
gc.collect()
353358

354359
def revoke_partitions(self, table: CollectionT, tps: Set[TP]) -> None:
355360
"""De-assign partitions used on this worker instance.
@@ -363,9 +368,13 @@ def revoke_partitions(self, table: CollectionT, tps: Set[TP]) -> None:
363368
if tp.topic in table.changelog_topic.topics:
364369
db = self._dbs.pop(tp.partition, None)
365370
if db is not None:
366-
db.close()
371+
self.logger.info(f"closing db {tp.topic} partition {tp.partition}")
372+
# db.close()
373+
gc.collect()
367374

368-
async def assign_partitions(self, table: CollectionT, tps: Set[TP]) -> None:
375+
async def assign_partitions(
376+
self, table: CollectionT, tps: Set[TP], generation_id: int = 0
377+
) -> None:
369378
"""Assign partitions to this worker instance.
370379
371380
Arguments:
@@ -377,26 +386,42 @@ async def assign_partitions(self, table: CollectionT, tps: Set[TP]) -> None:
377386
my_topics = table.changelog_topic.topics
378387
for tp in tps:
379388
if tp.topic in my_topics and tp not in standby_tps and self.rebalance_ack:
380-
await self._try_open_db_for_partition(tp.partition)
389+
await self._try_open_db_for_partition(
390+
tp.partition, generation_id=generation_id
391+
)
381392
await asyncio.sleep(0)
382393

383394
async def _try_open_db_for_partition(
384-
self, partition: int, max_retries: int = 30, retry_delay: float = 1.0
395+
self,
396+
partition: int,
397+
max_retries: int = 30,
398+
retry_delay: float = 1.0,
399+
generation_id: int = 0,
385400
) -> DB:
386401
for i in range(max_retries):
387402
try:
388403
# side effect: opens db and adds to self._dbs.
404+
self.logger.info(
405+
f"opening partition {partition} for gen id "
406+
f"{generation_id} app id {self.app.consumer_generation_id}"
407+
)
389408
return self._db_for_partition(partition)
390409
except rocksdb.errors.RocksIOError as exc:
391410
if i == max_retries - 1 or "lock" not in repr(exc):
392411
# release all the locks and crash
412+
self.log.warning(
413+
"DB for partition %r retries timed out ", partition
414+
)
393415
await self.stop()
394416
raise
395417
self.log.info(
396418
"DB for partition %r is locked! Retry in 1s...", partition
397419
)
398-
if not self.rebalance_ack:
399-
self.log.info("Rebalanced again giving up partition", partition)
420+
if generation_id != self.app.consumer_generation_id:
421+
self.log.info(
422+
f"Rebalanced again giving up partition {partition} gen id"
423+
f" {generation_id} app {self.app.consumer_generation_id}"
424+
)
400425
return
401426
await self.sleep(retry_delay)
402427
else: # pragma: no cover

faust/tables/base.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -561,10 +561,16 @@ def _windowed_delta(self, key: Any, d: Seconds, event: EventT = None) -> Any:
561561
)
562562

563563
async def on_rebalance(
564-
self, assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP]
564+
self,
565+
assigned: Set[TP],
566+
revoked: Set[TP],
567+
newly_assigned: Set[TP],
568+
generation_id: int = 0,
565569
) -> None:
566570
"""Call when cluster is rebalancing."""
567-
await self.data.on_rebalance(self, assigned, revoked, newly_assigned)
571+
await self.data.on_rebalance(
572+
self, assigned, revoked, newly_assigned, generation_id
573+
)
568574

569575
async def on_recovery_completed(
570576
self, active_tps: Set[TP], standby_tps: Set[TP]

faust/tables/manager.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,18 +181,26 @@ def on_partitions_revoked(self, revoked: Set[TP]) -> None:
181181
T(self.recovery.on_partitions_revoked)(revoked)
182182

183183
async def on_rebalance(
184-
self, assigned: Set[TP], revoked: Set[TP], newly_assigned: Set[TP]
184+
self,
185+
assigned: Set[TP],
186+
revoked: Set[TP],
187+
newly_assigned: Set[TP],
188+
generation_id: int = 0,
185189
) -> None:
186190
"""Call when the cluster is rebalancing."""
187191
self._recovery_started.set() # cannot add more tables.
188192
T = traced_from_parent_span()
189193
for table in self.values():
190-
await T(table.on_rebalance)(assigned, revoked, newly_assigned)
194+
await T(table.on_rebalance)(
195+
assigned, revoked, newly_assigned, generation_id
196+
)
191197

192198
await asyncio.sleep(0)
193199
await T(self._update_channels)()
194200
await asyncio.sleep(0)
195-
await T(self.recovery.on_rebalance)(assigned, revoked, newly_assigned)
201+
await T(self.recovery.on_rebalance)(
202+
assigned, revoked, newly_assigned, generation_id
203+
)
196204

197205
async def wait_until_recovery_completed(self) -> bool:
198206
if (

0 commit comments

Comments
 (0)