Skip to content

Commit 019804f

Browse files
committed
FIX: Fix writing on timeseries.get_range error
1 parent 38677e7 commit 019804f

29 files changed

+1012
-982
lines changed

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22

33
## 0.14.0 - TBD
44
- Added support for reusing a `Live` client to reconnect
5-
- Changed iteration of `Live` to no longer yield DBN metadata
6-
- Changed `Live` callbacks to no longer yield DBN metadata
75
- Added `metadata` property to `Live`
86
- Added `DatatbentoLiveProtocol` class
97
- Added support for emitting warnings in API response headers
8+
- Changed iteration of `Live` to no longer yield DBN metadata
9+
- Changed `Live` callbacks to no longer yield DBN metadata
10+
- Fixed issue where `Historical.timeseries.get_range` would write empty files on error
1011
- Fixed issue with `numpy` types not being handled in symbols field
1112
- Upgraded `aiohttp` to 3.8.3
1213
- Upgraded `numpy` to to 1.23.5

databento/__init__.py

Lines changed: 30 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,43 @@
11
import logging
22
import warnings
33

4+
from databento_dbn import ErrorMsg
5+
from databento_dbn import ImbalanceMsg
6+
from databento_dbn import InstrumentDefMsg
7+
from databento_dbn import MBOMsg
8+
from databento_dbn import MBP1Msg
9+
from databento_dbn import MBP10Msg
10+
from databento_dbn import Metadata
11+
from databento_dbn import OHLCVMsg
12+
from databento_dbn import StatMsg
13+
from databento_dbn import SymbolMappingMsg
14+
from databento_dbn import SystemMsg
15+
from databento_dbn import TradeMsg
16+
417
from databento.common import bentologging
518
from databento.common.dbnstore import DBNStore
6-
from databento.common.enums import (
7-
Compression,
8-
Dataset,
9-
Delivery,
10-
Encoding,
11-
FeedMode,
12-
HistoricalGateway,
13-
Packaging,
14-
RecordFlags,
15-
RollRule,
16-
Schema,
17-
SplitDuration,
18-
SType,
19-
SymbologyResolution,
20-
)
21-
from databento.common.error import (
22-
BentoClientError,
23-
BentoError,
24-
BentoHttpError,
25-
BentoServerError,
26-
)
19+
from databento.common.enums import Compression
20+
from databento.common.enums import Dataset
21+
from databento.common.enums import Delivery
22+
from databento.common.enums import Encoding
23+
from databento.common.enums import FeedMode
24+
from databento.common.enums import HistoricalGateway
25+
from databento.common.enums import Packaging
26+
from databento.common.enums import RecordFlags
27+
from databento.common.enums import RollRule
28+
from databento.common.enums import Schema
29+
from databento.common.enums import SplitDuration
30+
from databento.common.enums import SType
31+
from databento.common.enums import SymbologyResolution
32+
from databento.common.error import BentoClientError
33+
from databento.common.error import BentoError
34+
from databento.common.error import BentoHttpError
35+
from databento.common.error import BentoServerError
2736
from databento.historical.api import API_VERSION
2837
from databento.historical.client import Historical
2938
from databento.live import DBNRecord
3039
from databento.live.client import Live
3140
from databento.version import __version__ # noqa
32-
from databento_dbn import (
33-
ErrorMsg,
34-
ImbalanceMsg,
35-
InstrumentDefMsg,
36-
MBOMsg,
37-
MBP1Msg,
38-
MBP10Msg,
39-
Metadata,
40-
OHLCVMsg,
41-
StatMsg,
42-
SymbolMappingMsg,
43-
SystemMsg,
44-
TradeMsg,
45-
)
4641

4742

