Skip to content

Commit 83b01c0

Browse files
committed
Latest state updates for perf fixes for multiprocessing communication
1 parent: be15fe3 · commit: 83b01c0

22 files changed

+4843
-1513
lines changed

src/guidellm/benchmark/entrypoints.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@
4242
NonDistributedEnvironment,
4343
StrategyType,
4444
)
45-
from guidellm.utils import UNSET, Console, InfoMixin
45+
from guidellm.utils import Console, InfoMixin
4646

4747
__all__ = [
4848
"benchmark_generative_text",
@@ -103,8 +103,8 @@ async def benchmark_generative_text( # noqa: C901
103103
print_updates: bool = False,
104104
# Aggregators configuration
105105
add_aggregators: (
106-
dict[str, str | dict[str, Any] | Aggregator | CompilableAggregator]
107-
) = UNSET,
106+
dict[str, str | dict[str, Any] | Aggregator | CompilableAggregator] | None
107+
) = None,
108108
warmup: float | None = None,
109109
cooldown: float | None = None,
110110
request_samples: int | None = 20,
@@ -209,7 +209,7 @@ async def benchmark_generative_text( # noqa: C901
209209
)
210210
elif constraints:
211211
raise ValueError(
212-
"Constraints must be empty or unset when providing a Profile instance. "
212+
"Constraints must be empty when providing a Profile instance. "
213213
f"Provided constraints: {constraints} ; provided profile: {profile}"
214214
)
215215
console_step.finish(

src/guidellm/scheduler/worker.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -31,7 +31,7 @@
3131
ScheduledRequestInfo,
3232
)
3333
from guidellm.scheduler.strategy import ScheduledRequestTimings
34-
from guidellm.utils import MsgpackEncoding, synchronous_to_exitable_async
34+
from guidellm.utils import MessageEncoding, synchronous_to_exitable_async
3535

3636
__all__ = ["WorkerProcess"]
3737

@@ -492,7 +492,7 @@ def _pull_requests_generator(self) -> Generator:
492492

493493
try:
494494
message = self.requests_queue.get(timeout=self.poll_intervals)
495-
request_tuple = MsgpackEncoding.decode(message)
495+
request_tuple = MessageEncoding.decode_message(message)
496496
self.pending_requests_queue.sync_put(request_tuple)
497497
except QueueEmpty:
498498
pass # No update available, continue polling
@@ -522,7 +522,9 @@ def _push_updates_generator(self) -> Generator:
522522
update_tuple[2]
523523
)
524524

525-
message = MsgpackEncoding.encode((response, request, request_info))
525+
message = MessageEncoding.encode_message(
526+
(response, request, request_info)
527+
)
526528
self.updates_queue.put(message)
527529
self.pending_updates_queue.task_done()
528530
except culsans.QueueEmpty:

src/guidellm/scheduler/worker_group.py

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,7 @@
4141
)
4242
from guidellm.scheduler.strategy import SchedulingStrategy
4343
from guidellm.scheduler.worker import WorkerProcess
44-
from guidellm.utils import MsgpackEncoding, synchronous_to_exitable_async
44+
from guidellm.utils import MessageEncoding, synchronous_to_exitable_async
4545

4646
__all__ = ["WorkerProcessGroup"]
4747

@@ -508,7 +508,7 @@ def _populate_requests_next_message(
508508
)
509509
state, continue_requests, _ = self._update_state(request_info)
510510

511-
request_msg = MsgpackEncoding.encode((request, request_info))
511+
request_msg = MessageEncoding.encode_message((request, request_info))
512512
update_msg = (None, request, request_info, state)
513513

514514
return (request_msg, update_msg), continue_requests
@@ -575,7 +575,7 @@ def _populate_updates_process_next(
575575
) -> tuple[SchedulerState | None, bool]:
576576
try:
577577
message = self.updates_queue.get(timeout=settings.scheduler_poll_interval)
578-
response, request, request_info = MsgpackEncoding.decode(message)
578+
response, request, request_info = MessageEncoding.decode_message(message)
579579

