Skip to content

Commit 883593a

Browse files
author
markvaykhansky
committed
wip
1 parent 464ebe3 commit 883593a

File tree

5 files changed

+168
-18
lines changed

5 files changed

+168
-18
lines changed

src/guidellm/backend/openai.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -572,12 +572,12 @@ async def _iterative_completions_request(
572572

573573
async for line in stream.aiter_lines():
574574
iter_time = time.time()
575-
logger.debug(
576-
"{} request: {} recieved iter response line: {}",
577-
self.__class__.__name__,
578-
request_id,
579-
line,
580-
)
575+
# logger.debug(
576+
# "{} request: {} recieved iter response line: {}",
577+
# self.__class__.__name__,
578+
# request_id,
579+
# line,
580+
# )
581581

582582
if not line or not line.strip().startswith("data:"):
583583
continue

src/guidellm/objects/pydantic.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ class StandardBaseModel(BaseModel):
2121

2222
def __init__(self, /, **data: Any) -> None:
2323
super().__init__(**data)
24-
logger.debug(
25-
"Initialized new instance of {} with data: {}",
26-
self.__class__.__name__,
27-
data,
28-
)
24+
# logger.debug(
25+
# "Initialized new instance of {} with data: {}",
26+
# self.__class__.__name__,
27+
# data,
28+
# )
2929

3030

3131
SuccessfulT = TypeVar("SuccessfulT")

src/guidellm/scheduler/repro.py

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
import asyncio
2+
import multiprocessing
3+
import time
4+
import logging
5+
import threading
6+
7+
# Logging setup: timestamped, level-tagged records that include the thread
# name — relevant here because asyncio.to_thread runs the blocking queue
# read off the event-loop thread.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - [%(threadName)s] - %(message)s',
    datefmt='%H:%M:%S',
    level=logging.INFO,
)

# Shared multiprocessing queue that is intentionally never fed, so any
# consumer blocks indefinitely.  Named mp_queue to distinguish it from
# asyncio.Queue.
mp_queue = multiprocessing.Queue()
19+
async def get_item_from_mp_queue(q: multiprocessing.Queue, worker_id: int):
    """
    Pull one item off *q* without blocking the event loop.

    The blocking ``q.get()`` is handed to a worker thread via
    ``asyncio.to_thread``; every entry/exit path is logged so the
    cancellation experiment can be traced in the output.
    """
    logging.info(f"Worker {worker_id}: get_item_from_mp_queue: ENTERED. Awaiting asyncio.to_thread(q.get).")
    try:
        # Blocking call, executed in a separate thread so the loop stays free.
        payload = await asyncio.to_thread(q.get)
    except asyncio.CancelledError:
        # Expected path when the task awaiting this coroutine is cancelled and
        # asyncio.to_thread propagates the cancellation to its awaiter.
        logging.error(
            f"Worker {worker_id}: get_item_from_mp_queue: CAUGHT CancelledError from asyncio.to_thread directly!")
        raise  # keep the cancellation moving up the stack
    except Exception as exc:
        logging.error(f"Worker {worker_id}: get_item_from_mp_queue: CAUGHT an UNEXPECTED EXCEPTION {type(exc)}: {exc}",
                      exc_info=True)
        raise
    else:
        # Unreached while the queue stays empty.
        logging.info(
            f"Worker {worker_id}: get_item_from_mp_queue: asyncio.to_thread RETURNED NORMALLY with item: {payload}.")
        return payload
    finally:
        # Runs on every exit path; whether the CancelledError was caught above
        # is the interesting part of the experiment.
        logging.info(f"Worker {worker_id}: get_item_from_mp_queue: EXITED (finally block).")