4843
__all__ = [

databento/common/data.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from typing import Dict, List, Tuple, Union
22

33
import numpy as np
4+
45
from databento.common.enums import Schema
56

67

databento/common/dbnstore.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -495,7 +495,8 @@ def _map_symbols(self, df: pd.DataFrame, pretty_ts: bool) -> pd.DataFrame:
495495
df_index = df.index if pretty_ts else pd.to_datetime(df.index, utc=True)
496496
dates = [ts.date() for ts in df_index]
497497
df["symbol"] = [
498-
self._instrument_id_index[dates[i]][p] for i, p in enumerate(df["instrument_id"])
498+
self._instrument_id_index[dates[i]][p]
499+
for i, p in enumerate(df["instrument_id"])
499500
]
500501

501502
return df

databento/common/validation.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
from os import PathLike
55
from pathlib import Path
66
from typing import Optional, Type, TypeVar, Union
7-
from urllib.parse import urlsplit, urlunsplit
7+
from urllib.parse import urlsplit
8+
from urllib.parse import urlunsplit
89

910

1011
E = TypeVar("E", bound=Enum)

databento/historical/api/timeseries.py

Lines changed: 5 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
from __future__ import annotations
22

33
from datetime import date
4-
from io import BufferedIOBase
5-
from io import BytesIO
64
from os import PathLike
75
from typing import List, Optional, Tuple, Union
86

@@ -118,24 +116,13 @@ def get_range(
118116
if limit is not None:
119117
params.append(("limit", str(limit)))
120118

121-
if path is not None:
122-
writer: BufferedIOBase = open(path, "x+b")
123-
else:
124-
writer = BytesIO()
125-
126-
self._stream(
119+
return self._stream(
127120
url=self._base_url + ".get_range",
128121
params=params,
129122
basic_auth=True,
130-
writer=writer,
123+
path=path,
131124
)
132125

133-
if path is not None:
134-
writer.close()
135-
return DBNStore.from_file(path)
136-
writer.seek(0) # rewind for read
137-
return DBNStore.from_bytes(writer.read())
138-
139126
async def get_range_async(
140127
self,
141128
dataset: Union[Dataset, str],
@@ -218,23 +205,13 @@ async def get_range_async(
218205
("compression", str(Compression.ZSTD)), # Always request zstd
219206
]
220207

208+
# Optional Parameters
221209
if limit is not None:
222210
params.append(("limit", str(limit)))
223211

224-
if path is not None:
225-
writer: BufferedIOBase = open(path, "x+b")
226-
else:
227-
writer = BytesIO()
228-
229-
await self._stream_async(
212+
return await self._stream_async(
230213
url=self._base_url + ".get_range",
231214
params=params,
232215
basic_auth=True,
233-
writer=writer,
216+
path=path,
234217
)
235-
236-
if path is not None:
237-
writer.close()
238-
return DBNStore.from_file(path)
239-
writer.seek(0) # rewind for read
240-
return DBNStore.from_bytes(writer.read())

databento/historical/http.py

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1+
from __future__ import annotations
2+
13
import json
24
import sys
35
import warnings
4-
from io import BufferedIOBase
6+
from io import BytesIO
57
from json.decoder import JSONDecodeError
6-
from typing import Any, List, Optional, Tuple, Union
8+
from os import PathLike
9+
from typing import IO, Any, List, Optional, Tuple, Union
710

811
import aiohttp
912
import requests
@@ -12,6 +15,7 @@
1215
from requests import Response
1316
from requests.auth import HTTPBasicAuth
1417

18+
from databento.common.dbnstore import DBNStore
1519
from databento.common.error import BentoClientError
1620
from databento.common.error import BentoDeprecationWarning
1721
from databento.common.error import BentoServerError
@@ -108,8 +112,8 @@ def _stream(
108112
url: str,
109113
params: List[Tuple[str, Optional[str]]],
110114
basic_auth: bool,
111-
writer: BufferedIOBase,
112-
) -> None:
115+
path: Optional[Union[PathLike[str], str]] = None,
116+
) -> DBNStore:
113117
self._check_api_key()
114118

115119
with requests.get(
@@ -123,16 +127,28 @@ def _stream(
123127
check_backend_warnings(response)
124128
check_http_error(response)
125129

130+
if path is None:
131+
writer: IO[bytes] = BytesIO()
132+
else:
133+
writer = open(path, "x+b")
134+
126135
for chunk in response.iter_content(chunk_size=_32KB):
127136
writer.write(chunk)
128137

138+
if path is None:
139+
writer.seek(0)
140+
return DBNStore.from_bytes(writer)
141+
142+
writer.close()
143+
return DBNStore.from_file(path)
144+
129145
async def _stream_async(
130146
self,
131147
url: str,
132148
params: List[Tuple[str, Optional[str]]],
133149
basic_auth: bool,
134-
writer: BufferedIOBase,
135-
) -> None:
150+
path: Optional[Union[PathLike[str], str]] = None,
151+
) -> DBNStore:
136152
self._check_api_key()
137153

138154
async with aiohttp.ClientSession() as session:
@@ -148,9 +164,21 @@ async def _stream_async(
148164
check_backend_warnings(response)
149165
await check_http_error_async(response)
150166

167+
if path is None:
168+
writer: IO[bytes] = BytesIO()
169+
else:
170+
writer = open(path, "x+b")
171+
151172
async for chunk in response.content.iter_chunks():
152173
writer.write(chunk[0])
153174

175+
if path is None:
176+
writer.seek(0)
177+
return DBNStore.from_bytes(writer)
178+
179+
writer.close()
180+
return DBNStore.from_file(path)
181+
154182

155183
def is_400_series_error(status: int) -> bool:
156184
return status // 100 == 4

databento/live/protocol.py

Lines changed: 5 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
from __future__ import annotations
2+
13
import asyncio
24
import logging
3-
from functools import singledispatch
4-
from functools import update_wrapper
5+
from collections.abc import Iterable
6+
from functools import singledispatchmethod
57
from numbers import Number
6-
from typing import Any, Callable, Iterable, Optional, Union
8+
from typing import Optional, Union
79

810
import databento_dbn
911

@@ -46,32 +48,6 @@
4648
logger = logging.getLogger(__name__)
4749

4850

49-
def singledispatchmethod(
50-
func: Callable[..., Any],
51-
) -> Any:
52-
"""
53-
Decorate a function to dispatch arguments by type.
54-
This is a custom implementation of functools.singledispatchmethod.
55-
56-
See Also
57-
--------
58-
functools.singledispatch
59-
60-
Notes
61-
-----
62-
This should be removed when python 3.7 is no longer supported.
63-
64-
"""
65-
dispatcher: Any = singledispatch(func)
66-
67-
def _wrapper(*args: object, **kw: object) -> Any:
68-
return dispatcher.dispatch(args[1].__class__)(*args, **kw)
69-
70-
setattr(_wrapper, "register", getattr(dispatcher, "register"))
71-
update_wrapper(_wrapper, func)
72-
return _wrapper
73-
74-
7551
class DatabentoLiveProtocol(asyncio.BufferedProtocol):
7652
"""
7753
A BufferedProtocol implementation for the Databento live subscription

examples/historical_symbology_resolve.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from pprint import pprint
22

3-
from databento import Historical, SType
3+
from databento import Historical
4+
from databento import SType
45

56

67
if __name__ == "__main__":

examples/historical_timeseries_async.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
import asyncio
22
from pprint import pprint
33

4-
from databento import DBNStore, Historical
4+
from databento import DBNStore
5+
from databento import Historical
56

67

78
async def example_get_range_async() -> None:

0 commit comments

Comments
 (0)