Skip to content

Commit 56d30af

Browse files
committed
upd: изменена логика и механика работы ограничителей запросов; upd: доработаны тесты ядра
1 parent 61fa065 commit 56d30af

File tree

53 files changed

+2126
-2410
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

53 files changed

+2126
-2410
lines changed

examples/get_product_info_descriptions.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,9 @@
2626
Для prod значения параметров могут быть увеличены.
2727
"""
2828

29-
product_list_limit = 10 # Кол-во товаров, выгружаемых за одну итерацию
30-
consumers_amount = 5 # Кол-во потребителей, выгружающих описания
29+
product_list_limit = 10 # Кол-во товаров, выгружаемых за одну итерацию
30+
consumers_amount = 5 # Кол-во потребителей, выгружающих описания
31+
consumers_rps_max_limit = 6 # Максимальное кол-во запросов в секунду для каждого потребителя
3132
queue_max_size = product_list_limit * consumers_amount # Максимальный размер очереди
3233

3334
product_descriptions = list()
@@ -39,7 +40,7 @@ async def producer(queue):
3940
async with SellerAPI(
4041
config=SellerAPIConfig(
4142
# Понижаем уровень логирования (для наглядности)
42-
log_level="INFO"
43+
log_level="DEBUG"
4344
)
4445
) as api:
4546

@@ -81,7 +82,8 @@ async def consumer(queue):
8182
async with SellerAPI(
8283
config=SellerAPIConfig(
8384
# Понижаем уровень логирования
84-
log_level="INFO",
85+
log_level="DEBUG",
86+
max_requests_per_second=consumers_rps_max_limit
8587
)
8688
) as api:
8789
while True:

src/ozonapi/seller/core/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ class APIConfig(BaseSettings):
5656
description="Базовый URL API Ozon"
5757
)
5858
max_requests_per_second: int = Field(
59-
default=25,
59+
default=27,
6060
ge=1,
6161
le=50,
6262
description="Максимальное количество запросов в секунду (50 по документации Ozon)"

src/ozonapi/seller/core/core.py

Lines changed: 18 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@
1212
retry_if_exception_type,
1313
stop_after_attempt,
1414
wait_exponential,
15-
before_sleep_log,
1615
)
1716

1817
from .config import APIConfig
1918
from .method_rate_limiter import MethodRateLimiterManager
20-
from .rate_limiter import RateLimiterConfig, RateLimiterManager
19+
from .rate_limiter import RateLimiterManager
2120
from .sessions import SessionManager
2221
from .exceptions import (
2322
APIClientError,
@@ -41,11 +40,9 @@ class APIManager:
4140
"""
4241

4342
# Общие менеджеры для всех экземпляров класса
44-
_rate_limiter_manager: ClassVar[Optional[RateLimiterManager]] = None
4543
_session_manager: ClassVar[Optional[SessionManager]] = None
4644
_method_rate_limiter_manager: ClassVar[Optional[MethodRateLimiterManager]] = None
4745
_initialized: ClassVar[bool] = False
48-
_instance_count: ClassVar[int] = 0
4946

5047
_class_logger: ClassVar[Logger] = APIConfig().logger
5148

