Skip to content

Commit ac07cc6

Browse files
committed
Attempts to fix stranded messages
1 parent f3bf683 commit ac07cc6

File tree

7 files changed

+171
-174
lines changed

7 files changed

+171
-174
lines changed

src/guidellm/scheduler/worker.py

Lines changed: 16 additions & 29 deletions
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,7 @@ def __init__(
7878
startup_barrier: ProcessingBarrier,
7979
shutdown_event: ProcessingEvent,
8080
error_event: ProcessingEvent,
81-
completed_event: ProcessingEvent,
81+
requests_completed_event: ProcessingEvent,
8282
backend: BackendInterface[RequestT, MeasuredRequestTimingsT, ResponseT],
8383
request_timings: ScheduledRequestTimings,
8484
):
@@ -90,7 +90,8 @@ def __init__(
9090
:param startup_barrier: Multiprocessing barrier for coordinated startup
9191
:param shutdown_event: Event for signaling graceful shutdown
9292
:param error_event: Event for signaling error conditions across processes
93-
:param completed_event: Event for signaling when this worker has completed
93+
:param requests_completed_event: Event for signaling when the main process
94+
has stopped sending requests / all requests are added to the queue
9495
:param backend: Backend instance for processing requests
9596
:param request_timings: Timing strategy for request scheduling
9697
"""
@@ -99,7 +100,7 @@ def __init__(
99100
self.startup_barrier = startup_barrier
100101
self.shutdown_event = shutdown_event
101102
self.error_event = error_event
102-
self.completed_event = completed_event
103+
self.requests_completed_event = requests_completed_event
103104
self.backend = backend
104105
self.request_timings = request_timings
105106
self.startup_completed = False
@@ -126,8 +127,6 @@ def run(self):
126127
f"Worker process {self.messaging.worker_index} encountered an "
127128
f"error: {err}"
128129
) from err
129-
finally:
130-
self.completed_event.set()
131130

132131
async def run_async(self):
133132
"""
@@ -212,11 +211,10 @@ async def _run_async_requests_processing(self):
212211
await self.backend.validate()
213212

214213
# Get messaging system ready
215-
processing_cancelled = ThreadingEvent()
216214
all_requests_processed = ThreadingEvent()
217215
await self.messaging.start(
218216
send_stop_criteria=[all_requests_processed],
219-
receive_stop_criteria=[processing_cancelled],
217+
receive_stop_criteria=[self.requests_completed_event, self.error_event],
220218
pydantic_models=list(
221219
SchedulerMessagingPydanticRegistry.registry.values()
222220
),
@@ -255,7 +253,6 @@ def _task_done(task):
255253
pending_tasks.add(request_task)
256254
request_task.add_done_callback(_task_done)
257255
except (asyncio.CancelledError, Exception) as err:
258-
processing_cancelled.set()
259256
await self._cancel_remaining_requests(pending_tasks, all_requests_processed)
260257
await self.messaging.stop()
261258
await self.backend.process_shutdown()
@@ -323,27 +320,17 @@ def _send_update(
323320
prev_status = request_info.status
324321

325322
try:
326-
if (new_status == "in_progress" and prev_status != "in_progress") or (
327-
new_status != "in_progress" and prev_status == "pending"
328-
):
329-
request_info.status = "in_progress"
330-
self.messaging.put_sync(
331-
(None, request, request_info.model_copy()),
332-
timeout=-1,
333-
)
334-
prev_status = new_status
335-
336-
if prev_status == "in_progress" and new_status in {
337-
"completed",
338-
"errored",
339-
"cancelled",
340-
}:
341-
request_info.status = new_status
342-
self.messaging.put_sync(
343-
(response, request, request_info), # last update, no copy
344-
timeout=-1,
345-
)
346-
prev_status = new_status
323+
request_info.status = new_status
324+
request_info = (
325+
request_info.model_copy()
326+
if new_status not in {"completed", "errored", "cancelled"}
327+
else request_info # last update, don't need to copy
328+
)
329+
self.messaging.put_sync(
330+
(response, request, request_info),
331+
timeout=-1,
332+
)
333+
prev_status = new_status
347334
except Exception as exc:
348335
# Reset status to last one that succeeded or started function with
349336
# Calling logic can retry after handling error, if possible

src/guidellm/scheduler/worker_group.py

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ def __init__(
120120
self.mp_context = None
121121
self.mp_manager = None
122122
self.processes: list[BaseProcess] = None
123-
self.processes_completed_events: list[Event] = None
123+
self.requests_completed_event: Event = None
124124
self.startup_barrier: Barrier = None
125125
self.shutdown_event: Event = None
126126
self.error_event: Event = None
@@ -176,8 +176,11 @@ async def create_processes(self):
176176
raise RuntimeError("num_processes resolved to 0; increase limits/config")
177177

178178
per_proc_max_conc = max_conc // num_processes
179-
per_proc_max_receive_buffer = max(
180-
1, math.floor(per_proc_max_conc * settings.mp_proc_receive_buffer_per)
179+
max_pending_size = max(
180+
1, math.floor(max_conc * settings.mp_max_pending_buffer_percent)
181+
)
182+
per_proc_max_buffer_size = max(
183+
1, math.floor(per_proc_max_conc * settings.mp_max_worker_buffer_percent)
181184
)
182185

183186
# Initialize multiprocessing components
@@ -186,12 +189,13 @@ async def create_processes(self):
186189
self.startup_barrier = self.mp_context.Barrier(num_processes + 1)
187190
self.shutdown_event = self.mp_context.Event()
188191
self.error_event = self.mp_context.Event()
192+
self.requests_completed_event = self.mp_context.Event()
189193

190194
if settings.mp_messaging_object == "queue":
191195
self.messaging = InterProcessMessagingQueue(
192196
serialization=settings.mp_serialization,
193197
encoding=settings.mp_encoding,
194-
max_send_size=max_conc,
198+
max_pending_size=max_pending_size,
195199
max_buffer_send_size=settings.mp_requests_send_buffer_size,
196200
poll_interval=settings.mp_poll_interval,
197201
)
@@ -200,7 +204,7 @@ async def create_processes(self):
200204
manager=self.mp_manager,
201205
serialization=settings.mp_serialization,
202206
encoding=settings.mp_encoding,
203-
max_send_size=max_conc,
207+
max_pending_size=max_pending_size,
204208
max_buffer_send_size=settings.mp_requests_send_buffer_size,
205209
poll_interval=settings.mp_poll_interval,
206210
)
@@ -209,32 +213,30 @@ async def create_processes(self):
209213
num_workers=num_processes,
210214
serialization=settings.mp_serialization,
211215
encoding=settings.mp_encoding,
212-
max_send_size=max_conc,
216+
max_pending_size=max_pending_size,
213217
max_buffer_send_size=settings.mp_requests_send_buffer_size,
214218
poll_interval=settings.mp_poll_interval,
215219
)
216220

217221
# Initialize worker processes
218222
self.processes = []
219-
self.processes_completed_events = []
220223
for rank in range(num_processes):
221224
# Distribute any remainder across the first N ranks
222225
async_limit = per_proc_max_conc + (
223226
1 if rank < (max_conc % num_processes) else 0
224227
)
225228

226-
worker_completed_event = self.mp_context.Event()
227229
worker = WorkerProcess[RequestT, MeasuredRequestTimingsT, ResponseT](
228230
messaging=self.messaging.create_worker_copy(
229231
worker_index=rank,
230232
max_buffer_send_size=None,
231-
max_buffer_receive_size=per_proc_max_receive_buffer,
233+
max_buffer_receive_size=per_proc_max_buffer_size,
232234
),
233235
async_limit=async_limit,
234236
startup_barrier=self.startup_barrier,
235237
shutdown_event=self.shutdown_event,
236238
error_event=self.error_event,
237-
completed_event=worker_completed_event,
239+
requests_completed_event=self.requests_completed_event,
238240
backend=self.backend,
239241
request_timings=self.strategy.create_request_timings(
240242
local_rank=rank,
@@ -245,7 +247,6 @@ async def create_processes(self):
245247
proc = self.mp_context.Process(target=worker.run, daemon=False)
246248
proc.start()
247249
self.processes.append(proc)
248-
self.processes_completed_events.append(worker_completed_event)
249250

250251
reason, _ = await synchronous_to_exitable_async(
251252
synchronous=None,
@@ -279,7 +280,7 @@ async def start(self, start_time: float):
279280
self._state = _WorkerGroupState[RequestT, MeasuredRequestTimingsT, ResponseT](
280281
start_time=start_time,
281282
num_processes=len(self.processes),
282-
processes_completed_events=self.processes_completed_events,
283+
processes=self.processes,
283284
constraints=self.constraints,
284285
shutdown_event=self.shutdown_event,
285286
)
@@ -289,6 +290,7 @@ async def start(self, start_time: float):
289290
),
290291
receive_callback=self._state.update_callback_receive,
291292
send_stop_criteria=[self.shutdown_event, self.error_event],
293+
send_stopped_event=self.requests_completed_event,
292294
receive_stop_criteria=[self.error_event, self._state.stop_callback_receive],
293295
pydantic_models=list(SchedulerMessagingPydanticRegistry.registry.values()),
294296
)
@@ -408,7 +410,7 @@ def __init__(
408410
self,
409411
start_time: float,
410412
num_processes: int,
411-
processes_completed_events: list[Event],
413+
processes: list[BaseProcess],
412414
constraints: dict[str, Constraint],
413415
shutdown_event: Event,
414416
):
@@ -419,7 +421,7 @@ def __init__(
419421
num_processes=num_processes,
420422
start_time=start_time,
421423
)
422-
self.processes_completed_events = processes_completed_events
424+
self.processes = processes
423425
self._constraints = constraints
424426
self._internal_constraints: dict[str, Constraint] = {}
425427
self._shutdown_event = shutdown_event
@@ -544,7 +546,7 @@ def stop_callback_receive(
544546
and messaging.send_stopped_event.is_set() # No more requests will be added
545547
and self._shutdown_event.is_set() # processing should stop
546548
and all(
547-
event.is_set() for event in self.processes_completed_events
549+
not proc.is_alive() for proc in self.processes
548550
) # no more updates will be added by workers
549551
)
550552

@@ -601,21 +603,19 @@ def _update_new_request(self):
601603
self._state.queued_requests += 1
602604

603605
def _update_new_response(self, info: ScheduledRequestInfo[MeasuredRequestTimingsT]):
604-
if info.status == "in_progress":
606+
if info.status == "in_progress" or (
607+
info.status == "cancelled" and info.scheduler_timings.resolve_start is None
608+
# Cancelled request that never sent a progress update
609+
):
605610
self._state.queued_requests -= 1
606611
self._state.processing_requests += 1
607-
elif info.status in ("completed", "errored", "cancelled"):
612+
613+
if info.status in ("completed", "errored", "cancelled"):
608614
self._state.processing_requests -= 1
609615
self._state.processed_requests += 1
610616
self._state.successful_requests += 1 if info.status == "completed" else 0
611617
self._state.errored_requests += 1 if info.status == "errored" else 0
612618
self._state.cancelled_requests += 1 if info.status == "cancelled" else 0
613-
else:
614-
raise ValueError(
615-
f"Unknown request status: {info.status}. "
616-
"Supported statuses are: queued, pending, in_progress, "
617-
"completed, errored, cancelled."
618-
)
619619

620620
def _update_with_constraints(
621621
self, info: ScheduledRequestInfo[MeasuredRequestTimingsT]

src/guidellm/settings.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,8 @@ class Settings(BaseSettings):
140140
mp_messaging_object: Literal["queue", "manager_queue", "pipe"] = "queue"
141141
mp_requests_send_buffer_size: int = 1
142142
mp_poll_interval: float = 0.1
143-
mp_proc_receive_buffer_per: float = 0.1
143+
mp_max_pending_buffer_percent: float = 0.5
144+
mp_max_worker_buffer_percent: float = 0.2
144145
max_concurrency: int = 512
145146
max_worker_processes: int = 10
146147
scheduler_start_delay_non_distributed: float = 0.1

0 commit comments

Comments
 (0)