Skip to content

Commit 914b0d4

Browse files
feat: add async support for csv and dataframe methods (#56)
add async versions of get_result_csv, _refresh, refresh_csv and refresh_into_dataframe reduce diff between client.py and client_async.py, to make future maintenance easier
1 parent 742ecac commit 914b0d4

File tree

2 files changed

+130
-62
lines changed

2 files changed

+130
-62
lines changed

dune_client/client.py

Lines changed: 37 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,7 @@ class DuneClient(DuneInterface, BaseDuneClient):
3333
combining the use of endpoints (e.g. refresh)
3434
"""
3535

36-
def _handle_response(
37-
self,
38-
response: Response,
39-
) -> Any:
36+
def _handle_response(self, response: Response) -> Any:
4037
try:
4138
# Some responses can be decoded and converted to DuneErrors
4239
response_json = response.json()
@@ -48,17 +45,24 @@ def _handle_response(
4845
raise ValueError("Unreachable since previous line raises") from err
4946

5047
def _route_url(self, route: str) -> str:
51-
return f"{self.BASE_URL}{self.API_PATH}/{route}"
48+
return f"{self.BASE_URL}{self.API_PATH}{route}"
5249

53-
def _get(self, route: str, params: Optional[Any] = None) -> Any:
50+
def _get(
51+
self,
52+
route: str,
53+
params: Optional[Any] = None,
54+
raw: bool = False,
55+
) -> Any:
5456
url = self._route_url(route)
5557
self.logger.debug(f"GET received input url={url}")
5658
response = requests.get(
57-
url,
58-
headers={"x-dune-api-key": self.token},
59+
url=url,
60+
headers=self.default_headers(),
5961
timeout=self.DEFAULT_TIMEOUT,
6062
params=params,
6163
)
64+
if raw:
65+
return response
6266
return self._handle_response(response)
6367

6468
def _post(self, route: str, params: Any) -> Any:
@@ -67,7 +71,7 @@ def _post(self, route: str, params: Any) -> Any:
6771
response = requests.post(
6872
url=url,
6973
json=params,
70-
headers={"x-dune-api-key": self.token},
74+
headers=self.default_headers(),
7175
timeout=self.DEFAULT_TIMEOUT,
7276
)
7377
return self._handle_response(response)
@@ -76,17 +80,15 @@ def execute(
7680
self, query: Query, performance: Optional[str] = None
7781
) -> ExecutionResponse:
7882
"""Post's to Dune API for execute `query`"""
83+
params = query.request_format()
84+
params["performance"] = performance or self.performance
85+
7986
self.logger.info(
8087
f"executing {query.query_id} on {performance or self.performance} cluster"
8188
)
8289
response_json = self._post(
83-
route=f"query/{query.query_id}/execute",
84-
params={
85-
"query_parameters": {
86-
p.key: p.to_dict()["value"] for p in query.parameters()
87-
},
88-
"performance": performance or self.performance,
89-
},
90+
route=f"/query/{query.query_id}/execute",
91+
params=params,
9092
)
9193
try:
9294
return ExecutionResponse.from_dict(response_json)
@@ -95,17 +97,15 @@ def execute(
9597

9698
def get_status(self, job_id: str) -> ExecutionStatusResponse:
9799
"""GET status from Dune API for `job_id` (aka `execution_id`)"""
98-
response_json = self._get(
99-
route=f"execution/{job_id}/status",
100-
)
100+
response_json = self._get(route=f"/execution/{job_id}/status")
101101
try:
102102
return ExecutionStatusResponse.from_dict(response_json)
103103
except KeyError as err:
104104
raise DuneError(response_json, "ExecutionStatusResponse", err) from err
105105

106106
def get_result(self, job_id: str) -> ResultsResponse:
107107
"""GET results from Dune API for `job_id` (aka `execution_id`)"""
108-
response_json = self._get(route=f"execution/{job_id}/results")
108+
response_json = self._get(route=f"/execution/{job_id}/results")
109109
try:
110110
return ResultsResponse.from_dict(response_json)
111111
except KeyError as err:
@@ -119,13 +119,10 @@ def get_result_csv(self, job_id: str) -> ExecutionResultCSV:
119119
use this method for large results where you want lower CPU and memory overhead
120120
if you need metadata information use get_results() or get_status()
121121
"""
122-
url = self._route_url(f"execution/{job_id}/results/csv")
122+
route = f"/execution/{job_id}/results/csv"
123+
url = self._route_url(f"/execution/{job_id}/results/csv")
123124
self.logger.debug(f"GET CSV received input url={url}")
124-
response = requests.get(
125-
url,
126-
headers={"x-dune-api-key": self.token},
127-
timeout=self.DEFAULT_TIMEOUT,
128-
)
125+
response = self._get(route=route, raw=True)
129126
response.raise_for_status()
130127
return ExecutionResultCSV(data=BytesIO(response.content))
131128

@@ -147,7 +144,7 @@ def get_latest_result(self, query: Union[Query, str, int]) -> ResultsResponse:
147144
query_id = int(query)
148145

149146
response_json = self._get(
150-
route=f"query/{query_id}/results",
147+
route=f"/query/{query_id}/results",
151148
params=params,
152149
)
153150
try:
@@ -157,7 +154,10 @@ def get_latest_result(self, query: Union[Query, str, int]) -> ResultsResponse:
157154

158155
def cancel_execution(self, job_id: str) -> bool:
159156
"""POST Execution Cancellation to Dune API for `job_id` (aka `execution_id`)"""
160-
response_json = self._post(route=f"execution/{job_id}/cancel", params=None)
157+
response_json = self._post(
158+
route=f"/execution/{job_id}/cancel",
159+
params=None,
160+
)
161161
try:
162162
# No need to make a dataclass for this since it's just a boolean.
163163
success: bool = response_json["success"]
@@ -171,6 +171,11 @@ def _refresh(
171171
ping_frequency: int = 5,
172172
performance: Optional[str] = None,
173173
) -> str:
174+
"""
175+
Executes a Dune `query`, waits until execution completes,
176+
fetches and returns the results.
177+
Sleeps `ping_frequency` seconds between each status request.
178+
"""
174179
job_id = self.execute(query=query, performance=performance).execution_id
175180
status = self.get_status(job_id)
176181
while status.state not in ExecutionState.terminal_states():
@@ -186,38 +191,28 @@ def _refresh(
186191
return job_id
187192

188193
def refresh(
189-
self,
190-
query: Query,
191-
ping_frequency: int = 5,
192-
performance: Optional[str] = None,
194+
self, query: Query, ping_frequency: int = 5, performance: Optional[str] = None
193195
) -> ResultsResponse:
194196
"""
195197
Executes a Dune `query`, waits until execution completes,
196198
fetches and returns the results.
197199
Sleeps `ping_frequency` seconds between each status request.
198200
"""
199201
job_id = self._refresh(
200-
query,
201-
ping_frequency=ping_frequency,
202-
performance=performance,
202+
query, ping_frequency=ping_frequency, performance=performance
203203
)
204204
return self.get_result(job_id)
205205

206206
def refresh_csv(
207-
self,
208-
query: Query,
209-
ping_frequency: int = 5,
210-
performance: Optional[str] = None,
207+
self, query: Query, ping_frequency: int = 5, performance: Optional[str] = None
211208
) -> ExecutionResultCSV:
212209
"""
213210
Executes a Dune query, waits till execution completes,
214211
fetches and the results in CSV format
215212
(use it load the data directly in pandas.from_csv() or similar frameworks)
216213
"""
217214
job_id = self._refresh(
218-
query,
219-
ping_frequency=ping_frequency,
220-
performance=performance,
215+
query, ping_frequency=ping_frequency, performance=performance
221216
)
222217
return self.get_result_csv(job_id)
223218

0 commit comments

Comments
 (0)