47+
async def worker_coroutine(worker_id: int, q: multiprocessing.Queue):
    """
    Top-level coroutine for one worker task.

    Awaits a queue item and logs which exception path (if any) the task takes
    when it is cancelled from the outside.
    """
    logging.info(f"Worker {worker_id}: worker_coroutine: STARTED.")
    try:
        logging.info(f"Worker {worker_id}: worker_coroutine: About to await get_item_from_mp_queue.")
        # Suspension point: if this task is cancelled, the CancelledError is
        # injected right here.
        await get_item_from_mp_queue(q, worker_id)
        logging.info(f"Worker {worker_id}: worker_coroutine: get_item_from_mp_queue completed (unexpectedly).")
    except asyncio.CancelledError:
        # Deliberately swallowed: the task then ends "normally" after
        # cancellation, which is what the orchestrator's wait_for observes.
        logging.error(f"Worker {worker_id}: worker_coroutine: SUCCESSFULLY CAUGHT CancelledError.")
    except Exception as err:
        logging.error(f"Worker {worker_id}: worker_coroutine: CAUGHT UNEXPECTED EXCEPTION {type(err)}: {err}",
                      exc_info=True)
    finally:
        logging.info(f"Worker {worker_id}: worker_coroutine: FINISHED (finally block).")
69+
async def main_orchestrator():
    """
    Orchestrate the cancellation test.

    Starts a worker task that blocks on the (empty) multiprocessing queue,
    cancels it after one second, and reports — with a timeout — whether the
    cancellation actually unblocked the task.  Finally, a sentinel is pushed
    into the queue so the thread stuck in ``q.get()`` can exit.
    """
    logging.info("Main Orchestrator: Starting worker task.")
    worker_task = asyncio.create_task(worker_coroutine(1, mp_queue), name="WorkerCoroutine-1")

    # Give the worker task a moment to start and block on the queue.
    logging.info("Main Orchestrator: Sleeping for 1 second to let worker block...")
    await asyncio.sleep(1)

    # BUG FIX: the original comprehension bound the loop variable as `t_`
    # while reading `t.name`, which raises NameError when the f-string is
    # evaluated.  Bind and read the same name.
    logging.info(f"Main Orchestrator: Current active threads: {[t.name for t in threading.enumerate()]}...")

    # Cancel the worker task.  (Use logging, not print, consistent with the
    # rest of this script.)
    logging.info("Main Orchestrator: Cancelling worker_task...")
    worker_task.cancel()

    # Wait for the worker task to finish, with a timeout.
    # If cancellation works as expected, worker_task should complete (by
    # handling CancelledError) well before the timeout.
    # If it gets stuck, asyncio.TimeoutError will be raised.
    timeout_seconds = 5.0
    logging.info(f"Main Orchestrator: Awaiting worker_task with timeout {timeout_seconds}s...")
    try:
        await asyncio.wait_for(worker_task, timeout=timeout_seconds)
        logging.info("Main Orchestrator: worker_task completed WITHOUT timeout.")
    except asyncio.TimeoutError:
        logging.error(
            f"Main Orchestrator: TIMEOUT! worker_task did not finish within {timeout_seconds}s after cancellation.")
        logging.error(
            f"Main Orchestrator: worker_task.done() = {worker_task.done()}, worker_task.cancelled() = {worker_task.cancelled()}")
        # At this point, the thread running mp_queue.get() is likely still blocked.
    except asyncio.CancelledError:
        # This would happen if main_orchestrator itself was cancelled, not expected here.
        logging.error("Main Orchestrator: main_orchestrator itself was cancelled (unexpected).")
    except Exception as e:
        logging.error(f"Main Orchestrator: An unexpected error occurred while waiting for worker_task: {e}",
                      exc_info=True)
    finally:
        logging.info("Main Orchestrator: Test finished.")
        # Note: The thread started by asyncio.to_thread for mp_queue.get()
        # might still be alive and blocked if q.get() wasn't unblocked.
        # It's a daemon thread by default, so it won't prevent program exit.
        # To clean it up, one would typically put a sentinel into mp_queue.
        # For this test, we are focused on the asyncio task cancellation.
        logging.info(
            f"Main Orchestrator: Final check: worker_task.done() = {worker_task.done()}, worker_task.cancelled() = {worker_task.cancelled()}")

        # Attempt to unblock the queue to allow the thread to exit,
        # though the test's focus is on the asyncio cancellation.
        try:
            mp_queue.put_nowait(None)  # Sentinel
            logging.info("Main Orchestrator: Put sentinel in mp_queue to unblock thread.")
        except Exception:
            logging.warning("Main Orchestrator: Could not put sentinel in mp_queue.")
126+
def _release_queue() -> None:
    # Close the queue and join its feeder thread so the process exits cleanly.
    mp_queue.close()
    mp_queue.join_thread()  # Ensure queue's feeder thread is joined
    logging.info("Main Orchestrator: mp_queue resources released.")


