|
3 | 3 | import asyncio |
4 | 4 | import aiohttp |
5 | 5 | import ssl |
| 6 | +import warnings |
6 | 7 | from typing import Optional, Dict, Any |
7 | 8 | from datetime import datetime, timezone |
8 | 9 | from ..exceptions import APIError, AuthenticationError, NetworkError, TimeoutError, SSLError |
|
16 | 17 | except ImportError: |
17 | 18 | HAS_RATE_LIMITER = False |
18 | 19 |
|
| 20 | +# Suppress aiohttp ResourceWarnings for unclosed sessions |
| 21 | +# We properly manage session lifecycle in context managers, but Python's |
| 22 | +# resource tracking may still emit warnings during rapid create/destroy cycles |
| 23 | +warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*<aiohttp") |
| 24 | +warnings.filterwarnings("ignore", category=ResourceWarning, message="unclosed.*<ssl.SSLSocket") |
| 25 | + |
19 | 26 |
|
20 | 27 | class AsyncEngine: |
21 | 28 | """ |
@@ -52,35 +59,81 @@ def __init__( |
52 | 59 | self.timeout = aiohttp.ClientTimeout(total=timeout) |
53 | 60 | self._session: Optional[aiohttp.ClientSession] = None |
54 | 61 |
|
55 | | - # Rate limiting |
| 62 | + # Store rate limit config (create limiter per event loop in __aenter__) |
56 | 63 | if rate_limit is None: |
57 | 64 | rate_limit = self.DEFAULT_RATE_LIMIT |
58 | 65 |
|
59 | | - if HAS_RATE_LIMITER and rate_limit > 0: |
60 | | - self._rate_limiter: Optional[AsyncLimiter] = AsyncLimiter( |
61 | | - max_rate=rate_limit, |
62 | | - time_period=rate_period |
63 | | - ) |
64 | | - else: |
65 | | - self._rate_limiter: Optional[AsyncLimiter] = None |
| 66 | + self._rate_limit = rate_limit |
| 67 | + self._rate_period = rate_period |
| 68 | + self._rate_limiter: Optional[AsyncLimiter] = None |
66 | 69 |
|
67 | 70 | async def __aenter__(self): |
68 | | - """Context manager entry.""" |
| 71 | + """Context manager entry - idempotent (safe to call multiple times).""" |
| 72 | + # If session already exists, don't create a new one |
| 73 | + # This handles nested context manager usage |
| 74 | + if self._session is not None: |
| 75 | + return self |
| 76 | + |
| 77 | + # Create connector with force_close=True to ensure proper cleanup |
| 78 | + # This helps prevent "Unclosed connector" warnings |
| 79 | + connector = aiohttp.TCPConnector( |
| 80 | + limit=100, |
| 81 | + limit_per_host=30, |
| 82 | + force_close=True # Force close connections on exit |
| 83 | + ) |
| 84 | + |
| 85 | + # Create session with the connector |
69 | 86 | self._session = aiohttp.ClientSession( |
| 87 | + connector=connector, |
70 | 88 | timeout=self.timeout, |
71 | 89 | headers={ |
72 | 90 | "Authorization": f"Bearer {self.bearer_token}", |
73 | 91 | "Content-Type": "application/json", |
74 | 92 | "User-Agent": "brightdata-sdk/2.0.0", |
75 | 93 | } |
76 | 94 | ) |
| 95 | + |
| 96 | + # Create rate limiter for this event loop (avoids reuse across loops) |
| 97 | + if HAS_RATE_LIMITER and self._rate_limit > 0: |
| 98 | + self._rate_limiter = AsyncLimiter( |
| 99 | + max_rate=self._rate_limit, |
| 100 | + time_period=self._rate_period |
| 101 | + ) |
| 102 | + else: |
| 103 | + self._rate_limiter = None |
| 104 | + |
77 | 105 | return self |
78 | 106 |
|
79 | 107 | async def __aexit__(self, exc_type, exc_val, exc_tb): |
80 | | - """Context manager exit.""" |
| 108 | + """Context manager exit - ensures proper cleanup of resources.""" |
81 | 109 | if self._session: |
82 | | - await self._session.close() |
| 110 | + # Store reference before clearing |
| 111 | + session = self._session |
83 | 112 | self._session = None |
| 113 | + |
| 114 | + # Close the session - this will also close the connector |
| 115 | + await session.close() |
| 116 | + |
| 117 | + # Wait for underlying connections to close |
| 118 | + # This is necessary to prevent "Unclosed client session" warnings |
| 119 | + await asyncio.sleep(0.1) |
| 120 | + |
| 121 | + # Clear rate limiter |
| 122 | + self._rate_limiter = None |
| 123 | + |
| 124 | + def __del__(self): |
| 125 | + """Cleanup on garbage collection.""" |
| 126 | + # If session wasn't properly closed (shouldn't happen with proper usage), |
| 127 | + # try to clean up to avoid warnings |
| 128 | + if hasattr(self, '_session') and self._session: |
| 129 | + try: |
| 130 | + if not self._session.closed: |
| 131 | + # Can't use async here, so just close the connector directly |
| 132 | + if hasattr(self._session, '_connector') and self._session._connector: |
| 133 | + self._session._connector.close() |
| 134 | + except: |
| 135 | + # Silently ignore any errors during __del__ |
| 136 | + pass |
84 | 137 |
|
85 | 138 | def request( |
86 | 139 | self, |
|
0 commit comments