@@ -66,45 +63,39 @@ def __init__(
6663
config: Конфигурация клиента
6764
"""
6865
self._config = self.load_config(config)
66+
6967
self._client_id = client_id or self._config.client_id
7068
self._api_key = api_key or self._config.api_key
7169
self._token = token or self._config.token
7270

73-
if (self._token is None and (self._api_key is None or self._client_id is None)):
74-
raise ValueError(
75-
"Не предоставлены авторизационные данные. Проверьте указание token или client_id и api_key."
76-
)
71+
self._validate_credentials()
7772

7873
self._instance_id = id(self)
79-
self._registered = False
8074
self._closed = False
8175
self._logging_manager = None
76+
self._instance_logger_number = None
8277
self._instance_logger: Logger = self._get_instance_logger()
8378

84-
self._validate_credentials()
85-
8679
if self._token is not None and self._client_id is None:
8780
self._client_id = "OAuth {}".format(int(hashlib.sha256(self._token.encode()).hexdigest()[:10], 16) % 10000000)
8881

82+
self._rate_limiter = RateLimiterManager(
83+
instance=self,
84+
logger=logging.manager.get_logger(f"seller.client[{self._client_id}]-[{self._instance_logger_number}].rate_limiter")
85+
)
8986

90-
if APIManager._rate_limiter_manager is None:
91-
APIManager._rate_limiter_manager = RateLimiterManager(
92-
cleanup_interval=self._config.cleanup_interval,
93-
instance_logger=logging.manager.get_logger(f"seller.rate_limiter")
94-
)
9587
if APIManager._session_manager is None:
9688
APIManager._session_manager = SessionManager(
9789
timeout=self._config.request_timeout,
9890
connector_limit=self._config.connector_limit,
99-
instance_logger=logging.manager.get_logger(f"seller.session")
91+
instance_logger=logging.manager.get_logger(f"seller.client[{self._client_id}].session")
10092
)
10193
if APIManager._method_rate_limiter_manager is None:
10294
APIManager._method_rate_limiter_manager = MethodRateLimiterManager(
10395
cleanup_interval=self._config.cleanup_interval,
104-
instance_logger=logging.manager.get_logger(f"seller.method_rate_limiter")
96+
instance_logger=logging.manager.get_logger(f"seller.client[{self._client_id}].method_rate_limiter")
10597
)
10698

107-
APIManager._instance_count += 1
10899
self.logger.debug(f"API-клиент инициализирован")
109100

110101
@classmethod
@@ -152,6 +143,7 @@ def _get_instance_logger(self) -> Logger:
152143
except RuntimeError:
153144
log_instance_count += 1
154145
else:
146+
self._instance_logger_number = log_instance_count
155147
break
156148

157149
return self._logging_manager.get_logger()
@@ -160,8 +152,6 @@ def _get_instance_logger(self) -> Logger:
160152
async def initialize(cls) -> None:
161153
"""Инициализация ресурсов."""
162154
if not cls._initialized:
163-
if cls._rate_limiter_manager:
164-
await cls._rate_limiter_manager.start()
165155
if cls._method_rate_limiter_manager:
166156
await cls._method_rate_limiter_manager.start()
167157
cls._initialized = True
@@ -171,8 +161,6 @@ async def initialize(cls) -> None:
171161
async def shutdown(cls) -> None:
172162
"""Очистка ресурсов."""
173163
if cls._initialized:
174-
if cls._rate_limiter_manager:
175-
await cls._rate_limiter_manager.shutdown()
176164
if cls._method_rate_limiter_manager:
177165
await cls._method_rate_limiter_manager.shutdown()
178166
if cls._session_manager:
@@ -183,39 +171,25 @@ async def shutdown(cls) -> None:
183171
def _validate_credentials(self) -> None:
184172
"""Валидация учетных данных."""
185173
if self._token is not None:
186-
# Валидация для OAuth-токена
187174
if self._token.startswith("Bearer "):
188175
self._token = self._token[7:]
189176
if not self._token or not isinstance(self._token, str):
190177
raise ValueError("token должен быть непустой строкой")
191178

192179
elif self._api_key is not None:
193-
# Валидация для классической авторизации
194180
if not self._client_id or not isinstance(self._client_id, str):
195181
raise ValueError("client_id должен быть непустой строкой")
196182
if not self._api_key or not isinstance(self._api_key, str):
197183
raise ValueError("api_key должен быть непустой строкой")
198184
else:
199185
raise ValueError("Не предоставлены авторизационные данные")
200186

201-
async def _ensure_registered(self) -> None:
202-
"""Гарантирует регистрацию экземпляра в менеджерах."""
203-
if self._closed:
204-
raise RuntimeError(f"Регистрация API-клиента отменена для ClientID {self._client_id}")
205-
206-
if not self._registered and self._rate_limiter_manager:
207-
await self._rate_limiter_manager.register_instance(
208-
self._client_id, self._instance_id
209-
)
210-
self._registered = True
211-
212187
async def __aenter__(self) -> "APIManager":
213188
"""Асинхронный контекстный менеджер."""
214189
if self._closed:
215190
raise RuntimeError(f"Невозможно использовать закрытый API-клиент для ClientID {self._client_id}")
216191

217192
await self.initialize()
218-
await self._ensure_registered()
219193
return self
220194

221195
async def __aexit__(
@@ -233,17 +207,8 @@ async def close(self) -> None:
233207

234208
self._closed = True
235209

236-
if self._registered and self._rate_limiter_manager:
237-
await self._rate_limiter_manager.unregister_instance(
238-
self._client_id, self._instance_id
239-
)
240-
self._registered = False
241-
242-
APIManager._instance_count -= 1
243-
244-
if APIManager._instance_count == 0:
245-
if APIManager._session_manager:
246-
await APIManager._session_manager.close_all()
210+
if APIManager._session_manager:
211+
await APIManager._session_manager.remove_instance(self._client_id, self._instance_id)
247212

248213
self.logger.debug(f"Работа API-клиента завершена")
249214
self._logging_manager.shutdown()
@@ -273,11 +238,6 @@ def logger(self):
273238
"""Возвращает логер экземпляра."""
274239
return self._instance_logger
275240

276-
@classmethod
277-
def get_instance_count(cls) -> int:
278-
"""Получает количество активных экземпляров."""
279-
return cls._instance_count
280-
281241
def _create_retry_decorator(self):
282242
"""Создает декоратор повторов на основе конфигурации."""
283243

@@ -350,7 +310,6 @@ async def _request(
350310
351311
Args:
352312
method: HTTP метод запроса
353-
api_name: Название API
354313
api_version: Версия API
355314
endpoint: Конечная точка API
356315
payload: Данные для отправки в формате JSON
@@ -371,9 +330,6 @@ async def _request(
371330
if self._closed:
372331
raise RuntimeError("API-клиент остановлен")
373332

374-
if not self._rate_limiter_manager or not self._session_manager:
375-
raise RuntimeError("API-клиент не инициализирован")
376-
377333
url = f"{self._config.base_url}/{api_version}/{endpoint}"
378334

379335
def get_payload_snippet(p: dict | None) -> str | None:
@@ -385,22 +341,16 @@ def get_payload_snippet(p: dict | None) -> str | None:
385341

386342
return string if len(string) < 200 else string[:200] + "..."
387343

388-
log_context = {
344+
log_context: dict[str, Any] = {
389345
"method": method,
390346
"endpoint": f"{api_version}/{endpoint}",
391347
"payload": get_payload_snippet(payload),
392348
}
393349

394350
self.logger.info(f"Отправка запроса к API: {log_context}")
395351

396-
await self._ensure_registered()
397-
398-
limiter_config = RateLimiterConfig(
399-
max_requests=self._config.max_requests_per_second,
400-
)
401-
rate_limiter = await self._rate_limiter_manager.get_limiter(
402-
self._client_id, limiter_config
403-
)
352+
instance_limiter = self._rate_limiter.instance_limiter
353+
client_limiter = self._rate_limiter.client_limiter
404354

405355
retry_decorator = self._create_retry_decorator()
406356

@@ -412,7 +362,7 @@ async def _execute_request():
412362
instance_id=self._instance_id,
413363
token=self._token
414364
) as session:
415-
async with rate_limiter:
365+
async with instance_limiter, client_limiter:
416366
try:
417367
async with session.request(
418368
method, url, json=payload, params=params
@@ -463,23 +413,7 @@ async def _execute_request():
463413
@classmethod
464414
async def get_active_client_ids(cls) -> list[str]:
465415
"""Возвращает список client_id с активными экземплярами."""
466-
if cls._rate_limiter_manager:
467-
return await cls._rate_limiter_manager.get_active_client_ids()
468-
return list()
469-
470-
@classmethod
471-
async def get_rate_limiter_stats(cls) -> dict[str, int]:
472-
"""Возвращает статистику по ограничителям запросов."""
473-
if cls._rate_limiter_manager:
474-
return await cls._rate_limiter_manager.get_instance_stats()
475-
return dict()
476-
477-
@classmethod
478-
async def get_detailed_stats(cls) -> dict[str, dict[str, Any]]:
479-
"""Возвращает детальную статистику."""
480-
if cls._rate_limiter_manager:
481-
return await cls._rate_limiter_manager.get_limiter_stats()
482-
return dict()
416+
return RateLimiterManager.get_active_client_ids()
483417

484418
@classmethod
485419
async def get_method_limiter_stats(cls) -> dict[str, dict[str, Any]]:

0 commit comments

Comments
 (0)