|
37 | 37 | from databento_dbn import SType |
38 | 38 | from databento_dbn import Transcoder |
39 | 39 | from databento_dbn import VersionUpgradePolicy |
40 | | -from pandas.io.common import os |
41 | 40 |
|
42 | 41 | from databento.common.constants import DEFINITION_TYPE_MAX_MAP |
43 | 42 | from databento.common.constants import INT64_NULL |
@@ -1083,15 +1082,16 @@ def to_ndarray( |
1083 | 1082 | if schema is None: |
1084 | 1083 | raise ValueError("a schema must be specified for mixed DBN data") |
1085 | 1084 |
|
1086 | | - schema_struct = self._schema_struct_map[schema] |
1087 | | - schema_rtype = RType.from_schema(schema) |
| 1085 | + # Always use the latest schema struct, since DBNStore iteration upgrades records |
| 1086 | + schema_struct = SCHEMA_STRUCT_MAP[schema] |
1088 | 1087 | schema_dtype = schema_struct._dtypes |
| 1088 | + schema_rtype = RType.from_schema(schema) |
| 1089 | + schema_filter = filter(lambda r: r.rtype == schema_rtype, self) |
1089 | 1090 |
|
1090 | 1091 | reader = self.reader |
1091 | 1092 | reader.seek(self._metadata_length) |
1092 | 1093 | ndarray_iter = NDArrayBytesIterator( |
1093 | | - stream=reader, |
1094 | | - rtype=schema_rtype, |
| 1094 | + records=map(bytes, schema_filter), |
1095 | 1095 | dtype=schema_dtype, |
1096 | 1096 | count=count, |
1097 | 1097 | ) |
@@ -1234,35 +1234,22 @@ class NDArrayBytesIterator(NDArrayIterator): |
1234 | 1234 |
|
1235 | 1235 | def __init__( |
1236 | 1236 | self, |
1237 | | - stream: IO[bytes], |
1238 | | - rtype: RType, |
| 1237 | + records: Iterator[bytes], |
1239 | 1238 | dtype: list[tuple[str, str]], |
1240 | 1239 | count: int | None, |
1241 | 1240 | ): |
1242 | | - self._stream = stream |
1243 | | - self._rtype = rtype |
| 1241 | + self._records = records |
1244 | 1242 | self._dtype = dtype |
1245 | 1243 | self._count = count |
1246 | 1244 | self._first_next = True |
1247 | 1245 |
|
1248 | 1246 | def __iter__(self) -> NDArrayIterator: |
1249 | 1247 | return self |
1250 | 1248 |
|
1251 | | - def __iter_rtype__(self) -> Generator[bytes, None, None]: |
1252 | | - while header := self._stream.read(2): |
1253 | | - length, rtype = header[:2] |
1254 | | - read_size = length * 4 - 2 |
1255 | | - if rtype == self._rtype: |
1256 | | - yield header + self._stream.read(read_size) |
1257 | | - else: |
1258 | | - self._stream.seek(read_size, os.SEEK_CUR) |
1259 | | - return |
1260 | | - |
1261 | 1249 | def __next__(self) -> np.ndarray[Any, Any]: |
1262 | 1250 | record_bytes = BytesIO() |
1263 | 1251 | num_records = 0 |
1264 | | - |
1265 | | - for record in itertools.islice(self.__iter_rtype__(), self._count): |
| 1252 | + for record in itertools.islice(self._records, self._count): |
1266 | 1253 | num_records += 1 |
1267 | 1254 | record_bytes.write(record) |
1268 | 1255 |
|
|
0 commit comments