
Commit 4b6e9f8 (parent: e3770c1)

Initial state for the scheduling system rework. Still needs testing and further integration/refactors.

10 files changed: +1171 -555 lines

src/guidellm/backend/openai.py (4 additions, 0 deletions)

@@ -410,6 +410,8 @@ async def _iterative_completions_request(
 
         yield StreamingTextResponse(
             type_="start",
+            value="",
+            start_time=start_time,
             iter_count=iter_count,
             delta="",
             time=start_time,
@@ -443,6 +445,8 @@ async def _iterative_completions_request(
 
         yield StreamingTextResponse(
             type_="iter",
+            value=response_value,
+            start_time=start_time,
             iter_count=iter_count,
             delta=delta,
             time=iter_time,
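
With these two fields in place, every streamed chunk carries the accumulated text (value) and the request's start timestamp (start_time), so consumers no longer have to reassemble state from the deltas. A minimal consumption sketch, assuming an already-constructed backend and the chunks-then-summary contract that the new backend_worker.py (below) relies on:

from guidellm.backend import Backend, ResponseSummary, StreamingTextResponse


async def consume(backend: Backend, prompt: str) -> None:
    # text_completions yields StreamingTextResponse chunks and ends with a
    # ResponseSummary (the contract the new worker code relies on).
    async for chunk in backend.text_completions(prompt=prompt):
        if isinstance(chunk, StreamingTextResponse):
            elapsed = chunk.time - chunk.start_time  # both fields from this diff
            print(f"[{elapsed:.3f}s] iter {chunk.iter_count}: "
                  f"+{len(chunk.delta)} chars, {len(chunk.value)} total")
        elif isinstance(chunk, ResponseSummary):
            print(f"final output: {chunk.value!r}")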

src/guidellm/backend/response.py (6 additions, 0 deletions)

@@ -21,6 +21,8 @@ class StreamingTextResponse(BaseModel):
     A model representing the response content for a streaming text request.
 
     :param type_: The type of the response; either 'start' or 'iter'.
+    :param value: The value of the response up to this iteration.
+    :param start_time: The time.time() the request started.
     :param iter_count: The iteration count for the response. For 'start' this is 0
         and for the first 'iter' it is 1.
     :param delta: The text delta added to the response for this stream iteration.
@@ -30,6 +32,8 @@ class StreamingTextResponse(BaseModel):
     """
 
     type_: StreamingResponseType
+    value: str
+    start_time: float
     iter_count: int
     delta: str
     time: float
@@ -69,6 +73,7 @@ class ResponseSummary(BaseModel):
     :param prompt_tokens: The number of tokens in the prompt, if any usage was returned.
     :param output_tokens: The number of tokens in the output, if any usage was returned.
     :param request_id: The unique identifier for the request, if any.
+    :param error: The error message, if any, returned from making the request.
     """
 
     value: str
@@ -81,6 +86,7 @@ class ResponseSummary(BaseModel):
     response_prompt_tokens: Optional[int] = None
     response_output_tokens: Optional[int] = None
    request_id: Optional[str] = None
+    error: Optional[str] = None
 
     @computed_field  # type: ignore[misc]
     @property
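
The new error field lets a failed request be reported as an ordinary ResponseSummary instead of a separate error type, which is exactly how BackendRequestsWorker._handle_response (below) uses it. A sketch of constructing such a summary by hand; the target URL, request id, and error text are placeholders, while the field names come from this diff and the worker code:

import time

from guidellm.backend import RequestArgs, ResponseSummary

start = time.time()
failed = ResponseSummary(
    value="",  # nothing was generated
    request_args=RequestArgs(
        target="http://localhost:8000",  # placeholder target
        headers={},
        payload={},
    ),
    start_time=start,
    end_time=time.time(),
    request_id="example-request",  # placeholder id
    error="connection refused",  # placeholder error text
)

# Downstream code can branch on the optional field.
if failed.error is not None:
    print(f"request failed: {failed.error}")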

src/guidellm/config.py (1 addition, 0 deletions)

@@ -142,6 +142,7 @@ class Settings(BaseSettings):
     request_timeout: int = 60 * 5  # 5 minutes
     request_http2: bool = True
     max_concurrency: int = 512
+    max_worker_processes: int = 10
     num_sweep_profiles: int = 9
     logging: LoggingSettings = LoggingSettings()
 
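
max_worker_processes presumably caps how many worker processes the reworked scheduler may spawn. Since Settings is a pydantic BaseSettings (per the hunk header), the default can be overridden at construction time, and via environment variables with whatever prefix the class configures; a small sketch:

from guidellm.config import Settings

settings = Settings()
assert settings.max_worker_processes == 10  # the new default

# e.g. keep worker processes low on a small benchmarking host
constrained = Settings(max_worker_processes=2)
assert constrained.max_worker_processes == 2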

src/guidellm/executor/executor.py (2 additions, 2 deletions)

@@ -11,7 +11,7 @@
     ProfileGenerator,
 )
 from guidellm.request import RequestGenerator
-from guidellm.scheduler import Scheduler, SchedulerResult
+from guidellm.scheduler import Scheduler
 
 __all__ = ["Executor", "ExecutorResult"]
 
@@ -38,7 +38,7 @@ class ExecutorResult:
     count_completed: int
     generation_modes: Sequence[ProfileGenerationMode]
     report: TextGenerationBenchmarkReport
-    scheduler_result: Optional[SchedulerResult] = None
+    scheduler_result = None
     current_index: Optional[int] = None
     current_profile: Optional[Profile] = None
 
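
One side effect worth flagging in this WIP change: if ExecutorResult is a dataclass (its annotated fields with defaults suggest so), dropping the annotation turns scheduler_result = None into a plain class attribute rather than a per-instance field. A standalone illustration of that Python behavior:

from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class Result:
    count_completed: int
    scheduler_result = None  # unannotated: ignored by @dataclass
    current_index: Optional[int] = None  # annotated: a real field


print([f.name for f in fields(Result)])  # ['count_completed', 'current_index']
result = Result(count_completed=3)
print(result.scheduler_result)  # None, looked up on the class, not set in __init__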

src/guidellm/scheduler/__init__.py (33 additions, 3 deletions)

@@ -1,4 +1,34 @@
-from .load_generator import LoadGenerationMode, LoadGenerator
-from .scheduler import Scheduler, SchedulerResult
+from .backend_worker import BackendRequestsWorker, GenerationRequest
+from .scheduler import (
+    RequestsWorker,
+    Scheduler,
+    SchedulerRequestInfo,
+    SchedulerResult,
+    SchedulerRunInfo,
+)
+from .strategy import (
+    AsyncConstantStrategy,
+    AsyncPoissonStrategy,
+    ConcurrentStrategy,
+    SchedulingStrategy,
+    StrategyType,
+    SynchronousStrategy,
+    ThroughputStrategy,
+)
 
-__all__ = ["LoadGenerationMode", "LoadGenerator", "Scheduler", "SchedulerResult"]
+__all__ = [
+    "GenerationRequest",
+    "BackendRequestsWorker",
+    "Scheduler",
+    "SchedulerResult",
+    "SchedulerRunInfo",
+    "SchedulerRequestInfo",
+    "RequestsWorker",
+    "StrategyType",
+    "SchedulingStrategy",
+    "SynchronousStrategy",
+    "ThroughputStrategy",
+    "ConcurrentStrategy",
+    "AsyncConstantStrategy",
+    "AsyncPoissonStrategy",
+]
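
The package now re-exports the worker types and the full strategy family, so callers import everything from guidellm.scheduler directly; note that the old LoadGenerationMode/LoadGenerator exports are gone, so existing imports of those names will break. The names below are exactly those in the new __all__ (constructor signatures for the strategies are not part of this commit, so none are assumed):

# Everything re-exported by guidellm.scheduler after this commit.
from guidellm.scheduler import (
    AsyncConstantStrategy,
    AsyncPoissonStrategy,
    BackendRequestsWorker,
    ConcurrentStrategy,
    GenerationRequest,
    RequestsWorker,
    Scheduler,
    SchedulerRequestInfo,
    SchedulerResult,
    SchedulerRunInfo,
    SchedulingStrategy,
    StrategyType,
    SynchronousStrategy,
    ThroughputStrategy,
)
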
src/guidellm/scheduler/backend_worker.py (new file: 257 additions, 0 deletions)

@@ -0,0 +1,257 @@
+import asyncio
+import math
+import time
+import uuid
+from typing import (
+    Any,
+    AsyncGenerator,
+    Dict,
+    Literal,
+    Optional,
+    Tuple,
+    Union,
+)
+
+from pydantic import BaseModel, Field
+
+from guidellm.backend import (
+    Backend,
+    RequestArgs,
+    ResponseSummary,
+    StreamingTextResponse,
+)
+from guidellm.scheduler.scheduler import RequestsWorker
+
+__all__ = ["GenerationRequest", "BackendRequestsWorker"]
+
+
+class GenerationRequest(BaseModel):
+    """
+    A class representing a request for generation.
+    This class is used to encapsulate the details of a generation request,
+    including the request ID, type, content, parameters, statistics, and constraints.
+    It is designed to be used with the BackendRequestsWorker class to handle
+    the generation process.
+
+    :param request_id: The unique identifier for the request.
+    :param request_type: The type of request (e.g., text, chat).
+    :param content: The content for the request to send to the backend.
+        If request_type is 'text', this should be a string or list of strings
+        which will be resolved by backend.text_completions.
+        If request_type is 'chat', this should be a string,
+        a list of (str, Dict[str, Union[str, Dict[str, str]], Path, Image]),
+        or Any raw content which will be resolved by backend.chat_completions.
+        If raw content, raw_content=True must be passed in the params.
+    :param params: Additional parameters for the request passed in as kwargs.
+        For an http backend, these are passed into the body of the request.
+    :param stats: Statistics for the request, such as the number of prompt tokens.
+        Used for tracking and reporting purposes.
+    :param constraints: Constraints for the request, such as the maximum number
+        of output tokens. Used for controlling the behavior of the backend.
+    """
+
+    request_id: Optional[str] = Field(
+        default_factory=lambda: str(uuid.uuid4()),
+        description="The unique identifier for the request.",
+    )
+    request_type: Literal["text", "chat"] = Field(
+        default="text",
+        description=(
+            "The type of request (e.g., text, chat). "
+            "If request_type is 'text', resolved by backend.text_completions. "
+            "If request_type is 'chat', resolved by backend.chat_completions."
+        ),
+    )
+    content: Any = Field(
+        description=(
+            "The content for the request to send to the backend. "
+            "If request_type is 'text', this should be a string or list of strings "
+            "which will be resolved by backend.text_completions. "
+            "If request_type is 'chat', this should be a string, "
+            "a list of (str, Dict[str, Union[str, Dict[str, str]], Path, Image]), "
+            "or Any raw content which will be resolved by backend.chat_completions. "
+            "If raw content, raw_content=True must be passed in the params."
+        )
+    )
+    params: Dict[str, Any] = Field(
+        default_factory=dict,
+        description=(
+            "Additional parameters for the request that will be passed in as kwargs. "
+            "For an http backend, these are passed into the body of the request."
+        ),
+    )
+    stats: Dict[Literal["prompt_tokens"], int] = Field(
+        default_factory=dict,
+        description=(
+            "Statistics for the request, such as the number of prompt tokens. "
+            "Used for tracking and reporting purposes."
+        ),
+    )
+    constraints: Dict[Literal["output_tokens"], int] = Field(
+        default_factory=dict,
+        description=(
+            "Constraints for the request, such as the maximum number of output tokens. "
+            "Used for controlling the behavior of the backend."
+        ),
+    )
+
+
+class BackendRequestsWorker(RequestsWorker):
+    """
+    A class that handles the execution of requests using a backend.
+    This class is responsible for sending requests to the backend,
+    handling responses, and managing errors.
+
+    :param backend: The backend to use for handling requests.
+        This should be an instance of Backend such as an OpenAIHTTPBackend.
+    """
+
+    def __init__(self, backend: Backend):
+        self.backend = backend
+
+    async def resolve(
+        self,
+        request: GenerationRequest,
+        start_time: float,
+        timeout_time: float,
+    ) -> ResponseSummary:
+        """
+        Resolve a request by sending it to the backend and handling the response.
+        This method sends the request to the backend, waits for a response,
+        and handles any errors that may occur during the process.
+
+        :param request: The request to resolve.
+        :param start_time: The time to start the request.
+        :param timeout_time: The time to wait for a response before timing out.
+            If timeout_time is math.inf, the request will not time out.
+        :return: A ResponseSummary object containing the response from the backend.
+            If an error occurs, the ResponseSummary will contain the error message.
+        """
+        response = None
+        error: Optional[str] = None
+
+        try:
+            request_func, request_kwargs = self._create_request_func_kwargs(request)
+
+            async def _runner():
+                # wrap the call so we can enforce a timeout and
+                # still return the latest state from the backend
+                nonlocal response
+                async for resp in request_func(**request_kwargs):
+                    response = resp
+
+            if (wait_time := start_time - time.time()) > 0:
+                await asyncio.sleep(wait_time)
+
+            start_time = time.time()
+            await asyncio.wait_for(
+                _runner(),
+                timeout=timeout_time - time.time() if timeout_time < math.inf else None,
+            )
+
+            if not response:
+                raise ValueError(
+                    f"No response received for request: {request} "
+                    f"and backend: {self.backend}"
+                )
+            if not isinstance(response, ResponseSummary):
+                raise ValueError(
+                    f"Received no ResponseSummary for request: {request} "
+                    f"and backend: {self.backend}, received: {response}"
+                )
+        except asyncio.TimeoutError as texc:
+            error = str(texc)
+        except Exception as exc:  # noqa: BLE001
+            error = str(exc)
+
+        return self._handle_response(request, response, error, start_time)
+
+    def _create_request_func_kwargs(
+        self,
+        request: GenerationRequest,
+    ) -> Tuple[
+        AsyncGenerator[Union[StreamingTextResponse, ResponseSummary], None],
+        Dict[str, Any],
+    ]:
+        request_func: AsyncGenerator[
+            Union[StreamingTextResponse, ResponseSummary], None
+        ]
+        request_kwargs: Dict[str, Any]
+
+        if request.request_type == "text":
+            request_func = self.backend.text_completions
+            request_kwargs = {
+                "prompt": request.content,
+                "request_id": request.request_id,
+                "prompt_token_count": request.stats.get("prompt_tokens", None),
+                "output_token_count": request.constraints.get("output_tokens", None),
+                **request.params,
+            }
+        elif request.request_type == "chat":
+            request_func = self.backend.chat_completions
+            request_kwargs = {
+                "content": request.content,
+                "request_id": request.request_id,
+                "prompt_token_count": request.stats.get("prompt_tokens", None),
+                "output_token_count": request.constraints.get("output_tokens", None),
+                **request.params,
+            }
+        else:
+            raise ValueError(
+                f"Invalid request type: {request.request_type} for {request}"
+            )
+
+        return request_func, request_kwargs
+
+    def _handle_response(
+        self,
+        request: GenerationRequest,
+        response: Any,
+        error: Optional[str],
+        start_time: float,
+    ) -> ResponseSummary:
+        if response is None or not isinstance(
+            response, (ResponseSummary, StreamingTextResponse)
+        ):
+            # nothing received or invalid response, fill in defaults for error
+            if response:
+                error = str(
+                    ValueError(
+                        f"Invalid response: {type(response)} for request: {request}; "
+                    )
+                ) + (error or "")
+
+            return ResponseSummary(
+                value="",
+                request_args=RequestArgs(
+                    target=self.backend.target,
+                    headers={},
+                    payload={},
+                ),
+                start_time=start_time,
+                end_time=time.time(),
+                request_id=request.request_id,
+                error=error or "Unknown error",
+            )
+
+        if isinstance(response, StreamingTextResponse):
+            return ResponseSummary(
+                value=response.value,
+                request_args=RequestArgs(
+                    target=self.backend.target,
+                    headers={},
+                    payload={},
+                ),
+                start_time=response.start_time,
+                end_time=time.time(),
+                request_prompt_tokens=request.stats.get("prompt_tokens", None),
+                request_output_tokens=None,
+                response_prompt_tokens=None,
+                response_output_tokens=response.iter_count,
+                request_id=request.request_id,
+                error=error or "Unknown error",
+            )
+
+        response.error = error
+
+        return response
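
Tying the pieces together, a hedged end-to-end sketch of driving the worker directly. GenerationRequest, BackendRequestsWorker, and the resolve signature come from the file above; OpenAIHTTPBackend is named in the docstring, but its constructor arguments here are assumptions for illustration:

import asyncio
import math
import time

from guidellm.backend import OpenAIHTTPBackend  # concrete Backend; ctor args assumed
from guidellm.scheduler import BackendRequestsWorker, GenerationRequest


async def main() -> None:
    backend = OpenAIHTTPBackend(target="http://localhost:8000")  # assumed kwargs
    worker = BackendRequestsWorker(backend=backend)

    request = GenerationRequest(
        request_type="text",
        content="Write a haiku about request scheduling.",
        stats={"prompt_tokens": 8},         # optional bookkeeping
        constraints={"output_tokens": 64},  # cap on generated tokens
    )

    # Start immediately; math.inf disables the timeout inside resolve().
    summary = await worker.resolve(
        request, start_time=time.time(), timeout_time=math.inf
    )
    print(summary.error or summary.value)


if __name__ == "__main__":
    asyncio.run(main())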
