Skip to content

Commit 9a950f6

Browse files
committed
make readwrite more safe
1 parent 3592ad3 commit 9a950f6

File tree

2 files changed

+582
-181
lines changed

2 files changed

+582
-181
lines changed

libp2p/stream_muxer/mplex/mplex_stream.py

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -35,55 +35,69 @@
3535

3636

3737
class ReadWriteLock:
38+
"""
39+
A read-write lock that allows multiple concurrent readers
40+
or one exclusive writer, implemented using Trio primitives.
41+
"""
42+
3843
def __init__(self) -> None:
3944
self._readers = 0
40-
self._readers_lock = trio.Lock() # Protects readers count
41-
self._writer_lock = trio.Semaphore(1) # Ensures mutual exclusion for writers
45+
self._readers_lock = trio.Lock() # Protects access to _readers count
46+
self._writer_lock = trio.Semaphore(1) # Allows only one writer at a time
4247

4348
async def acquire_read(self) -> None:
49+
"""Acquire a read lock. Multiple readers can hold it simultaneously."""
4450
try:
4551
async with self._readers_lock:
46-
self._readers += 1
47-
if self._readers == 1:
52+
if self._readers == 0:
4853
await self._writer_lock.acquire()
54+
self._readers += 1
4955
except trio.Cancelled:
50-
async with self._readers_lock:
51-
if self._readers > 0:
52-
self._readers -= 1
53-
if self._readers == 0:
54-
self._writer_lock.release()
5556
raise
5657

5758
async def release_read(self) -> None:
59+
"""Release a read lock."""
5860
async with self._readers_lock:
59-
self._readers -= 1
60-
if self._readers == 0:
61+
if self._readers == 1:
6162
self._writer_lock.release()
63+
self._readers -= 1
6264

6365
async def acquire_write(self) -> None:
66+
"""Acquire an exclusive write lock."""
6467
try:
6568
await self._writer_lock.acquire()
6669
except trio.Cancelled:
6770
raise
6871

6972
def release_write(self) -> None:
73+
"""Release the exclusive write lock."""
7074
self._writer_lock.release()
7175

7276
@asynccontextmanager
7377
async def read_lock(self) -> AsyncGenerator[None, None]:
74-
await self.acquire_read()
78+
"""Context manager for acquiring and releasing a read lock safely."""
79+
acquire = False
7580
try:
81+
await self.acquire_read()
82+
acquire = True
7683
yield
7784
finally:
78-
await self.release_read()
85+
if acquire:
86+
with trio.CancelScope() as scope:
87+
scope.shield = True
88+
await self.release_read()
7989

8090
@asynccontextmanager
8191
async def write_lock(self) -> AsyncGenerator[None, None]:
82-
await self.acquire_write()
92+
"""Context manager for acquiring and releasing a write lock safely."""
93+
acquire = False
8394
try:
95+
await self.acquire_write()
96+
acquire = True
8497
yield
8598
finally:
86-
self.release_write()
99+
if acquire:
100+
self.release_write()
87101

88102

89103
class MplexStream(IMuxedStream):

0 commit comments

Comments
 (0)