Skip to content

Commit bc6141d

Browse files
committed
add rwlock timeout
1 parent cf0443a commit bc6141d

File tree

5 files changed

+113
-31
lines changed

5 files changed

+113
-31
lines changed

rock/admin/core/ray_service.py

Lines changed: 23 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import ray
21
import time
32

4-
from rock.config import RayConfig
5-
from rock.logger import init_logger
6-
from rock.utils.rwlock import AsyncRWLock
3+
import ray
74
from apscheduler.schedulers.asyncio import AsyncIOScheduler
85
from apscheduler.triggers.interval import IntervalTrigger
96

7+
from rock.config import RayConfig
8+
from rock.logger import init_logger
9+
from rock.utils.rwlock import AsyncRWLock, WriteLockTimeout
1010

1111
logger = init_logger(__name__)
1212

@@ -34,7 +34,6 @@ def increment_ray_request_count(self):
3434

3535
def get_ray_rwlock(self):
    """Return the reader-writer lock stored on this instance as ``_ray_rwlock``."""
    rwlock = self._ray_rwlock
    return rwlock
37-
3837

3938
def _setup_ray_reconnect_scheduler(self):
4039
self._ray_reconnection_scheduler = AsyncIOScheduler(
@@ -57,19 +56,22 @@ async def _ray_reconnect_with_policy(self):
5756
await self._reconnect_ray()
5857

5958
async def _reconnect_ray(self):
    """Shut down and re-initialise the Ray connection while holding the write lock.

    The write lock is requested with the configured
    ``ray_reconnect_wait_timeout_seconds``. If ``WriteLockTimeout`` is raised,
    a warning is logged and the reconnect is skipped: neither the request
    counter nor the establish time is modified.
    """
    wait_limit = self._config.ray_reconnect_wait_timeout_seconds
    try:
        async with self._ray_rwlock.write_lock(timeout=wait_limit):
            began_at = time.time()
            logger.info(f"current time {began_at}, Reconnect ray cluster")

            ray.shutdown()

            # Connection settings are read only after the old session is shut down.
            cfg = self._config
            ray.init(
                address=cfg.address,
                runtime_env=cfg.runtime_env,
                namespace=cfg.namespace,
                resources=cfg.resources,
            )

            # A fresh connection starts with a zeroed request counter.
            self._ray_request_count = 0
            finished_at = time.time()
            self._ray_establish_time = finished_at
            logger.info(
                f"current time {finished_at}, Reconnect ray cluster successfully, duration {finished_at - began_at}s"
            )
    except WriteLockTimeout as timeout_error:
        logger.warning("Reconnect ray cluster timeout, skip reconnectting", exc_info=timeout_error)

rock/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class RayConfig:
2222
ray_reconnect_interval_seconds: int = field(default=60 * 60 * 12)
2323
ray_reconnect_request_threshold: int = field(default=10 * 1024 * 1024)
2424
ray_reconnect_check_interval_seconds: int = field(default=60 * 10)
25+
ray_reconnect_wait_timeout_seconds: int = field(default=30)
2526

2627

2728
@dataclass

rock/utils/rwlock.py

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,37 @@
22
from contextlib import asynccontextmanager
33

44

5+
class WriteLockTimeout(Exception):
    """Raised when a write lock could not be acquired before the timeout elapsed."""
7+
8+
59
class AsyncRWLock:
610
"""
711
An asynchronous reader-writer lock implementation that allows multiple concurrent readers
812
or exclusive writers. This lock ensures that:
9-
13+
1014
- Multiple readers can acquire the lock simultaneously when no writer is active
1115
- Writers have exclusive access when acquiring the lock (no readers or other writers)
1216
- Writers are prioritized over new readers when a writer is waiting, preventing writer starvation
1317
- The lock is awaitable and integrates with Python's asyncio framework
14-
18+
1519
The implementation uses asyncio.Condition to manage the waiting queue and coordinate
1620
between readers and writers safely in an asynchronous context.
17-
21+
1822
Usage:
1923
lock = AsyncRWLock()
20-
24+
2125
# For read operations (shared access)
2226
async with lock.read_lock():
2327
# Multiple coroutines can enter this block simultaneously
2428
data = await read_data()
25-
29+
2630
# For write operations (exclusive access)
2731
async with lock.write_lock():
2832
# Only one coroutine can enter this block at a time
2933
await write_data(new_data)
3034
"""
35+
3136
def __init__(self):
3237
self._readers = 0
3338
self._writer = False
@@ -56,13 +61,25 @@ async def read_lock(self):
5661
finally:
5762
await self.release_read()
5863

59-
async def acquire_write(self, timeout=None):
    """Acquire the lock for exclusive (write) access.

    Waits until no writer holds the lock and no readers are active, then
    marks the lock as write-held.

    Args:
        timeout: Maximum number of seconds to wait. ``None`` (the default)
            waits indefinitely.

    Returns:
        ``True`` if the write lock was acquired, ``False`` if ``timeout``
        elapsed first.
    """
    async with self._cond:
        self._writer_waiting += 1
        acquired = False
        try:

            async def wait_for_unlock():
                while self._writer or self._readers > 0:
                    await self._cond.wait()

            try:
                if timeout is None:
                    await wait_for_unlock()
                else:
                    await asyncio.wait_for(wait_for_unlock(), timeout=timeout)
            except asyncio.TimeoutError:
                return False

            self._writer = True
            acquired = True
            return True
        finally:
            self._writer_waiting -= 1
            if not acquired:
                # This writer gave up (timeout or cancellation) without taking
                # the lock. Anything parked on the condition because a writer
                # was pending (presumably new readers checking
                # `_writer_waiting` -- verify against acquire_read) must be
                # woken to re-check, otherwise it stays blocked until some
                # unrelated release happens to notify.
                self._cond.notify_all()
6885

@@ -74,8 +91,10 @@ async def release_write(self):
7491
self._cond.notify_all()
7592

7693
@asynccontextmanager
77-
async def write_lock(self):
78-
await self.acquire_write()
94+
async def write_lock(self, timeout=None):
95+
ready = await self.acquire_write(timeout=timeout)
96+
if not ready:
97+
raise WriteLockTimeout()
7998
try:
8099
yield
81100
finally:

tests/unit/admin/core/test_ray_reconnect.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,27 @@ async def test_reconnect_ray_calls_ray_shutdown_and_init_and_reset_counters(ray_
4343
assert service._ray_establish_time == old_establish_time + 5
4444

4545
assert service._ray_establish_time != old_establish_time
46+
47+
48+
@pytest.mark.asyncio
async def test_reconnect_ray_skip_when_reader_exists_and_write_lock_timeout(ray_service: RayService):
    """Reconnect is skipped, with no state change, when a reader blocks the write lock."""
    service = ray_service
    rwlock = service._ray_rwlock

    service._ray_request_count = 123

    old_count = service._ray_request_count
    old_est = service._ray_establish_time

    # Remember what this test mutates so it can be restored even on failure;
    # the fixture's scope is not visible here, so do not leak state into it.
    saved_timeout = service._config.ray_reconnect_wait_timeout_seconds
    saved_readers = rwlock._readers

    # The simulated reader never releases, so any timeout is guaranteed to
    # expire; keep it tiny so the test does not spend seconds waiting.
    service._config.ray_reconnect_wait_timeout_seconds = 0.05
    rwlock._readers = 1

    try:
        with patch("rock.admin.core.ray_service.ray.shutdown") as mock_shutdown, patch(
            "rock.admin.core.ray_service.ray.init"
        ) as mock_init, patch("time.time", return_value=old_est + 5):
            await service._reconnect_ray()

        mock_shutdown.assert_not_called()
        mock_init.assert_not_called()

        assert service._ray_request_count == old_count
        assert service._ray_establish_time == old_est
    finally:
        rwlock._readers = saved_readers
        service._config.ray_reconnect_wait_timeout_seconds = saved_timeout
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
import asyncio
2+
3+
import pytest
4+
5+
from rock.utils.rwlock import AsyncRWLock, WriteLockTimeout
6+
7+
8+
@pytest.mark.asyncio
async def test_rwlock_write_timeout_then_read_lock_ok():
    """A timed-out writer leaves the lock clean, so a reader can still acquire it."""
    lock = AsyncRWLock()

    # Simulate an active reader so the writer can never get exclusive access.
    lock._readers = 1

    # The timeout is guaranteed to fire, so a short one keeps the test fast.
    with pytest.raises(WriteLockTimeout):
        async with lock.write_lock(timeout=0.05):
            pytest.fail("write_lock should have timed out and not enter this block")

    # The failed attempt must not leave any writer bookkeeping behind.
    assert lock._writer is False
    assert lock._writer_waiting == 0

    lock._readers = 0

    acquired = False
    async with lock.read_lock():
        acquired = True
        # Yield to the event loop once while holding the read lock.
        await asyncio.sleep(0)

    assert acquired is True, "Read lock should be obtainable after write timeout"

0 commit comments

Comments
 (0)