Skip to content

Commit a633421

Browse files
committed
ADD: Add programmatic batch downloads
1 parent ead74c6 commit a633421

File tree

12 files changed

+275
-39
lines changed

12 files changed

+275
-39
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33
## 0.8.0 - TBD
44
- Renamed 'dbz' encoding to 'dbn'
5+
- Added `batch.list_files(...)` method
6+
- Added `batch.download(...)` method
57
- Changed `.to_df(...)` `pretty_ts` default argument to `True`
68
- Changed `.to_df(...)` `pretty_px` default argument to `True`
79
- Changed `.to_df(...)` `map_symbols` default argument to `True`

databento/common/bento.py

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import datetime as dt
22
import io
33
import os.path
4-
from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Dict, List, Optional
4+
from pathlib import Path
5+
from typing import TYPE_CHECKING, Any, BinaryIO, Callable, Dict, List, Optional, Union
56

67
import numpy as np
78
import pandas as pd
@@ -536,13 +537,13 @@ def replay(self, callback: Callable[[Any], None]) -> None:
536537
callback(record[0])
537538

538539
@staticmethod
539-
def from_file(path: str) -> "FileBento":
540+
def from_file(path: Union[Path, str]) -> "FileBento":
540541
"""
541542
Load the data from a DBN file at the given path.
542543
543544
Parameters
544545
----------
545-
path : str
546+
path : Path or str
546547
The path to read from.
547548
548549
Returns
@@ -569,7 +570,7 @@ def from_file(path: str) -> "FileBento":
569570

570571
return bento
571572

572-
def to_file(self, path: str) -> "FileBento":
573+
def to_file(self, path: Union[Path, str]) -> "FileBento":
573574
"""
574575
Write the data to a DBN file at the given path.
575576
@@ -591,13 +592,13 @@ def to_file(self, path: str) -> "FileBento":
591592

592593
return bento
593594

594-
def to_csv(self, path: str) -> None:
595+
def to_csv(self, path: Union[Path, str]) -> None:
595596
"""
596597
Write the data to a file in CSV format.
597598
598599
Parameters
599600
----------
600-
path : str
601+
path : Path or str
601602
The file path to write to.
602603
603604
Notes
@@ -611,13 +612,13 @@ def to_csv(self, path: str) -> None:
611612
map_symbols=False,
612613
).to_csv(path)
613614

614-
def to_json(self, path: str) -> None:
615+
def to_json(self, path: Union[Path, str]) -> None:
615616
"""
616617
Write the data to a file in JSON format.
617618
618619
Parameters
619620
----------
620-
path : str
621+
path : Path or str
621622
The file path to write to.
622623
623624
Notes
@@ -664,7 +665,7 @@ def request_symbology(self, client: "Historical") -> Dict[str, Any]:
664665
def request_full_definitions(
665666
self,
666667
client: "Historical",
667-
path: Optional[str] = None,
668+
path: Optional[Union[Path, str]] = None,
668669
) -> "Bento":
669670
"""
670671
Request full instrument definitions based on the metadata properties.
@@ -675,7 +676,7 @@ def request_full_definitions(
675676
----------
676677
client : Historical
677678
The historical client to use for the request (contains the API key).
678-
path : str, optional
679+
path : Path or str, optional
679680
The path to stream the data to on disk (will then return a `FileBento`).
680681
681682
Returns
@@ -771,11 +772,11 @@ class FileBento(Bento):
771772
772773
Parameters
773774
----------
774-
path : str
775+
path : Path or str
775776
The path to the data file.
776777
"""
777778

778-
def __init__(self, path: str):
779+
def __init__(self, path: Union[Path, str]):
779780
super().__init__()
780781

781782
self._path = path
@@ -790,7 +791,7 @@ def path(self) -> str:
790791
str
791792
792793
"""
793-
return self._path
794+
return str(self._path)
794795

795796
@property
796797
def nbytes(self) -> int:

databento/common/parsing.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def optional_values_list_to_string(
4242
4343
Returns
4444
-------
45-
str or ``None``
45+
str or `None`
4646
4747
"""
4848
if values is None:
@@ -99,7 +99,7 @@ def optional_date_to_string(value: Optional[Union[date, str]]) -> Optional[str]:
9999
100100
Returns
101101
-------
102-
str or ``None``
102+
str or `None`
103103
104104
"""
105105
if value is None:
@@ -155,7 +155,7 @@ def optional_datetime_to_string(
155155
156156
Returns
157157
-------
158-
str or ``None``
158+
str or `None`
159159
160160
"""
161161
if value is None:

databento/historical/api/batch.py

Lines changed: 143 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
1+
import os
12
from datetime import date
3+
from pathlib import Path
24
from typing import Any, Dict, List, Optional, Tuple, Union
35

