Conversation
|
Please take a look at this, @seetadev. If you could give me some pointers on how to set up test files, that would be really helpful. |
|
@Jineshbansal : Welcome to py-libp2p :) Thank you for submitting this PR. Appreciate your efforts and initiative. CCing @lla-dane, @acul71, @kaneki003 for their feedback and review on your PR. They will also help you in setting up the test suite, add key scenarios, newsfragment and arrive at a good conclusion on the issue addressed via the PR. Also, CCing @guha-rahul and @sumanjeet0012. Would like them to review your PR with the team and share feedback too. |
|
Also @Jineshbansal: Take a look at this file: newsfragment-readme. You can add a newsfragment file as per the instructions there. |
Review: Locking Mechanism in
|
|
Example tests with trio:

```python
import pytest
import trio
from unittest.mock import AsyncMock, MagicMock

from libp2p.stream_muxer.exceptions import StreamClosedError
from libp2p.stream_muxer.mplex.constants import MessageType
from libp2p.stream_muxer.mplex.mplex_stream import MplexStream


@pytest.fixture
def mock_muxer():
    muxer = MagicMock()
    muxer.send_message = AsyncMock()
    return muxer


@pytest.mark.trio
async def test_write_calls_send_message(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    await stream.write(b"hello")
    mock_muxer.send_message.assert_awaited_once_with(1, MessageType.DATA, b"hello")


@pytest.mark.trio
async def test_write_raises_if_closed(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    await stream.close()
    with pytest.raises(StreamClosedError):
        await stream.write(b"data")


@pytest.mark.trio
async def test_close_sends_close_message_once(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    await stream.close()
    await stream.close()  # second call is a no-op
    mock_muxer.send_message.assert_awaited_once_with(1, MessageType.CLOSE)


@pytest.mark.trio
async def test_reset_sends_reset_message_and_closes(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    await stream.reset()
    assert stream.is_closed()
    mock_muxer.send_message.assert_awaited_once_with(1, MessageType.RESET)


@pytest.mark.trio
async def test_is_closed_reflects_state(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    assert not stream.is_closed()
    await stream.close()
    assert stream.is_closed()


@pytest.mark.trio
async def test_async_context_manager_calls_close(mock_muxer):
    async with MplexStream(mock_muxer, 1) as stream:
        await stream.write(b"test")
    mock_muxer.send_message.assert_any_await(1, MessageType.CLOSE)


# --- New tests for Trio locking behavior ---

@pytest.mark.trio
async def test_concurrent_write_lock(mock_muxer):
    stream = MplexStream(mock_muxer, 1)

    async def writer(data):
        await stream.write(data)

    async with trio.open_nursery() as nursery:
        nursery.start_soon(writer, b"hello")
        nursery.start_soon(writer, b"world")
    assert mock_muxer.send_message.await_count == 2


@pytest.mark.trio
async def test_concurrent_close_reset_lock(mock_muxer):
    stream = MplexStream(mock_muxer, 1)

    async def do_close():
        await stream.close()

    async def do_reset():
        await stream.reset()

    async with trio.open_nursery() as nursery:
        nursery.start_soon(do_close)
        nursery.start_soon(do_reset)
    # Only one control message should be sent (CLOSE or RESET)
    assert mock_muxer.send_message.await_count == 1


@pytest.mark.trio
async def test_write_after_close_raises(mock_muxer):
    stream = MplexStream(mock_muxer, 1)
    await stream.close()
    with pytest.raises(StreamClosedError):
        await stream.write(b"data")
```
(force-pushed 27c99d8 to e65e38a)
Review of
|
| Area | Suggestion |
|---|---|
| Lock API | Add context manager support (read_lock, write_lock) |
| Cancellation Safety | Handle Cancelled exceptions to maintain state |
| Tests | Replace sleep() with Event for reliability |
| New Tests | Cover actual read()/write() calls with lock instrumentation |
| Edge Cases | Add tests for reset, close, EOF behavior |
✅ Conclusion
The current implementation is structurally sound and aligns with Trio concurrency principles. A few refinements — especially regarding context managers, error/cancellation handling, and expanded test cases — will significantly improve robustness, maintainability, and test reliability.
|
@Jineshbansal Well done. If you can refine it further, even better. |
(force-pushed fa7601f to 9a950f6, then 9a950f6 to 9cd3805)
|
@Jineshbansal : This is indeed close to final review and merge. Great work, Jinesh. Please add a newsfragment file. Thank you @lla-dane and @acul71 for your continued support to Jinesh on this PR. Appreciate it. |
|
@lla-dane , @acul71 : Please share any final review points and comments. Once @Jineshbansal adds the newsfragment file, I'll merge it. @Jineshbansal : Also, please add developer docs and demo to the discussion page. Appreciate your initiative. Kindly get in touch with @lla-dane on this effort. |
|
@Jineshbansal - why are the graves being removed throughout? |
|
Earlier I thought these changes were coming from the main branch, but now that I notice it, it may be because I ran some Python command or something. Sorry about that; I will update this PR.
|
@Jineshbansal |
|
@Jineshbansal The PR is good. See if you can dig deeper into the Windows error (easiest: increase the sleep time; better: poll for an event within a timeout).

