Skip to content

Commit 94231e0

Browse files
committed
refactor(client): 🔨 centralize request execution setup logic
This commit extracts the initialization, credential filtering, and validation steps into a reusable `_prepare_execution` helper method within `RequestExecutor`. This ensures consistency and reduces code duplication between streaming and non-streaming request handlers. Also in this commit: - refactor(usage): remove unused `release` method from `TrackingEngine`
1 parent ba60834 commit 94231e0

File tree

2 files changed

+54
-63
lines changed

2 files changed

+54
-63
lines changed

src/rotator_library/client/executor.py

Lines changed: 54 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,17 @@
1717
import logging
1818
import random
1919
import time
20-
from typing import Any, AsyncGenerator, Dict, List, Optional, Set, TYPE_CHECKING, Union
20+
from typing import (
21+
Any,
22+
AsyncGenerator,
23+
Dict,
24+
List,
25+
Optional,
26+
Set,
27+
TYPE_CHECKING,
28+
Tuple,
29+
Union,
30+
)
2131

2232
import httpx
2333
import litellm
@@ -143,34 +153,21 @@ async def execute(
143153
else:
144154
return await self._execute_non_streaming(context)
145155

146-
async def _execute_non_streaming(
156+
async def _prepare_execution(
147157
self,
148158
context: RequestContext,
149-
) -> Any:
150-
"""
151-
Execute non-streaming request with retry/rotation.
152-
153-
Args:
154-
context: RequestContext with all request details
155-
156-
Returns:
157-
Response object
158-
"""
159+
) -> Tuple["UsageManager", Any, List[str], Optional[str], Dict[str, Any]]:
159160
provider = context.provider
160161
model = context.model
161-
deadline = context.deadline
162162

163-
# Get the UsageManager for this provider
164163
usage_manager = self._usage_managers.get(provider)
165164
if not usage_manager:
166165
raise NoAvailableKeysError(f"No UsageManager for provider {provider}")
167166

168-
# Filter credentials by tier
169167
filter_result = self._filter.filter_by_tier(
170168
context.credentials, model, provider
171169
)
172170
credentials = filter_result.all_usable
173-
174171
quota_group = usage_manager.get_model_quota_group(model)
175172

176173
await self._ensure_initialized(usage_manager, context, filter_result)
@@ -179,13 +176,41 @@ async def _execute_non_streaming(
179176
if not credentials:
180177
raise NoAvailableKeysError(f"No compatible credentials for model {model}")
181178

182-
error_accumulator = RequestErrorAccumulator()
183-
error_accumulator.model = model
184-
error_accumulator.provider = provider
185179
request_headers = (
186180
dict(context.request.headers) if context.request is not None else {}
187181
)
188182

183+
return usage_manager, filter_result, credentials, quota_group, request_headers
184+
185+
async def _execute_non_streaming(
186+
self,
187+
context: RequestContext,
188+
) -> Any:
189+
"""
190+
Execute non-streaming request with retry/rotation.
191+
192+
Args:
193+
context: RequestContext with all request details
194+
195+
Returns:
196+
Response object
197+
"""
198+
provider = context.provider
199+
model = context.model
200+
deadline = context.deadline
201+
202+
(
203+
usage_manager,
204+
filter_result,
205+
credentials,
206+
quota_group,
207+
request_headers,
208+
) = await self._prepare_execution(context)
209+
210+
error_accumulator = RequestErrorAccumulator()
211+
error_accumulator.model = model
212+
error_accumulator.provider = provider
213+
189214
retry_state = RetryState()
190215
last_exception: Optional[Exception] = None
191216

@@ -430,37 +455,18 @@ async def _execute_streaming(
430455
model = context.model
431456
deadline = context.deadline
432457

433-
# Get the UsageManager for this provider
434-
usage_manager = self._usage_managers.get(provider)
435-
if not usage_manager:
436-
error_data = {
437-
"error": {
438-
"message": f"No UsageManager for provider {provider}",
439-
"type": "proxy_error",
440-
}
441-
}
442-
yield f"data: {json.dumps(error_data)}\n\n"
443-
yield "data: [DONE]\n\n"
444-
return
445-
446-
# Filter credentials by tier
447-
filter_result = self._filter.filter_by_tier(
448-
context.credentials, model, provider
449-
)
450-
credentials = filter_result.all_usable
451-
quota_group = usage_manager.get_model_quota_group(model)
452-
453-
await self._ensure_initialized(usage_manager, context, filter_result)
454-
await self._validate_request(provider, model, context.kwargs)
455-
456-
request_headers = (
457-
dict(context.request.headers) if context.request is not None else {}
458-
)
459-
460-
if not credentials:
458+
try:
459+
(
460+
usage_manager,
461+
filter_result,
462+
credentials,
463+
quota_group,
464+
request_headers,
465+
) = await self._prepare_execution(context)
466+
except NoAvailableKeysError as exc:
461467
error_data = {
462468
"error": {
463-
"message": f"No compatible credentials for {model}",
469+
"message": str(exc),
464470
"type": "proxy_error",
465471
}
466472
}

src/rotator_library/usage/tracking/engine.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -354,21 +354,6 @@ async def acquire(
354354
state.active_requests += 1
355355
return True
356356

357-
async def release(
358-
self,
359-
state: CredentialState,
360-
model: str,
361-
) -> None:
362-
"""
363-
Release a credential after request completes.
364-
365-
Args:
366-
state: Credential state
367-
model: Model that was used
368-
"""
369-
async with self._lock:
370-
return
371-
372357
async def apply_cooldown(
373358
self,
374359
state: CredentialState,

0 commit comments

Comments
 (0)