1+ import asyncio
12import logging
2- from abc import ABC
3+ import sys
4+ from abc import ABC , abstractmethod
35from dataclasses import dataclass
4- from typing import Optional , Generator , Union , Type
6+ from itertools import chain
7+ from os import environ
8+ from typing import Optional , Type , Union
59
6- from ratelimit import limits , sleep_and_retry
7- from requests import Response , request
10+ import ujson
11+ from aiohttp import ClientSession
12+ from requests .models import PreparedRequest
813
14+ from open_sea_v1 .helpers .rate_limiter import RateLimiter
915from open_sea_v1 .responses .abc import BaseResponse
1016
1117logger = logging .getLogger (__name__ )
1218
13- MAX_CALLS_PER_SECOND = 2 # gets overriden if API key is passed to ClientParams instance
14- RATE_LIMIT = 1 # second
15-
1619@dataclass
1720class ClientParams :
18- """Common OpenSea Endpoint parameters to pass in."""
21+ """
22+ Common OpenSea Endpoint parameters to pass in.
23+ Will automatically use OPENSEA_API_KEY environment variable as the api_key value, if it exists on the system.
24+ """
1925 offset : int = 0
2026 page_size : int = 50
2127 limit : int = 50
2228 max_pages : Optional [int ] = None
2329 api_key : Optional [str ] = None
2430
2531 def __post_init__ (self ):
26- if self .max_pages :
27- self .max_pages += 1 # prevent paginator from ending one page early
32+ # if self.max_pages:
33+ # self.max_pages += 1 # prevent paginator from ending one page early
34+ self ._attempt_setting_the_api_key ()
2835 self ._validate_attrs ()
29- self ._set_max_rate_limit ()
3036
3137 def _validate_attrs (self ) -> None :
3238 if not 0 < self .limit <= 300 :
@@ -52,84 +58,128 @@ def _decrement_max_pages_attr(self) -> None:
5258 if self .max_pages is not None :
5359 self .max_pages -= 1
5460
55- def _set_max_rate_limit (self ) -> None :
56- global MAX_CALLS_PER_SECOND
57- MAX_CALLS_PER_SECOND = 2 # per second
58- if self .api_key :
59- raise NotImplementedError ("I don't know what the rate limit is for calls with an API key is yet." )
61+ def _attempt_setting_the_api_key (self ) -> None :
62+ self .api_key = environ .get ('OPENSEA_API_KEY' )
6063
6164
@dataclass
class BaseClient(ABC):
    """
    Partial implementation of an OpenSea API client.

    This class is abstract (``_json_resp_key`` must be overridden), so it cannot
    be instantiated directly. The code below also reads ``self.get_params`` and
    ``self._response_type``, neither of which is defined here; subclasses are
    expected to provide them, together with a concrete ``url``.

    Parameters
    ----------
    client_params:
        ClientParams instance.

    _rate_limit: int = 18
        Rate limit for the API is 20 when you have an API key.
        However, you run the risk of losing a few seconds if you get throttled by the server.
        After some testing, it seems 18 is the sweet spot.
        Without an API key the value is forced down to 2 in ``__post_init__``.

    _concurrency_limit: int = 5
        Concurrency limit: number of simultaneous connections at a time.
        NOTE(review): this documentation previously advertised a default of 9 and
        recommended a value that divides ``_rate_limit`` evenly to limit
        server-side throttling; the coded default is 5 -- confirm which is intended.
    """

    client_params: ClientParams
    url = None

    _rate_limit: int = 18
    _concurrency_limit: int = 5

    def __post_init__(self):
        """Initialise the pagination state and pick the effective rate limit."""
        self.processed_pages: int = 0
        self.response = None
        self.parsed_http_response = None
        # Last raw JSON page received; drives _remaining_pages().
        self._latest_json_response = None

        if not self.client_params.api_key:
            # Calls made without an API key use the lower limit of 2.
            self._rate_limit = 2

    @property
    @abstractmethod
    def _json_resp_key(self) -> str:
        """To access the contents of a page from the contents of an OpenSea HTTP response,
        you need to use a dictionary key."""

    @property
    def http_headers(self) -> dict:
        """Return the HTTP headers to send; carries the API key when one is set."""
        headers = dict()
        if self.client_params.api_key:
            headers['X-API-Key'] = self.client_params.api_key
        return headers

    def get_parsed_pages(self, flat: bool = True) -> list:
        """Fetch every remaining page and return the parsed responses.

        Runs the asynchronous fetcher to completion with ``asyncio.run``, so it
        must be called from synchronous code (not from inside a running event loop).

        Parameters
        ----------
        flat:
            If True (default), return one flat list of parsed elements.
            If False, return one list per page.
        """
        if sys.platform == 'win32':
            # prevents closed loops errors on windows
            asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
        self._latest_json_response = None  # reset: required for pagination function
        results = asyncio.run(self._aget_parsed_pages())
        if not flat:
            return results
        return list(chain.from_iterable(results))

    async def _aget_parsed_pages(self) -> list[list[Type[BaseResponse]]]:  # cant be a synchronous generator
        """Open the rate limiter and the HTTP session, fetch all pages, parse each one."""
        async with RateLimiter(rate_limit=self._rate_limit, concurrency_limit=self._concurrency_limit) as rate_limiter:
            async with ClientSession(headers=self.http_headers, json_serialize=ujson.dumps) as session:
                json_batch = await self._async_get_pages_jsons(session, rate_limiter=rate_limiter)
                return [self._parse_json(page) for page in json_batch]

    async def _async_get_pages_jsons(self, session, *, rate_limiter: RateLimiter) -> list[Union[dict, list]]:
        """Request pages one after another until ``_remaining_pages()`` says stop.

        Returns the raw JSON payload of every page, in request order.

        Raises
        ------
        ConnectionError
            When a payload is a dict containing a ``'detail'`` key; the message
            is the value stored under that key.
        """
        responses = list()
        while self._remaining_pages():
            # Request the page at the *current* offset, then advance. Advancing
            # first would skip the page the caller asked to start from.
            params = {**self.get_params, 'offset': self.client_params.offset}  # type: ignore
            querystring = self.mk_querystring(self.url, params=params)

            async with rate_limiter.throttle():
                # Context manager hands the connection back even if json() fails.
                async with session.get(querystring) as resp:
                    json_resp = await resp.json()
                self._latest_json_response = json_resp
                self.client_params._decrement_max_pages_attr()

            if isinstance(json_resp, dict) and 'detail' in json_resp:
                raise ConnectionError(f'{json_resp["detail"]}')

            responses.append(json_resp)
            self.client_params.offset += self.client_params.page_size
        return responses

    def _parse_json(self, the_json: Union[dict, list]) -> list[Type[BaseResponse]]:
        """Turn one raw JSON payload into a list of ``self._response_type`` objects.

        A dict payload is indexed with ``_json_resp_key``. A list payload is
        flattened one level when its first element is itself a list; each
        element then contributes the list stored under ``_json_resp_key`` or,
        when that is missing or empty, the element itself.
        An empty payload yields an empty list.

        Raises
        ------
        TypeError
            When the payload is neither a dict nor a list.
        """
        if not the_json:
            return list()

        key = self._json_resp_key
        if isinstance(the_json, dict):
            json_list = the_json[key]
        elif isinstance(the_json, list):
            # just in case multiple pages
            flattened = list(chain.from_iterable(the_json)) if isinstance(the_json[0], list) else the_json
            json_list = list(chain.from_iterable(j.get(key) or [j] for j in flattened))
        else:
            raise TypeError(f'Cannot parse a JSON payload of type {type(the_json).__name__}')

        return [self._response_type(element) for element in json_list]  # type: ignore

    def _remaining_pages(self) -> bool:
        """Tell whether another page should be requested.

        True before any page was fetched. False once the latest page holds
        fewer elements than ``page_size`` (it was the last one) or once
        ``max_pages`` has been counted down to zero or below.
        """
        if self._latest_json_response is None:
            return True
        if len(self._parse_json(self._latest_json_response)) < self.client_params.page_size:
            return False
        max_pages = self.client_params.max_pages
        if max_pages is not None and max_pages <= 0:
            return False
        return True

    @staticmethod
    def mk_querystring(url, params) -> str:
        """Return ``url`` with ``params`` encoded into its query string."""
        url_prepper = PreparedRequest()
        url_prepper.prepare_url(url, params)
        return url_prepper.url
0 commit comments