Skip to content

Commit a87bf53

Browse files
authored
Asynchronous client: Enable pagination by default (#109)
1 parent 53aa1b1 commit a87bf53

File tree

2 files changed

+237
-53
lines changed

2 files changed

+237
-53
lines changed

dune_client/client_async.py

Lines changed: 194 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
import asyncio
1010
from io import BytesIO
11-
from typing import Any, Callable, Optional, Union
11+
from typing import Any, Callable, Dict, Optional, Union
1212

1313
from aiohttp import (
1414
ClientResponseError,
@@ -19,7 +19,12 @@
1919
ClientTimeout,
2020
)
2121

22-
from dune_client.api.base import BaseDuneClient
22+
from dune_client.api.base import (
23+
BaseDuneClient,
24+
DUNE_CSV_NEXT_URI_HEADER,
25+
DUNE_CSV_NEXT_OFFSET_HEADER,
26+
MAX_NUM_ROWS_PER_BATCH,
27+
)
2328
from dune_client.models import (
2429
ExecutionResponse,
2530
ExecutionResultCSV,
@@ -119,8 +124,20 @@ async def _handle_response(self, response: ClientResponse) -> Any:
119124
response.raise_for_status()
120125
raise ValueError("Unreachable since previous line raises") from err
121126

122-
def _route_url(self, route: str) -> str:
123-
return f"{self.api_version}{route}"
127+
def _route_url(
128+
self,
129+
route: Optional[str] = None,
130+
url: Optional[str] = None,
131+
) -> str:
132+
if route is not None:
133+
final_route = f"{self.api_version}{route}"
134+
elif url is not None:
135+
assert url.startswith(self.base_url)
136+
final_route = url[len(self.base_url) :]
137+
else:
138+
assert route is not None or url is not None
139+
140+
return final_route
124141

125142
async def _handle_ratelimit(self, call: Callable[..., Any], url: str) -> Any:
126143
"""Generic wrapper around request callables. If the request fails due to rate limiting,
@@ -142,26 +159,27 @@ async def _handle_ratelimit(self, call: Callable[..., Any], url: str) -> Any:
142159

143160
async def _get(
    self,
    route: Optional[str] = None,
    params: Optional[Any] = None,
    raw: bool = False,
    url: Optional[str] = None,
) -> Any:
    """Issue a rate-limit-aware GET request against the Dune API.

    One of `route` (an API route) or `url` (a full URL, e.g. a
    pagination `next_uri`) selects the target. With `raw=True` the
    aiohttp response object is returned untouched; otherwise the body
    is decoded and validated by `_handle_response`.
    """
    final_route = self._route_url(route=route, url=url)
    self.logger.debug(f"GET received input route={final_route}")

    # Wrapped as a zero-arg coroutine so `_handle_ratelimit` can retry it.
    async def _get() -> Any:
        if self._session is None:
            raise ValueError("Client is not connected; call `await cl.connect()`")
        response = await self._session.get(
            url=final_route,
            headers=self.default_headers(),
            params=params,
        )
        if raw:
            return response
        return await self._handle_response(response)

    return await self._handle_ratelimit(_get, final_route)
165183

166184
async def _post(self, route: str, params: Any) -> Any:
167185
url = self._route_url(route)
@@ -206,31 +224,44 @@ async def get_status(self, job_id: str) -> ExecutionStatusResponse:
206224
except KeyError as err:
207225
raise DuneError(response_json, "ExecutionStatusResponse", err) from err
208226

209-
async def get_result(
    self,
    job_id: str,
    batch_size: int = MAX_NUM_ROWS_PER_BATCH,
) -> ResultsResponse:
    """GET the full result set from the Dune API for `job_id` (aka `execution_id`).

    Fetches `batch_size` rows at a time, concatenating each page onto
    the accumulated `ResultsResponse` until no `next_uri` remains.
    """
    results = await self._get_result_page(job_id, limit=batch_size)
    # Follow pagination links until the API reports no further page.
    while results.next_uri is not None:
        results += await self._get_result_by_url(results.next_uri)
    return results
240+
241+
async def get_result_csv(
    self,
    job_id: str,
    batch_size: int = MAX_NUM_ROWS_PER_BATCH,
) -> ExecutionResultCSV:
    """GET the full result set in CSV format for `job_id` (aka `execution_id`).

    This endpoint returns only the raw data as CSV, so it is faster and
    lighter-weight than `get_result`; prefer it for large results where
    you want lower CPU and memory overhead. Use `get_result` or
    `get_status` when you need execution metadata. Pages of
    `batch_size` rows are fetched and concatenated.
    """
    results = await self._get_result_csv_page(job_id, limit=batch_size)
    # Follow the CSV pagination header chain until exhausted.
    while results.next_uri is not None:
        results += await self._get_result_csv_by_url(results.next_uri)
    return results
231260

232261
async def get_latest_result(
    self,
    query: Union[QueryBase, str, int],
    batch_size: int = MAX_NUM_ROWS_PER_BATCH,
) -> ResultsResponse:
    """GET the latest results for a query id without re-executing the query.

    Accepts a `QueryBase` or a raw query id (str/int). Results are
    paginated `batch_size` rows at a time.

    https://dune.com/docs/api/api-reference/latest_results/
    """
    params, query_id = parse_query_object_or_id(query)
    if params is None:
        params = {}
    # Default pagination: cap the first page at `batch_size` rows.
    params["limit"] = batch_size

    response_json = await self._get(
        route=f"/query/{query_id}/results",
        params=params,
    )
    try:
        results = ResultsResponse.from_dict(response_json)
        # Accumulate the remaining pages, if any.
        while results.next_uri is not None:
            results += await self._get_result_by_url(results.next_uri)
        return results
    except KeyError as err:
        raise DuneError(response_json, "ResultsResponse", err) from err
251293

@@ -262,36 +304,16 @@ async def cancel_execution(self, job_id: str) -> bool:
262304
except KeyError as err:
263305
raise DuneError(response_json, "CancellationResponse", err) from err
264306

265-
async def _refresh(
266-
self,
267-
query: QueryBase,
268-
ping_frequency: int = 5,
269-
performance: Optional[str] = None,
270-
) -> str:
271-
"""
272-
Executes a Dune `query`, waits until execution completes,
273-
fetches and returns the results.
274-
Sleeps `ping_frequency` seconds between each status request.
275-
"""
276-
job_id = (await self.execute(query=query, performance=performance)).execution_id
277-
status = await self.get_status(job_id)
278-
while status.state not in ExecutionState.terminal_states():
279-
self.logger.info(
280-
f"waiting for query execution {job_id} to complete: {status}"
281-
)
282-
await asyncio.sleep(ping_frequency)
283-
status = await self.get_status(job_id)
284-
if status.state == ExecutionState.FAILED:
285-
self.logger.error(status)
286-
raise QueryFailed(f"Error data: {status.error}")
287-
288-
return job_id
307+
########################
308+
# Higher level functions
309+
########################
289310

290311
async def refresh(
    self,
    query: QueryBase,
    ping_frequency: int = 5,
    performance: Optional[str] = None,
    batch_size: int = MAX_NUM_ROWS_PER_BATCH,
) -> ResultsResponse:
    """Execute `query`, wait until completion, and return the full results.

    Execution status is polled every `ping_frequency` seconds; results
    are then fetched in pages of `batch_size` rows.
    """
    job_id = await self._refresh(
        query, ping_frequency=ping_frequency, performance=performance
    )
    return await self.get_result(job_id, batch_size=batch_size)
305327

306328
async def refresh_csv(
    self,
    query: QueryBase,
    ping_frequency: int = 5,
    performance: Optional[str] = None,
    batch_size: int = MAX_NUM_ROWS_PER_BATCH,
) -> ExecutionResultCSV:
    """Execute `query`, wait until completion, and return results as CSV.

    Execution status is polled every `ping_frequency` seconds; the CSV
    result is fetched in pages of `batch_size` rows.
    """
    job_id = await self._refresh(
        query, ping_frequency=ping_frequency, performance=performance
    )
    return await self.get_result_csv(job_id, batch_size=batch_size)
321344

322345
async def refresh_into_dataframe(
323-
self, query: QueryBase, performance: Optional[str] = None
346+
self,
347+
query: QueryBase,
348+
performance: Optional[str] = None,
349+
batch_size: int = MAX_NUM_ROWS_PER_BATCH,
324350
) -> Any:
325351
"""
326352
Execute a Dune Query, waits till execution completes,
@@ -334,5 +360,120 @@ async def refresh_into_dataframe(
334360
raise ImportError(
335361
"dependency failure, pandas is required but missing"
336362
) from exc
337-
data = (await self.refresh_csv(query, performance=performance)).data
338-
return pandas.read_csv(data)
363+
results = await self.refresh_csv(
364+
query, performance=performance, batch_size=batch_size
365+
)
366+
return pandas.read_csv(results.data)
367+
368+
#################
369+
# Private Methods
370+
#################
371+
372+
async def _get_result_page(
    self,
    job_id: str,
    limit: int = MAX_NUM_ROWS_PER_BATCH,
    offset: int = 0,
) -> ResultsResponse:
    """GET a single page (`limit` rows starting at `offset`) of results
    for `job_id` (aka `execution_id`)."""
    response_json = await self._get(
        route=f"/execution/{job_id}/results",
        params={"limit": limit, "offset": offset},
    )
    try:
        return ResultsResponse.from_dict(response_json)
    except KeyError as err:
        raise DuneError(response_json, "ResultsResponse", err) from err
393+
394+
async def _get_result_by_url(
    self,
    url: str,
    params: Optional[Dict[str, Any]] = None,
) -> ResultsResponse:
    """GET results from a full Dune API URL.

    Used to follow pagination `next_uri` links returned by the API.
    """
    response_json = await self._get(url=url, params=params)
    try:
        return ResultsResponse.from_dict(response_json)
    except KeyError as err:
        raise DuneError(response_json, "ResultsResponse", err) from err
408+
409+
async def _get_result_csv_page(
    self,
    job_id: str,
    limit: int = MAX_NUM_ROWS_PER_BATCH,
    offset: int = 0,
) -> ExecutionResultCSV:
    """GET a single CSV page (`limit` rows starting at `offset`) of
    results for `job_id` (aka `execution_id`)."""
    response = await self._get(
        route=f"/execution/{job_id}/results/csv",
        params={"limit": limit, "offset": offset},
        raw=True,
    )
    response.raise_for_status()

    # For the CSV endpoint, pagination info comes back in response
    # headers rather than the body.
    return ExecutionResultCSV(
        data=BytesIO(await response.content.read(-1)),
        next_uri=response.headers.get(DUNE_CSV_NEXT_URI_HEADER),
        next_offset=response.headers.get(DUNE_CSV_NEXT_OFFSET_HEADER),
    )
435+
436+
async def _get_result_csv_by_url(
    self,
    url: str,
    params: Optional[Dict[str, Any]] = None,
) -> ExecutionResultCSV:
    """GET results in CSV format from a full Dune API URL.

    Used to follow pagination `next_uri` links for the CSV endpoint.
    """
    response = await self._get(url=url, params=params, raw=True)
    response.raise_for_status()

    # For the CSV endpoint, pagination info comes back in response
    # headers rather than the body.
    return ExecutionResultCSV(
        data=BytesIO(await response.content.read(-1)),
        next_uri=response.headers.get(DUNE_CSV_NEXT_URI_HEADER),
        next_offset=response.headers.get(DUNE_CSV_NEXT_OFFSET_HEADER),
    )
455+
456+
async def _refresh(
    self,
    query: QueryBase,
    ping_frequency: int = 5,
    performance: Optional[str] = None,
) -> str:
    """Execute `query` and block until it reaches a terminal state.

    Polls `get_status` every `ping_frequency` seconds and returns the
    execution's `job_id` once it terminates.

    Raises:
        QueryFailed: if execution terminates in the FAILED state.
    """
    execution = await self.execute(query=query, performance=performance)
    job_id = execution.execution_id
    status = await self.get_status(job_id)
    while status.state not in ExecutionState.terminal_states():
        self.logger.info(
            f"waiting for query execution {job_id} to complete: {status}"
        )
        await asyncio.sleep(ping_frequency)
        status = await self.get_status(job_id)
    if status.state == ExecutionState.FAILED:
        self.logger.error(status)
        raise QueryFailed(f"Error data: {status.error}")

    return job_id

0 commit comments

Comments
 (0)