Skip to content

Commit 20e98b3

Browse files
Levi080513claude
andcommitted
refactor: remove unnecessary threading.Lock from CHWBL scheduler
Ray Serve runs in a single-threaded asyncio event loop. The locked sections contain no await points, so there is no concurrent access. threading.Lock is unnecessary and could potentially block the event loop if contention ever occurred. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 5dafc39 commit 20e98b3

File tree

1 file changed

+18
-24
lines changed

1 file changed

+18
-24
lines changed

cluster-image-builder/serve/_replica_scheduler/chwbl_scheduler.py

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import hashlib
22
import logging
33
import bisect
4-
import threading
54
from typing import Dict, List, Set, Tuple, Optional
65

76
from ray.serve._private.common import ReplicaID
@@ -39,9 +38,6 @@ def __init__(self, *args, **kwargs):
3938
self._hash_to_replica_id: Dict[int, ReplicaID] = {} # Maps hash points to replica IDs
4039
self._sorted_hashes: List[int] = [] # Sorted list of hash points for binary search
4140

42-
# Lock for thread-safe updates
43-
self._load_lock = threading.Lock()
44-
4541
def initialize_state(
4642
self,
4743
virtual_nodes_per_replica: int = 100,
@@ -61,18 +57,17 @@ def initialize_state(
6157
)
6258

6359
def _create_load_snapshot(self) -> Dict[ReplicaID, int]:
64-
"""Create a snapshot of current replica loads (thread-safe).
60+
"""Create a snapshot of current replica loads.
6561
6662
Returns:
6763
A dictionary mapping replica_id to its current load.
6864
"""
69-
with self._load_lock:
70-
snapshot = {}
71-
for replica_id in self._replicas:
72-
load = self._replica_queue_len_cache.get(replica_id)
73-
snapshot[replica_id] = load if load is not None else 0
65+
snapshot = {}
66+
for replica_id in self._replicas:
67+
load = self._replica_queue_len_cache.get(replica_id)
68+
snapshot[replica_id] = load if load is not None else 0
7469

75-
return snapshot
70+
return snapshot
7671

7772
async def choose_replicas(
7873
self,
@@ -252,21 +247,20 @@ def on_request_completed(self, replica_id: ReplicaID, internal_request_id: str):
252247
253248
Called by the router when a request finishes processing.
254249
"""
255-
with self._load_lock:
256-
current_load = self._replica_queue_len_cache.get(replica_id)
257-
if current_load is None:
258-
logger.warning(
259-
f"CHWBL: Attempted to decrement load for {replica_id} but no load info exists"
260-
)
261-
return
250+
current_load = self._replica_queue_len_cache.get(replica_id)
251+
if current_load is None:
252+
logger.warning(
253+
f"CHWBL: Attempted to decrement load for {replica_id} but no load info exists"
254+
)
255+
return
262256

263-
new_load = max(0, current_load - 1) # Ensure non-negative
264-
self._replica_queue_len_cache.update(replica_id, new_load)
257+
new_load = max(0, current_load - 1) # Ensure non-negative
258+
self._replica_queue_len_cache.update(replica_id, new_load)
265259

266-
logger.debug(
267-
f"CHWBL: Decremented load for {replica_id}: "
268-
f"{current_load} -> {new_load}"
269-
)
260+
logger.debug(
261+
f"CHWBL: Decremented load for {replica_id}: "
262+
f"{current_load} -> {new_load}"
263+
)
270264

271265
def _extract_cache_key(self, payload, request_id: str) -> str:
272266
"""Extract cache key from OpenAI-compatible chat completions payload.

0 commit comments

Comments
 (0)