Skip to content

Commit be15fe3

Browse files
committed
fixes and updates for initial core PR for utils that has been posted
1 parent 8e0e5ea commit be15fe3

File tree

10 files changed

+1940
-705
lines changed

10 files changed

+1940
-705
lines changed

src/guidellm/benchmark/aggregator.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
runtime_checkable,
3535
)
3636

37-
import numpy as np
3837
from pydantic import Field, PrivateAttr
3938

4039
from guidellm.backend import (
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import contextlib
5+
import math
6+
import queue
7+
import threading
8+
import time
9+
import uuid
10+
from asyncio import Task
11+
from collections.abc import AsyncIterator, Iterable, Iterator
12+
from multiprocessing import Queue, get_context
13+
from multiprocessing.process import BaseProcess
14+
from multiprocessing.synchronize import Barrier, Event
15+
from threading import Event as ThreadingEvent
16+
from typing import Any, Generic, TypeVar, Literal
17+
from multiprocessing.synchronize import Event as ProcessingEvent
18+
19+
import culsans
20+
21+
from guidellm.config import settings
22+
from guidellm.scheduler.constraints import Constraint
23+
from guidellm.scheduler.objects import (
24+
BackendInterface,
25+
MeasuredRequestTimingsT,
26+
MultiTurnRequestT,
27+
RequestT,
28+
ResponseT,
29+
ScheduledRequestInfo,
30+
SchedulerState,
31+
)
32+
from guidellm.scheduler.strategy import SchedulingStrategy
33+
from guidellm.scheduler.worker import WorkerProcess
34+
from guidellm.utils import MsgpackEncoding, synchronous_to_exitable_async
35+
36+
37+
__all__ = [
38+
"WorkerQueueProxy",
39+
]
40+
41+
42+
MessageT = TypeVar("MessageT", bound=Any)
43+
44+
45+
class WorkerQueueProxy(Generic[MessageT]):
46+
def __init__(
47+
self,
48+
mp_queue: Queue[MessageT],
49+
usage: Literal["producer", "consumer"],
50+
stopped_event: ThreadingEvent | ProcessingEvent | None = None,
51+
stop_events: list[ThreadingEvent | ProcessingEvent | None] = None,
52+
on_stop_event: Literal["continue", "stop", "error"] = "stop",
53+
on_queue_empty: Literal["continue", "stop", "stop_if_event", "error"] = "stop",
54+
on_queue_full: Literal["continue", "stop", "stop_if_event", "error"] = "stop",
55+
on_queue_shutdown: Literal[
56+
"continue", "stop", "stop_if_event", "error"
57+
] = "stop",
58+
poll_interval: float = 0.1,
59+
):
60+
self.mp_queue = mp_queue
61+
self.usage = usage
62+
self.stopped_event = stopped_event
63+
self.stop_events = stop_events
64+
self.on_stop_event = on_stop_event
65+
self.on_queue_empty = on_queue_empty
66+
self.on_queue_full = on_queue_full
67+
self.on_queue_shutdown = on_queue_shutdown
68+
self.poll_interval = poll_interval
69+
70+
self.local_queue: culsans.Queue[MessageT] = culsans.Queue()
71+
self.running = False
72+
73+
async def run(self):
74+
self.running = True
75+
func = (
76+
self._producer_generator
77+
if self.usage == "producer"
78+
else self._consumer_generator
79+
)
80+
await synchronous_to_exitable_async(synchronous=func(), poll_interval=0.0)
81+
self.running = False
82+
83+
def sync_put(
84+
self, item: MessageT, block: bool = True, timeout: float | None = None
85+
):
86+
if self.usage != "producer":
87+
raise ValueError("WorkerQueueProxy is not a producer")
88+
89+
self.local_queue.sync_put(item, block=block, timeout=timeout)
90+
91+
def sync_put_nowait(self, item: MessageT):
92+
if self.usage != "producer":
93+
raise ValueError("WorkerQueueProxy is not a producer")
94+
95+
self.local_queue.put_nowait(item)
96+
97+
async def async_put(self, item: MessageT, timeout: float | None = None):
98+
if self.usage != "producer":
99+
raise ValueError("WorkerQueueProxy is not a producer")
100+
101+
await asyncio.wait_for(self.local_queue.async_put(item), timeout)
102+
103+
def sync_get(self, block: bool = True, timeout: float | None = None) -> MessageT:
104+
if self.usage != "consumer":
105+
raise ValueError("WorkerQueueProxy is not a consumer")
106+
107+
return self.local_queue.sync_get(block=block, timeout=timeout)
108+
109+
def sync_get_nowait(self) -> MessageT:
110+
if self.usage != "consumer":
111+
raise ValueError("WorkerQueueProxy is not a consumer")
112+
113+
return self.local_queue.get_nowait()
114+
115+
async def async_get(self, timeout: float | None = None) -> MessageT:
116+
if self.usage != "consumer":
117+
raise ValueError("WorkerQueueProxy is not a consumer")
118+
119+
return await asyncio.wait_for(self.local_queue.async_get(), timeout)
120+
121+
def _producer_generator(self):
122+
last_yield_time = time.time()
123+
124+
while True:
125+
stop_set = (
126+
any(event.is_set() for event in self.stop_events)
127+
if self.stop_events
128+
else False
129+
)
130+
131+
if stop_set and self.on_stop_event == "stop":
132+
break
133+
134+
if stop_set and self.on_stop_event == "error":
135+
raise RuntimeError(
136+
"WorkerQueueProxy stop event set unexpectedly "
137+
"(on_stop_event==error)"
138+
)
139+
140+
if self.on_stop_event != "continue" and any(
141+
event.is_set() for event in self.stop_events
142+
):
143+
if self.on_stop_event == "stop":
144+
break
145+
if self.on_stop_event == "error":
146+
raise RuntimeError(
147+
"WorkerQueueProxy stop event set unexpectedly "
148+
"(on_stop_event==error)"
149+
)
150+
151+
def _consumer_generator(self):
152+
pass

src/guidellm/utils/auto_importer.py

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -9,56 +9,54 @@
99
The AutoImporterMixin can be combined with registration mechanisms to create
1010
extensible systems where new implementations are automatically discovered and
1111
registered when they are placed in the correct package structure.
12-
13-
Classes:
14-
- AutoImporterMixin: A mixin class that provides functionality to automatically
15-
import all modules within a specified package or list of packa
1612
"""
1713

14+
from __future__ import annotations
15+
1816
import importlib
1917
import pkgutil
2018
import sys
21-
from typing import ClassVar, Optional, Union
19+
from typing import ClassVar
2220

2321
__all__ = ["AutoImporterMixin"]
2422

2523

2624
class AutoImporterMixin:
2725
"""
28-
A mixin class that provides functionality to automatically import all modules
29-
within a specified package or list of packages.
30-
31-
This mixin is designed to be used with class registration mechanisms to enable
32-
automatic discovery and registration of classes without explicit imports. When
33-
a class inherits from AutoImporterMixin, it can define the package(s) to scan
34-
for modules by setting the `auto_package` class variable.
35-
36-
Usage Example:
37-
```python
38-
from speculators.utils import AutoImporterMixin
39-
class MyRegistry(AutoImporterMixin):
40-
auto_package = "my_package.implementations"
41-
42-
MyRegistry.auto_import_package_modules()
43-
```
44-
45-
:cvar auto_package: The package name or tuple of names to import modules from.
46-
:cvar auto_ignore_modules: Optional tuple of module names to ignore during import.
47-
:cvar auto_imported_modules: List tracking which modules have been imported.
26+
Mixin class for automatic module importing within packages.
27+
28+
This mixin enables dynamic discovery of classes and implementations without
29+
explicit imports by automatically importing all modules within specified
30+
packages. It is designed for use with class registration mechanisms to enable
31+
automatic discovery and registration of classes when they are placed in the
32+
correct package structure.
33+
34+
Example:
35+
::
36+
from guidellm.utils import AutoImporterMixin
37+
38+
class MyRegistry(AutoImporterMixin):
39+
auto_package = "my_package.implementations"
40+
41+
MyRegistry.auto_import_package_modules()
42+
43+
:cvar auto_package: Package name or tuple of package names to import modules from
44+
:cvar auto_ignore_modules: Module names to ignore during import
45+
:cvar auto_imported_modules: List tracking which modules have been imported
4846
"""
4947

50-
auto_package: ClassVar[Optional[Union[str, tuple[str, ...]]]] = None
51-
auto_ignore_modules: ClassVar[Optional[tuple[str, ...]]] = None
52-
auto_imported_modules: ClassVar[Optional[list]] = None
48+
auto_package: ClassVar[str | tuple[str, ...] | None] = None
49+
auto_ignore_modules: ClassVar[tuple[str, ...] | None] = None
50+
auto_imported_modules: ClassVar[list[str] | None] = None
5351

5452
@classmethod
55-
def auto_import_package_modules(cls):
53+
def auto_import_package_modules(cls) -> None:
5654
"""
57-
Automatically imports all modules within the specified package(s).
55+
Automatically import all modules within the specified package(s).
5856
59-
This method scans the package(s) defined in the `auto_package` class variable
60-
and imports all modules found, tracking them in `auto_imported_modules`. It
61-
skips packages (directories) and any modules listed in `auto_ignore_modules`.
57+
Scans the package(s) defined in the `auto_package` class variable and imports
58+
all modules found, tracking them in `auto_imported_modules`. Skips packages
59+
(directories) and any modules listed in `auto_ignore_modules`.
6260
6361
:raises ValueError: If the `auto_package` class variable is not set
6462
"""

0 commit comments

Comments
 (0)