11"""
22Environment abstractions for coordinating scheduler execution across distributed nodes.
33
4- Provides environment abstractions that handle synchronization, timing coordination,
5- error propagation, and lifecycle management for scheduler execution across single
6- or multiple nodes. The Environment protocol defines the interface for distributed
4+ Provides abstractions that handle synchronization, timing coordination, error
5+ propagation, and lifecycle management for scheduler execution across single or
6+ multiple nodes. The Environment protocol defines the interface for distributed
77coordination while NonDistributedEnvironment provides a minimal implementation
8- for single-node execution.
8+ for single-node execution. Environments manage the complete execution lifecycle
9+ from parameter distribution through result aggregation.
910
10- Environment Execution Flow:
11- 1. sync_run_params() - Distribute workload and synchronize parameters across nodes
12- 2. sync_run_start() - Coordinate synchronized start time for all nodes
13- 3. update_run_iteration() - Update state after each request (called per iteration)
11+ Execution Flow:
12+ 1. sync_run_params() - Distribute workload and synchronize parameters
13+ 2. sync_run_start() - Coordinate synchronized start time
14+ 3. update_run_iteration() - Update state after each request iteration
14154. sync_run_error() - Handle and propagate errors across nodes
15- 5. sync_run_end() - Aggregate results and cleanup at completion
16+ 5. sync_run_end() - Aggregate results and finalize execution
1617"""
1718
1819from __future__ import annotations
3940
4041class Environment (ABC , Generic [RequestT , ResponseT ], InfoMixin ):
4142 """
42- Abstract base for coordinating scheduler execution across distributed nodes.
43+ Abstract interface for coordinating scheduler execution across distributed nodes.
4344
44- Defines the interface for managing distributed scheduler execution including
45+ Defines the protocol for managing distributed scheduler execution including
4546 parameter synchronization, timing coordination, state updates, error propagation,
46- and result aggregation. Implementations handle the complexity of distributed
47- coordination while providing a unified interface for scheduler orchestration.
47+ and result aggregation. Implementations handle distributed coordination complexity
48+ while providing a unified interface for scheduler orchestration.
4849 """
4950
5051 @abstractmethod
@@ -61,10 +62,6 @@ async def sync_run_params(
6162 """
6263 Synchronize execution parameters across nodes and resolve local scope.
6364
64- Coordinates parameter distribution and validation across active nodes.
65- In distributed environments, handles node assignment and workload partitioning.
66- In non-distributed environments, typically returns parameters unchanged.
67-
6865 :param requests: Complete set of requests to process across all nodes
6966 :param strategy: Scheduling strategy to apply during execution
7067 :param constraints: Runtime constraints to enforce during execution
@@ -78,9 +75,6 @@ async def sync_run_start(self) -> float:
7875 """
7976 Coordinate synchronized start time across all nodes.
8077
81- Ensures all nodes begin processing simultaneously for accurate benchmarking
82- and consistent timing measurements across distributed execution.
83-
8478 :return: Unix timestamp when all nodes should begin processing
8579 :raises Exception: If startup synchronization fails across nodes
8680 """
@@ -97,11 +91,6 @@ async def update_run_iteration(
9791 """
9892 Update environment state with completed request iteration results.
9993
100- Called after each request processing to update execution progress and
101- synchronize any required state across nodes in distributed environments.
102- Generally, distributed is expected to store the iteration updates until
103- all nodes have processed and sync_run_end is called to retrieve them.
104-
10594 :param response: Response generated for the request, if successful
10695 :param request: The processed request
10796 :param request_info: Metadata about request processing including timings
@@ -115,9 +104,6 @@ async def sync_run_error(self, err: list[Exception] | Exception):
115104 """
116105 Handle and propagate errors across all active nodes.
117106
118- Coordinates error handling when failures occur, ensuring all nodes are
119- notified for appropriate cleanup or shutdown procedures.
120-
121107 :param err: The exception(s) that occurred during execution
122108 """
123109 ...
@@ -136,10 +122,6 @@ async def sync_run_end(
136122 """
137123 Finalize execution and aggregate results from all nodes.
138124
139- Handles cleanup, result synchronization, and error propagation at execution
140- completion. Collects and yields results from worker nodes in distributed
141- environments.
142-
143125 :return: Iterator of (response, request, request_info, state) tuples from
144126 remote nodes in distributed environments, empty for non-distributed
145127 :raises Exception: Any errors that occurred during execution
@@ -151,9 +133,9 @@ class NonDistributedEnvironment(Environment[RequestT, ResponseT]):
151133 """
152134 Single-node scheduler execution environment with minimal coordination overhead.
153135
154- Simplified environment for running schedulers on a single node without distributed
155- coordination requirements. Implements the Environment interface with no-op
156- synchronization for local testing, development, and single-machine benchmarking .
136+ Implements the Environment interface with no-op synchronization for local testing,
137+ development, and single-machine benchmarking. All synchronization methods return
138+ immediately without distributed coordination logic .
157139
158140 Example:
159141 ::
@@ -165,29 +147,27 @@ class NonDistributedEnvironment(Environment[RequestT, ResponseT]):
165147 SynchronousStrategy,
166148 )
167149
168-
169- # Definitions
150+ env = NonDistributedEnvironment()
170151 requests = [f"req_{ind}" for ind in range(5)]
171152 strategy = SynchronousStrategy()
172153 constraints = {"max_num": MaxNumberConstraint(max_num=5)}
173154 state = SchedulerState()
174155
175- # Run environment
176156 local_req, local_strat, local_const = await env.sync_run_params(
177157 requests, strategy, constraints
178158 )
179159 start_time = await env.sync_run_start()
180160 for req in local_req:
181161 state.processed_requests += 1
182- await env.update_run_iteration(
183- f"resp_{req}", req, RequestInfo(), state
184- )
162+ await env.update_run_iteration(f"resp_{req}", req, RequestInfo(), state)
185163 async for nonlocal_req in env.sync_run_end():
186164 state.processed_requests += 1
187165 """
188166
189167 def __init__ (self ):
190- """Initialize with empty error storage for single-node execution."""
168+ """
169+ Initialize single-node environment with empty error storage.
170+ """
191171 self .run_errors : list [Exception ] = []
192172
193173 async def sync_run_params (
@@ -206,15 +186,15 @@ async def sync_run_params(
206186 :param requests: Requests to process locally
207187 :param strategy: Scheduling strategy to apply during execution
208188 :param constraints: Runtime constraints to enforce during execution
209- :return: Tuple containing the original (requests, strategy, constraints)
189+ :return: Original (requests, strategy, constraints) tuple unchanged
210190 """
211191 return requests , strategy , constraints
212192
213193 async def sync_run_start (self ) -> float :
214194 """
215195 Return current time plus configured delay for single-node startup.
216196
217- :return: Unix timestamp for when the run should start
197+ :return: Unix timestamp when execution should begin
218198 """
219199 return time .time () + settings .scheduler_start_delay_non_distributed
220200
@@ -229,7 +209,7 @@ async def update_run_iteration(
229209 No-op for single-node execution with no distributed state synchronization.
230210
231211 :param response: Response generated for the request, if successful
232- :param request: The request that was processed
212+ :param request: The processed request
233213 :param request_info: Metadata about request processing including timings
234214 :param state: Current scheduler state with metrics and progress
235215 """
@@ -256,7 +236,7 @@ async def sync_run_end(
256236 """
257237 Finalize single-node execution and propagate any stored errors.
258238
259- :return: Empty iterator since there are no remote nodes
239+ :return: Empty iterator as there are no remote nodes
260240 :raises Exception: Any error stored during execution via sync_run_error
261241 """
262242 if self .run_errors :
0 commit comments