
Commit 8d4815f

DOC: Improve and document to_parquet kwargs
1 parent faa1e89 commit 8d4815f

3 files changed: +40 -5 lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions

@@ -2,6 +2,9 @@
 
 ## 0.43.1 - TBD
 
+#### Enhancements
+- Keyword arguments to `DBNStore.to_parquet` will now allow `where` and `schema` to be specified
+
 #### Bug fixes
 - Fixed an issue where validating the checksum of a batch file loaded the entire file into memory
 
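For context, the forwarded keyword arguments land on `pyarrow.parquet.ParquetWriter`, so standard writer options can be supplied alongside the overridable `where` and `schema`. A minimal usage sketch (the input file name is hypothetical; `compression` is a standard `ParquetWriter` option, exercised by the new test below):

import databento as db

# Hypothetical input file; any DBNStore source works the same way.
store = db.DBNStore.from_file("glbx-mdp3.mbo.dbn.zst")

# Extra keyword arguments are forwarded to pyarrow.parquet.ParquetWriter.
store.to_parquet(
    "output.parquet",
    compression="zstd",
)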

databento/common/dbnstore.py

Lines changed: 10 additions & 5 deletions
@@ -963,7 +963,7 @@ def to_df(
     def to_parquet(
         self,
         path: PathLike[str] | str,
-        price_type: Literal["fixed", "float"] = "float",
+        price_type: PriceType | str = PriceType.FLOAT,
         pretty_ts: bool = True,
         map_symbols: bool = True,
         schema: Schema | str | None = None,
@@ -992,6 +992,9 @@ def to_parquet(
             This is only required when reading a DBN stream with mixed record types.
         mode : str, default "w"
             The file write mode to use, either "x" or "w".
+        **kwargs : Any
+            Keyword arguments to pass to the `pyarrow.parquet.ParquetWriter`.
+            These can be used to override the default behavior of the writer.
 
         Raises
         ------
@@ -1000,10 +1003,12 @@ def to_parquet(
             If the DBN schema is unspecified and cannot be determined.
 
         """
-        if price_type == "decimal":
+        file_path = validate_file_write_path(path, "path", exist_ok=mode == "w")
+        price_type = validate_enum(price_type, PriceType, "price_type")
+
+        if price_type == PriceType.DECIMAL:
             raise ValueError("the 'decimal' price type is not currently supported")
 
-        file_path = validate_file_write_path(path, "path", exist_ok=mode == "w")
         schema = validate_maybe_enum(schema, Schema, "schema")
         if schema is None:
             if self.schema is None:
@@ -1025,8 +1030,8 @@ def to_parquet(
         # Initialize the writer using the first DataFrame
         parquet_schema = pa.Schema.from_pandas(frame)
         writer = pq.ParquetWriter(
-            where=file_path,
-            schema=parquet_schema,
+            where=kwargs.pop("where", file_path),
+            schema=kwargs.pop("schema", parquet_schema),
             **kwargs,
         )
         writer.write_table(
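The `kwargs.pop(...)` calls are what make `where` and `schema` overridable: a caller-supplied value wins, otherwise the computed default is used, and whatever remains in `kwargs` passes through to the writer untouched. A standalone sketch of the same pattern (this helper is illustrative, not part of the library):

import pyarrow as pa
import pyarrow.parquet as pq

def write_frames(frames, default_path, **kwargs):
    # Caller-supplied "where"/"schema" take precedence over computed defaults.
    writer = pq.ParquetWriter(
        where=kwargs.pop("where", default_path),
        schema=kwargs.pop("schema", pa.Schema.from_pandas(frames[0])),
        **kwargs,  # remaining options (e.g. compression) pass through as-is
    )
    try:
        for frame in frames:
            writer.write_table(pa.Table.from_pandas(frame))
    finally:
        writer.close()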

tests/test_historical_bento.py

Lines changed: 27 additions & 0 deletions
@@ -731,6 +731,33 @@ def test_to_parquet(
     pd.testing.assert_frame_equal(actual, expected)
 
 
+def test_to_parquet_kwargs(
+    monkeypatch: pytest.MonkeyPatch,
+    tmp_path: Path,
+    test_data: Callable[[Dataset, Schema], bytes],
+) -> None:
+    # Arrange
+    monkeypatch.setattr(databento.common.dbnstore, "PARQUET_CHUNK_SIZE", 1)
+    stub_data = test_data(Dataset.GLBX_MDP3, Schema.MBO)
+    data = DBNStore.from_bytes(data=stub_data)
+    parquet_file = tmp_path / "test.parquet"
+
+    # Act
+    expected = data.to_df()
+    data.to_parquet(
+        parquet_file,
+        compression="zstd",
+        write_statistics="false",
+    )
+    actual = pd.read_parquet(parquet_file)
+
+    # Replace None values with np.nan
+    actual.fillna(value=np.nan)
+
+    # Assert
+    pd.testing.assert_frame_equal(actual, expected)
+
+
 @pytest.mark.parametrize(
     "expected_schema",
     [pytest.param(schema, id=str(schema)) for schema in Schema.variants()],
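The test asserts round-trip equality of the data; one way to also confirm that the forwarded writer options took effect is to inspect the file's metadata after writing. A sketch under the same setup (the expected codec string assumes `compression="zstd"` was forwarded):

import pyarrow.parquet as pq

metadata = pq.ParquetFile("test.parquet").metadata
# Each column chunk records its codec; expected "ZSTD" here.
print(metadata.row_group(0).column(0).compression)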