46
import pandas as pd
7+
import requests
58
from databento.common.enums import (
69
Compression,
710
Dataset,
@@ -12,6 +15,7 @@
1215
SplitDuration,
1316
SType,
1417
)
18+
from databento.common.logging import log_error, log_info
1519
from databento.common.parsing import (
1620
datetime_to_string,
1721
optional_datetime_to_string,
@@ -20,7 +24,8 @@
2024
)
2125
from databento.common.validation import validate_enum
2226
from databento.historical.api import API_VERSION
23-
from databento.historical.http import BentoHttpAPI
27+
from databento.historical.http import BentoHttpAPI, check_http_error
28+
from requests.auth import HTTPBasicAuth
2429

2530

2631
class BatchHttpAPI(BentoHttpAPI):
@@ -69,7 +74,7 @@ def submit_job(
6974
symbols : List[Union[str, int]] or str
7075
The product symbols to filter for. Takes up to 2,000 symbols per request.
7176
If more than 1 symbol is specified, the data is merged and sorted by time.
72-
If 'ALL_SYMBOLS' or ``None`` then will be for **all** symbols.
77+
If 'ALL_SYMBOLS' or `None` then will be for **all** symbols.
7378
schema : Schema or str {'mbo', 'mbp-1', 'mbp-10', 'trades', 'tbbo', 'ohlcv-1s', 'ohlcv-1m', 'ohlcv-1h', 'ohlcv-1d', 'definition', 'statistics', 'status'}, default 'trades' # noqa
7479
The data record schema for the request.
7580
encoding : Encoding or str {'dbn', 'csv', 'json'}, default 'dbn'
@@ -90,7 +95,7 @@ def submit_job(
9095
stype_out : SType or str, default 'product_id'
9196
The output symbology type to resolve to.
9297
limit : int, optional
93-
The maximum number of records for the request. If ``None`` then no limit.
98+
The maximum number of records for the request. If `None` then no limit.
9499
95100
Returns
96101
-------
@@ -172,3 +177,138 @@ def list_jobs(
172177
params=params,
173178
basic_auth=True,
174179
).json()
180+
181+
def list_files(self, job_id: str) -> List[Dict[str, Any]]:
182+
"""
183+
Request details of all files for a specific batch job.
184+
185+
Makes a `GET /batch.list_files` HTTP request.
186+
187+
Parameters
188+
----------
189+
job_id : str
190+
The batch job identifier.
191+
192+
Returns
193+
-------
194+
List[Dict[str, Any]]
195+
The file details for the batch job.
196+
197+
"""
198+
params: List[Tuple[str, str]] = [
199+
("job_id", job_id),
200+
]
201+
202+
return self._get(
203+
url=self._base_url + ".list_files",
204+
params=params,
205+
basic_auth=True,
206+
).json()
207+
208+
def download(
209+
self,
210+
output_dir: Union[Path, str],
211+
job_id: str,
212+
filename_to_download: Optional[str] = None,
213+
) -> None:
214+
"""
215+
Download a batch job or a specific file to `{output_dir}/{job_id}/`.
216+
- Creates the directories if any do not already exist
217+
- Partially downloaded files will be retried using a range request
218+
219+
Makes one or many `GET /batch.download` HTTP request(s).
220+
221+
Parameters
222+
----------
223+
output_dir: Path or str
224+
The directory to download the file(s) to.
225+
job_id : str
226+
The batch job identifier.
227+
filename_to_download : str, optional
228+
The specific file to download.
229+
If `None` then will download all files for the batch job.
230+
231+
"""
232+
params: List[Tuple[str, str]] = [
233+
("job_id", job_id),
234+
]
235+
236+
job_files: List[Dict[str, Union[str, int]]] = self._get(
237+
url=self._base_url + ".list_files",
238+
params=params,
239+
basic_auth=True,
240+
).json()
241+
242+
if not job_files:
243+
log_error(f"Cannot download batch job {job_id} (no files found).")
244+
return
245+
246+
if filename_to_download:
247+
# A specific file is being requested
248+
is_file_found = False
249+
for details in job_files:
250+
if details["filename"] == filename_to_download:
251+
# Reduce files to download only the single file
252+
job_files = [details]
253+
is_file_found = True
254+
break
255+
if not is_file_found:
256+
log_error(
257+
f"Cannot download batch job {job_id} file "
258+
f"({filename_to_download} not found)",
259+
)
260+
return
261+
262+
# Prepare job directory
263+
job_dir = os.path.join(output_dir, job_id)
264+
os.makedirs(job_dir, exist_ok=True)
265+
266+
for details in job_files:
267+
filename = str(details["filename"])
268+
output_path = os.path.join(job_dir, filename)
269+
log_info(
270+
f"Downloading batch job file {filename} to {output_path} ...",
271+
)
272+
273+
self._download_file(
274+
job_id=job_id,
275+
filename=filename,
276+
filesize=int(details["size"]),
277+
output_path=output_path,
278+
)
279+
280+
def _download_file(
281+
self,
282+
job_id: str,
283+
filename: str,
284+
filesize: int,
285+
output_path: str,
286+
) -> None:
287+
params: List[Tuple[str, str]] = [
288+
("job_id", job_id),
289+
("filename", filename),
290+
]
291+
292+
headers: Dict[str, str] = self._headers.copy()
293+
294+
# Check if file already exists in partially downloaded state
295+
if os.path.isfile(output_path):
296+
existing_size = os.path.getsize(output_path)
297+
if existing_size < filesize:
298+
# Make range request for partial download,
299+
# will be from next byte to end of file.
300+
headers["Range"] = f"bytes={existing_size}-{filesize - 1}"
301+
302+
with requests.get(
303+
url=self._base_url + ".download",
304+
params=params,
305+
headers=headers,
306+
auth=HTTPBasicAuth(username=self._key, password=None),
307+
allow_redirects=True,
308+
stream=True,
309+
) as response:
310+
check_http_error(response)
311+
312+
with open(output_path, mode="wb") as f:
313+
for chunk in response.iter_content():
314+
f.write(chunk)

databento/historical/api/metadata.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -302,14 +302,14 @@ def get_record_count(
302302
If an integer is passed, then this represents nanoseconds since UNIX epoch.
303303
symbols : List[Union[str, int]] or str, optional
304304
The product symbols to filter for. Takes up to 2,000 symbols per request.
305-
If 'ALL_SYMBOLS' or ``None`` then will be for **all** symbols.
305+
If 'ALL_SYMBOLS' or `None` then will be for **all** symbols.
306306
schema : Schema or str {'mbo', 'mbp-1', 'mbp-10', 'trades', 'tbbo', 'ohlcv-1s', 'ohlcv-1m', 'ohlcv-1h', 'ohlcv-1d', 'definition', 'statistics', 'status'}, default 'trades' # noqa
307307
The data record schema for the request.
308308
stype_in : SType or str, default 'native'
309309
The input symbology type to resolve from.
310310
limit : int, optional
311311
The maximum number of records to include in the query.
312-
If ``None`` then no limit.
312+
If `None` then no limit.
313313
314314
Returns
315315
-------
@@ -368,13 +368,13 @@ def get_billable_size(
368368
If an integer is passed, then this represents nanoseconds since UNIX epoch.
369369
symbols : List[Union[str, int]] or str, optional
370370
The product symbols to filter for. Takes up to 2,000 symbols per request.
371-
If 'ALL_SYMBOLS' or ``None`` then will be for **all** symbols.
371+
If 'ALL_SYMBOLS' or `None` then will be for **all** symbols.
372372
schema : Schema or str {'mbo', 'mbp-1', 'mbp-10', 'trades', 'tbbo', 'ohlcv-1s', 'ohlcv-1m', 'ohlcv-1h', 'ohlcv-1d', 'definition', 'statistics', 'status'}, default 'trades' # noqa
373373
The data record schema for the request.
374374
stype_in : SType or str, default 'native'
375375
The input symbology type to resolve from.
376376
limit : int, optional
377-
The maximum number of records to include in the query. If ``None`` then no limit.
377+
The maximum number of records to include in the query. If `None` then no limit.
378378
379379
Returns
380380
-------
@@ -436,13 +436,13 @@ def get_cost(
436436
The data feed mode for the request.
437437
symbols : List[Union[str, int]] or str, optional
438438
The product symbols to filter for. Takes up to 2,000 symbols per request.
439-
If 'ALL_SYMBOLS' or ``None`` then will be for **all** symbols.
439+
If 'ALL_SYMBOLS' or `None` then will be for **all** symbols.
440440
schema : Schema or str {'mbo', 'mbp-1', 'mbp-10', 'trades', 'tbbo', 'ohlcv-1s', 'ohlcv-1m', 'ohlcv-1h', 'ohlcv-1d', 'definition', 'statistics', 'status'}, default 'trades' # noqa
441441
The data record schema for the request.
442442
stype_in : SType or str, default 'native'
443443
The input symbology type to resolve from.
444444
limit : int, optional
445-
The maximum number of records to include in the query. If ``None`` then no limit.
445+
The maximum number of records to include in the query. If `None` then no limit.
446446
447447
Returns
448448
-------

0 commit comments

Comments
 (0)