if __name__ == "__main__":
    # Entry point.  The multiprocessing queue is created at module scope,
    # which is fine for this single-process repro script (and keeps it valid
    # under the 'spawn'/'forkserver' start methods on Windows/macOS).
    try:
        asyncio.run(main_orchestrator())
    except KeyboardInterrupt:
        logging.info("Main Orchestrator: Keyboard interrupt received.")
    finally:
        _release_queue()

src/guidellm/scheduler/scheduler.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ async def run(
154154
):
155155
# we've exhausted all requests we've wanted to run
156156
# and yielded all responses
157+
logger.info("run_info.completed_requests >= run_info.created_requests")
157158
break
158159

159160
requests_iter = self._add_requests(
@@ -198,7 +199,7 @@ async def run(
198199
run_info=run_info,
199200
)
200201

201-
await self._stop_processes(futures, requests_queue)
202+
await self._stop_processes(futures, shutdown_event, requests_queue)
202203

203204
def _validate_scheduler_params(
204205
self,
@@ -457,10 +458,9 @@ def _check_result_ready(
457458
async def _stop_processes(
458459
self,
459460
futures: list[asyncio.Future],
461+
shutdown_event: MultiprocessingEvent,
460462
requests_queue: multiprocessing.Queue,
461463
):
462-
for _ in futures:
463-
requests_queue.put(None)
464-
464+
shutdown_event.set()
465465
logger.debug("Waiting for futures to shut down")
466466
await asyncio.gather(*futures)

src/guidellm/scheduler/worker.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,10 @@ async def _process_runner():
223223
raise ValueError(f"Invalid process type: {type_}")
224224

225225
shutdown_task = asyncio.create_task(
226-
self._wait_for_shutdown(shutdown_event, shutdown_poll_interval_seconds),
226+
self._wait_for_shutdown(
227+
shutdown_event=shutdown_event,
228+
shutdown_poll_interval=shutdown_poll_interval_seconds
229+
),
227230
name="shutdown_task",
228231
)
229232

@@ -236,7 +239,9 @@ async def _process_runner():
236239
)
237240

238241
for task in pending:
239-
task.cancel()
242+
logger.debug(f"Cancelling task {task.get_name()}")
243+
cancel_result = task.cancel()
244+
logger.debug(f"{'Task is already done or canceled' if not cancel_result else 'sent cancel signal'}")
240245
try:
241246
await task
242247
except asyncio.CancelledError:
@@ -265,6 +270,8 @@ async def _wait_for_shutdown(
265270
while not shutdown_event.is_set():
266271
await asyncio.sleep(shutdown_poll_interval)
267272

273+
logger.debug("Shutdown signal received")
274+
raise ValueError("kaki")
268275
raise asyncio.CancelledError("Shutdown event set, cancelling process loop.")
269276

270277
async def _process_synchronous_requests_loop(
@@ -290,6 +297,9 @@ async def _process_synchronous_requests_loop(
290297
process_id=process_id,
291298
)
292299

300+
logger.debug("Done processing synchronous loop")
301+
302+
293303
async def _process_asynchronous_requests_loop(
294304
self,
295305
requests_queue: multiprocessing.Queue,
@@ -303,6 +313,7 @@ async def _process_asynchronous_requests_loop(
303313
raise ValueError("Async worker called with max_concurrency < 1")
304314

305315
while True:
316+
logger.info("Awaiting request...")
306317
process_request = await self.get_request(
307318
requests_queue=requests_queue,
308319
)
@@ -315,7 +326,6 @@ async def _process_asynchronous_requests_loop(
315326
)
316327

317328
await pending.acquire()
318-
319329
lock_acquired_at = time.time()
320330
logger.debug(
321331
f"Lock acquired Process ID {process_id} ||"
@@ -341,6 +351,8 @@ def _task_done(_: asyncio.Task):
341351
task.add_done_callback(_task_done)
342352
await asyncio.sleep(0) # enable start task immediately
343353

354+
logger.debug("Done processing asynchronous loop")
355+
344356

345357
class GenerativeRequestsWorkerDescription(WorkerDescription):
346358
type_: Literal["generative_requests_worker"] = "generative_requests_worker" # type: ignore[assignment]

0 commit comments

Comments
 (0)