ReadWriteLock Implementation Analysis
Overview
This document analyzes the ReadWriteLock implementation in the py-libp2p repository, specifically examining whether it fulfills the requirements mentioned in PR #748.
Current Implementation Status
✅ IMPLEMENTED: ReadWriteLock Class
The repository contains a complete ReadWriteLock class:

```python
class ReadWriteLock:
    """
    A read-write lock that allows multiple concurrent readers
    or one exclusive writer, implemented using Trio primitives.
    """

    def __init__(self) -> None:
        self._readers = 0
        self._readers_lock = trio.Lock()  # Protects access to _readers count
        self._writer_lock = trio.Semaphore(1)  # Allows only one writer at a time

    async def acquire_read(self) -> None:
        """Acquire a read lock. Multiple readers can hold it simultaneously."""
        try:
            async with self._readers_lock:
                if self._readers == 0:
                    await self._writer_lock.acquire()
                self._readers += 1
        except trio.Cancelled:
            raise

    async def release_read(self) -> None:
        """Release a read lock."""
        async with self._readers_lock:
            if self._readers == 1:
                self._writer_lock.release()
            self._readers -= 1

    async def acquire_write(self) -> None:
        """Acquire an exclusive write lock."""
        try:
            await self._writer_lock.acquire()
        except trio.Cancelled:
            raise

    def release_write(self) -> None:
        """Release the exclusive write lock."""
        self._writer_lock.release()

    @asynccontextmanager
    async def read_lock(self) -> AsyncGenerator[None, None]:
        """Context manager for acquiring and releasing a read lock safely."""
        acquire = False
        try:
            await self.acquire_read()
            acquire = True
            yield
        finally:
            if acquire:
                with trio.CancelScope() as scope:
                    scope.shield = True
                    await self.release_read()

    @asynccontextmanager
    async def write_lock(self) -> AsyncGenerator[None, None]:
        """Context manager for acquiring and releasing a write lock safely."""
        acquire = False
        try:
            await self.acquire_write()
            acquire = True
            yield
        finally:
            if acquire:
                self.release_write()
```

✅ IMPLEMENTED: Integration with MplexStream
The lock is integrated into MplexStream:

```python
class MplexStream(IMuxedStream):
    # ... other attributes ...
    rw_lock: ReadWriteLock

    def __init__(self, ...):
        # ... other initialization ...
        self.rw_lock = ReadWriteLock()

    async def read(self, n: int | None = None) -> bytes:
        async with self.rw_lock.read_lock():
            # ... read implementation ...

    async def write(self, data: bytes) -> None:
        async with self.rw_lock.write_lock():
            # ... write implementation ...
```

✅ IMPLEMENTED: Comprehensive Test Suite
The implementation includes extensive tests in
Key Features of the Implementation
1. Trio-Based Implementation
2. Context Manager Support
3. Proper Lock Semantics
4. Integration with Stream Operations
Comparison with Requirements
Based on the PR comment analysis, the current implementation appears to fulfill the core requirements:
✅ Race Condition Prevention
✅ Thread Safety
✅ Performance Considerations
Additional Observations
1. Test Coverage
The test suite is comprehensive and covers:
2. Code Quality
3. Integration
Conclusion
The current py-libp2p repository fully implements the ReadWriteLock functionality as requested. The implementation:
The implementation appears to be production-ready and addresses the concurrency issues that the original PR was intended to solve.
|
I think the discussion page is here: #723 |
|
@acul71 you have already made one discussion page, which LGTM. Can't we just use that? |
Yes, sorry, can you give me the right link? |
|
@acul71 I am talking about this discussion page |
All right then. Proceed and ask for merging. |
|
@seetadev Please take a final look if it is ready to merge |
|
@Jineshbansal : Appreciate your efforts on this PR with a clean and well-reasoned implementation of read/write synchronization in

The introduction of a custom

Thank you for working on our feedback points. Also, wish to thank @acul71 for his guidance and pointers, and for reviewing the PR.

Since this PR touches core stream safety and concurrency (critical for real-world performance and stability), would you be open to adding the following as part of the PR discussion or documentation?

Developer Notes / Guide
To help new contributors understand and extend this work, a short write-up covering:
Screenshots / Screencasts
If you can spare the time, adding:
These additions would be super helpful for onboarding new developers to
Really appreciate your clean and focused work on this — locking logic is tricky, and this one is well done.
|
@Jineshbansal : This looks ready to merge. As discussed above, please prepare the developer docs and add them to the discussion page. Wish to also thank @lla-dane for his review and feedback. Appreciate the pointers and guidance. |
Purpose
Implements the pending TODO for read/write synchronization in MplexStream.
Logic Behind the Lock
Steps Taken
Tests
To validate the above logic, a unit test has been added in libp2p/stream_muxer/mplex/test_read_write_lock.py.