Skip to content

Commit 2bd224c

Browse files
authored
[WIP] pass performance param & implement "latest results" endpoint (#53)
According to API updates: Add performance param and implement latest results. https://dune.com/docs/api/api-reference/execute-query-id/
1 parent 35e514c commit 2bd224c

File tree

5 files changed

+123
-23
lines changed

5 files changed

+123
-23
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ _version.py
66
.idea/
77
venv/
88
tmp/
9+
.vscode/

dune_client/base_client.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,9 @@ class BaseDuneClient:
2020
API_PATH = "/api/v1"
2121
DEFAULT_TIMEOUT = 10
2222

23-
def __init__(self, api_key: str):
23+
def __init__(self, api_key: str, performance: str = "medium"):
2424
self.token = api_key
25+
self.performance = performance
2526
self.logger = logging.getLogger(__name__)
2627
logging.basicConfig(format="%(asctime)s %(levelname)s %(name)s %(message)s")
2728

dune_client/client.py

Lines changed: 70 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
import time
99
from io import BytesIO
10-
from typing import Any
10+
from typing import Any, Optional, Union
1111

1212
import requests
1313
from requests import Response, JSONDecodeError
@@ -50,13 +50,14 @@ def _handle_response(
5050
def _route_url(self, route: str) -> str:
5151
return f"{self.BASE_URL}{self.API_PATH}/{route}"
5252

53-
def _get(self, route: str) -> Any:
53+
def _get(self, route: str, params: Optional[Any] = None) -> Any:
5454
url = self._route_url(route)
5555
self.logger.debug(f"GET received input url={url}")
5656
response = requests.get(
5757
url,
5858
headers={"x-dune-api-key": self.token},
5959
timeout=self.DEFAULT_TIMEOUT,
60+
params=params,
6061
)
6162
return self._handle_response(response)
6263

@@ -71,14 +72,20 @@ def _post(self, route: str, params: Any) -> Any:
7172
)
7273
return self._handle_response(response)
7374

74-
def execute(self, query: Query) -> ExecutionResponse:
75+
def execute(
76+
self, query: Query, performance: Optional[str] = None
77+
) -> ExecutionResponse:
7578
"""Post's to Dune API for execute `query`"""
79+
self.logger.info(
80+
f"executing {query.query_id} on {performance or self.performance} cluster"
81+
)
7682
response_json = self._post(
7783
route=f"query/{query.query_id}/execute",
7884
params={
7985
"query_parameters": {
8086
p.key: p.to_dict()["value"] for p in query.parameters()
81-
}
87+
},
88+
"performance": performance or self.performance,
8289
},
8390
)
8491
try:
@@ -122,6 +129,32 @@ def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
122129
response.raise_for_status()
123130
return ExecutionResultCSV(data=BytesIO(response.content))
124131

132+
def get_latest_result(self, query: Union[Query, str, int]) -> ResultsResponse:
133+
"""
134+
GET the latest results for a query_id without having to execute the query again.
135+
136+
:param query: :class:`Query` object OR query id as string | int
137+
138+
https://dune.com/docs/api/api-reference/latest_results/
139+
"""
140+
if isinstance(query, Query):
141+
params = {
142+
f"params.{p.key}": p.to_dict()["value"] for p in query.parameters()
143+
}
144+
query_id = query.query_id
145+
else:
146+
params = None
147+
query_id = query
148+
149+
response_json = self._get(
150+
route=f"query/{query_id}/results",
151+
params=params,
152+
)
153+
try:
154+
return ResultsResponse.from_dict(response_json)
155+
except KeyError as err:
156+
raise DuneError(response_json, "ResultsResponse", err) from err
157+
125158
def cancel_execution(self, job_id: str) -> bool:
126159
"""POST Execution Cancellation to Dune API for `job_id` (aka `execution_id`)"""
127160
response_json = self._post(route=f"execution/{job_id}/cancel", params=None)
@@ -132,8 +165,13 @@ def cancel_execution(self, job_id: str) -> bool:
132165
except KeyError as err:
133166
raise DuneError(response_json, "CancellationResponse", err) from err
134167