580580
scheduler_state, _, continue_updates = self._update_state(request_info)
581581
self.pending_updates_queue.sync_put(
@@ -596,7 +596,7 @@ def _populate_updates_cancel_remaining(
596596
message = self.requests_queue.get(
597597
timeout=settings.scheduler_poll_interval
598598
)
599-
request, request_info = MsgpackEncoding.decode(message)
599+
request, request_info = MessageEncoding.decode_message(message)
600600

601601
# Send start first
602602
request_info.status = "in_progress"

src/guidellm/scheduler/worker_queue.py

Lines changed: 0 additions & 152 deletions
This file was deleted.

src/guidellm/utils/__init__.py

Lines changed: 30 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,17 +1,22 @@
11
from .auto_importer import AutoImporterMixin
22
from .console import Colors, Console, ConsoleUpdateStep, StatusIcons, StatusStyles
33
from .default_group import DefaultGroupHandler
4-
from .encoding import MsgpackEncoding
5-
from .general import (
6-
UNSET,
7-
UnsetType,
4+
from .encoding import (
5+
EncodedTypeAlias,
6+
Encoder,
7+
EncodingTypesAlias,
8+
MessageEncoding,
9+
SerializationTypesAlias,
10+
SerializedTypeAlias,
11+
Serializer,
12+
)
13+
from .functions import (
814
all_defined,
915
safe_add,
1016
safe_divide,
1117
safe_format_timestamp,
1218
safe_getattr,
1319
safe_multiply,
14-
safe_subtract,
1520
)
1621
from .hf_datasets import (
1722
SUPPORTED_TYPES,
@@ -20,6 +25,13 @@
2025
from .hf_transformers import (
2126
check_load_processor,
2227
)
28+
from .messaging import (
29+
InterProcessMessaging,
30+
InterProcessMessagingManagerQueue,
31+
InterProcessMessagingPipe,
32+
InterProcessMessagingQueue,
33+
MessageT,
34+
)
2335
from .mixins import InfoMixin
2436
from .pydantic_utils import (
2537
PydanticClassRegistryMixin,
@@ -52,23 +64,34 @@
5264

5365
__all__ = [
5466
"SUPPORTED_TYPES",
55-
"UNSET",
5667
"AutoImporterMixin",
5768
"Colors",
5869
"Colors",
5970
"Console",
6071
"ConsoleUpdateStep",
6172
"DefaultGroupHandler",
6273
"DistributionSummary",
74+
"EncodedTypeAlias",
75+
"Encoder",
76+
"EncodingTypesAlias",
6377
"EndlessTextCreator",
6478
"InfoMixin",
6579
"IntegerRangeSampler",
66-
"MsgpackEncoding",
80+
"InterProcessMessaging",
81+
"InterProcessMessagingManagerQueue",
82+
"InterProcessMessagingPipe",
83+
"InterProcessMessagingQueue",
84+
"MessageEncoding",
85+
"MessageEncoding",
86+
"MessageT",
6787
"Percentiles",
6888
"PydanticClassRegistryMixin",
6989
"RegistryMixin",
7090
"ReloadableBaseModel",
7191
"RunningStats",
92+
"SerializationTypesAlias",
93+
"SerializedTypeAlias",
94+
"Serializer",
7295
"SingletonMixin",
7396
"StandardBaseDict",
7497
"StandardBaseModel",
@@ -78,7 +101,6 @@
78101
"StatusStyles",
79102
"ThreadSafeSingletonMixin",
80103
"TimeRunningStats",
81-
"UnsetType",
82104
"all_defined",
83105
"check_load_processor",
84106
"clean_text",
@@ -91,7 +113,6 @@
91113
"safe_format_timestamp",
92114
"safe_getattr",
93115
"safe_multiply",
94-
"safe_subtract",
95116
"save_dataset_to_file",
96117
"split_text",
97118
"split_text_list_by_length",

0 commit comments

Comments
 (0)