135-
def _refresh(self, query: Query, ping_frequency: int = 5) -> str:
136-
job_id = self.execute(query).execution_id
168+
def _refresh(
169+
self,
170+
query: Query,
171+
ping_frequency: int = 5,
172+
performance: Optional[str] = None,
173+
) -> str:
174+
job_id = self.execute(query=query, performance=performance).execution_id
137175
status = self.get_status(job_id)
138176
while status.state not in ExecutionState.terminal_states():
139177
self.logger.info(
@@ -147,25 +185,45 @@ def _refresh(self, query: Query, ping_frequency: int = 5) -> str:
147185

148186
return job_id
149187

150-
def refresh(self, query: Query, ping_frequency: int = 5) -> ResultsResponse:
188+
def refresh(
189+
self,
190+
query: Query,
191+
ping_frequency: int = 5,
192+
performance: Optional[str] = None,
193+
) -> ResultsResponse:
151194
"""
152195
Executes a Dune `query`, waits until execution completes,
153196
fetches and returns the results.
154197
Sleeps `ping_frequency` seconds between each status request.
155198
"""
156-
job_id = self._refresh(query, ping_frequency=ping_frequency)
199+
job_id = self._refresh(
200+
query,
201+
ping_frequency=ping_frequency,
202+
performance=performance,
203+
)
157204
return self.get_result(job_id)
158205

159-
def refresh_csv(self, query: Query, ping_frequency: int = 5) -> ExecutionResultCSV:
206+
def refresh_csv(
207+
self,
208+
query: Query,
209+
ping_frequency: int = 5,
210+
performance: Optional[str] = None,
211+
) -> ExecutionResultCSV:
160212
"""
161213
Executes a Dune query, waits till execution completes,
162214
fetches and the results in CSV format
163215
(use it load the data directly in pandas.from_csv() or similar frameworks)
164216
"""
165-
job_id = self._refresh(query, ping_frequency=ping_frequency)
217+
job_id = self._refresh(
218+
query,
219+
ping_frequency=ping_frequency,
220+
performance=performance,
221+
)
166222
return self.get_result_csv(job_id)
167223

168-
def refresh_into_dataframe(self, query: Query) -> Any:
224+
def refresh_into_dataframe(
225+
self, query: Query, performance: Optional[str] = None
226+
) -> Any:
169227
"""
170228
Execute a Dune Query, waits till execution completes,
171229
fetched and returns the result as a Pandas DataFrame
@@ -178,5 +236,5 @@ def refresh_into_dataframe(self, query: Query) -> Any:
178236
raise ImportError(
179237
"dependency failure, pandas is required but missing"
180238
) from exc
181-
data = self.refresh_csv(query).data
239+
data = self.refresh_csv(query, performance=performance).data
182240
return pandas.read_csv(data)

dune_client/client_async.py

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
"""
66
from __future__ import annotations
77
import asyncio
8-
from typing import Any, Optional
8+
from typing import Any, Optional, Union
99

1010
from aiohttp import (
1111
ClientSession,
@@ -37,13 +37,15 @@ class AsyncDuneClient(BaseDuneClient):
3737

3838
_connection_limit = 3
3939

40-
def __init__(self, api_key: str, connection_limit: int = 3):
40+
def __init__(
41+
self, api_key: str, connection_limit: int = 3, performance: str = "medium"
42+
):
4143
"""
4244
api_key - Dune API key
4345
connection_limit - number of parallel requests to execute.
4446
For non-pro accounts Dune allows only up to 3 requests but that number can be increased.
4547
"""
46-
super().__init__(api_key=api_key)
48+
super().__init__(api_key=api_key, performance=performance)
4749
self._connection_limit = connection_limit
4850
self._session: Optional[ClientSession] = None
4951

@@ -85,13 +87,14 @@ async def _handle_response(
8587
response.raise_for_status()
8688
raise ValueError("Unreachable since previous line raises") from err
8789

88-
async def _get(self, url: str) -> Any:
90+
async def _get(self, url: str, params: Optional[Any] = None) -> Any:
8991
if self._session is None:
9092
raise ValueError("Client is not connected; call `await cl.connect()`")
9193
self.logger.debug(f"GET received input url={url}")
9294
response = await self._session.get(
9395
url=f"{self.API_PATH}{url}",
9496
headers=self.default_headers(),
97+
params=params,
9598
)
9699
return await self._handle_response(response)
97100

@@ -106,11 +109,19 @@ async def _post(self, url: str, params: Any) -> Any:
106109
)
107110
return await self._handle_response(response)
108111

109-
async def execute(self, query: Query) -> ExecutionResponse:
112+
async def execute(
113+
self, query: Query, performance: Optional[str] = None
114+
) -> ExecutionResponse:
110115
"""Post's to Dune API for execute `query`"""
116+
params = query.request_format()
117+
params["performance"] = performance or self.performance
118+
119+
self.logger.info(
120+
f"executing {query.query_id} on {performance or self.performance} cluster"
121+
)
111122
response_json = await self._post(
112123
url=f"/query/{query.query_id}/execute",
113-
params=query.request_format(),
124+
params=params,
114125
)
115126
try:
116127
return ExecutionResponse.from_dict(response_json)
@@ -135,6 +146,30 @@ async def get_result(self, job_id: str) -> ResultsResponse:
135146
except KeyError as err:
136147
raise DuneError(response_json, "ResultsResponse", err) from err
137148

149+
async def get_latest_result(self, query: Union[Query, str, int]) -> ResultsResponse:
150+
"""
151+
GET the latest results for a query_id without having to execute the query again.
152+
153+
https://dune.com/docs/api/api-reference/latest_results/
154+
"""
155+
if isinstance(query, Query):
156+
params = {
157+
f"params.{p.key}": p.to_dict()["value"] for p in query.parameters()
158+
}
159+
query_id = query.query_id
160+
else:
161+
params = None
162+
query_id = query
163+
164+
response_json = await self._get(
165+
url=f"/query/{query_id}/results",
166+
params=params,
167+
)
168+
try:
169+
return ResultsResponse.from_dict(response_json)
170+
except KeyError as err:
171+
raise DuneError(response_json, "ResultsResponse", err) from err
172+
138173
async def cancel_execution(self, job_id: str) -> bool:
139174
"""POST Execution Cancellation to Dune API for `job_id` (aka `execution_id`)"""
140175
response_json = await self._post(url=f"/execution/{job_id}/cancel", params=None)
@@ -145,13 +180,18 @@ async def cancel_execution(self, job_id: str) -> bool:
145180
except KeyError as err:
146181
raise DuneError(response_json, "CancellationResponse", err) from err
147182

148-
async def refresh(self, query: Query, ping_frequency: int = 5) -> ResultsResponse:
183+
async def refresh(
184+
self,
185+
query: Query,
186+
ping_frequency: int = 5,
187+
performance: Optional[str] = None,
188+
) -> ResultsResponse:
149189
"""
150190
Executes a Dune `query`, waits until execution completes,
151191
fetches and returns the results.
152192
Sleeps `ping_frequency` seconds between each status request.
153193
"""
154-
job_id = (await self.execute(query)).execution_id
194+
job_id = (await self.execute(query=query, performance=performance)).execution_id
155195
status = await self.get_status(job_id)
156196
while status.state not in ExecutionState.terminal_states():
157197
self.logger.info(

dune_client/query.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
"""
44
import urllib.parse
55
from dataclasses import dataclass
6-
from typing import Optional, List, Dict
6+
from typing import Optional, List, Dict, Union
77

88
from dune_client.types import QueryParameter
99

@@ -41,7 +41,7 @@ def __hash__(self) -> int:
4141
"""
4242
return self.url().__hash__()
4343

44-
def request_format(self) -> Dict[str, Dict[str, str]]:
44+
def request_format(self) -> Dict[str, Union[Dict[str, str], str, None]]:
4545
"""Transforms Query objects to params to pass in API"""
4646
return {
4747
"query_parameters": {p.key: p.to_dict()["value"] for p in self.parameters()}

0 commit comments

Comments
 (0)