Commit 3fca092 (parent 62a88e8)

feat: add write_engine parameter to read_FORMATNAME methods to control how data is written to BigQuery

File tree: 14 files changed, +432 −79 lines
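
Taken together, the changes in this commit thread a keyword-only write_engine argument from each pandas-style entry point down to the session, which decides how local data gets written into BigQuery. A minimal usage sketch, assuming a configured BigQuery session; the GCS path and column name are illustrative, and "default" is the only write_engine value this diff shows:

import pandas as pd

import bigframes.pandas as bpd

# Each read_* entry point now accepts write_engine and forwards it to
# the session, which controls how the data is written to BigQuery.
df_csv = bpd.read_csv("gs://my-bucket/data.csv", write_engine="default")
df_local = bpd.read_pandas(pd.DataFrame({"a": [1, 2]}), write_engine="default")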

bigframes/dtypes.py

Lines changed: 11 additions & 4 deletions
@@ -343,12 +343,19 @@ def arrow_dtype_to_bigframes_dtype(arrow_dtype: pa.DataType) -> Dtype:
         return pd.ArrowDtype(arrow_dtype)
     if pa.types.is_struct(arrow_dtype):
         return pd.ArrowDtype(arrow_dtype)
+
+    # BigFrames doesn't distinguish between string and large_string because the
+    # largest string (2 GB) is already larger than the largest BigQuery row.
+    if pa.types.is_string(arrow_dtype) or pa.types.is_large_string(arrow_dtype):
+        return STRING_DTYPE
+
     if arrow_dtype == pa.null():
         return DEFAULT_DTYPE
-    else:
-        raise ValueError(
-            f"Unexpected Arrow data type {arrow_dtype}. {constants.FEEDBACK_LINK}"
-        )
+
+    # No other types matched.
+    raise ValueError(
+        f"Unexpected Arrow data type {arrow_dtype}. {constants.FEEDBACK_LINK}"
+    )
 
 
 _BIGFRAMES_TO_ARROW = {
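
The dtypes change also widens Arrow type support: string and large_string now both map to the BigFrames string dtype, and the trailing ValueError is reached only when nothing matched. A small illustrative check; arrow_dtype_to_bigframes_dtype is an internal helper, so this is exposition rather than supported public API:

import pyarrow as pa

from bigframes import dtypes

# Both Arrow string flavors resolve to the same BigFrames dtype; a 2 GB
# Arrow string already exceeds BigQuery's maximum row size, so there is
# no benefit to distinguishing them.
assert dtypes.arrow_dtype_to_bigframes_dtype(pa.string()) == dtypes.STRING_DTYPE
assert dtypes.arrow_dtype_to_bigframes_dtype(pa.large_string()) == dtypes.STRING_DTYPE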

bigframes/pandas/io/api.py

Lines changed: 34 additions & 5 deletions
@@ -30,6 +30,7 @@
     Union,
 )
 
+import bigframes_vendored.constants as constants
 import bigframes_vendored.pandas.io.gbq as vendored_pandas_gbq
 from google.cloud import bigquery
 import numpy
@@ -105,6 +106,7 @@ def read_csv(
         Literal["c", "python", "pyarrow", "python-fwf", "bigquery"]
     ] = None,
     encoding: Optional[str] = None,
+    write_engine: constants.WriteEngineType = "default",
     **kwargs,
 ) -> bigframes.dataframe.DataFrame:
     return global_session.with_default_session(
@@ -118,6 +120,7 @@ def read_csv(
         dtype=dtype,
         engine=engine,
         encoding=encoding,
+        write_engine=write_engine,
         **kwargs,
     )
 
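
In use, the new read_csv keyword passes straight through to Session.read_csv. A sketch; the local file path is hypothetical:

import bigframes.pandas as bpd

# write_engine is forwarded verbatim alongside the existing options.
df = bpd.read_csv("data.csv", write_engine="default")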
@@ -135,6 +138,7 @@ def read_json(
     encoding: Optional[str] = None,
     lines: bool = False,
     engine: Literal["ujson", "pyarrow", "bigquery"] = "ujson",
+    write_engine: constants.WriteEngineType = "default",
     **kwargs,
 ) -> bigframes.dataframe.DataFrame:
     return global_session.with_default_session(
@@ -145,6 +149,7 @@ def read_json(
         encoding=encoding,
         lines=lines,
         engine=engine,
+        write_engine=write_engine,
         **kwargs,
     )
 
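
read_json gains the same pass-through. A hedged sketch; the file path is hypothetical, and the orient="records" / lines=True combination is an assumption about the supported client-side parse mode, not something these hunks show:

import bigframes.pandas as bpd

# Newline-delimited JSON parsed client-side, then written to BigQuery
# via the engine named by write_engine.
df = bpd.read_json(
    "records.jsonl",  # hypothetical local file
    orient="records",
    lines=True,
    write_engine="default",
)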
@@ -245,24 +250,41 @@ def read_gbq_table(
 
 
 @typing.overload
-def read_pandas(pandas_dataframe: pandas.DataFrame) -> bigframes.dataframe.DataFrame:
+def read_pandas(
+    pandas_dataframe: pandas.DataFrame,
+    *,
+    write_engine: constants.WriteEngineType = "default",
+) -> bigframes.dataframe.DataFrame:
     ...
 
 
 @typing.overload
-def read_pandas(pandas_dataframe: pandas.Series) -> bigframes.series.Series:
+def read_pandas(
+    pandas_dataframe: pandas.Series,
+    *,
+    write_engine: constants.WriteEngineType = "default",
+) -> bigframes.series.Series:
     ...
 
 
 @typing.overload
-def read_pandas(pandas_dataframe: pandas.Index) -> bigframes.core.indexes.Index:
+def read_pandas(
+    pandas_dataframe: pandas.Index,
+    *,
+    write_engine: constants.WriteEngineType = "default",
+) -> bigframes.core.indexes.Index:
     ...
 
 
-def read_pandas(pandas_dataframe: Union[pandas.DataFrame, pandas.Series, pandas.Index]):
+def read_pandas(
+    pandas_dataframe: Union[pandas.DataFrame, pandas.Series, pandas.Index],
+    *,
+    write_engine: constants.WriteEngineType = "default",
+):
     return global_session.with_default_session(
         bigframes.session.Session.read_pandas,
         pandas_dataframe,
+        write_engine=write_engine,
     )
 
 
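The three read_pandas overloads preserve input-specific return types while sharing the new keyword-only parameter. A sketch:

import pandas as pd

import bigframes.pandas as bpd

pdf = pd.DataFrame({"x": [1, 2, 3]})

# DataFrame in -> bigframes DataFrame out; Series and Index inputs map
# to their BigFrames counterparts. write_engine is keyword-only on
# every overload.
bf_df = bpd.read_pandas(pdf, write_engine="default")
bf_series = bpd.read_pandas(pdf["x"], write_engine="default")
bf_index = bpd.read_pandas(pdf.index, write_engine="default")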
@@ -273,25 +295,32 @@ def read_pickle(
     filepath_or_buffer: FilePath | ReadPickleBuffer,
     compression: CompressionOptions = "infer",
     storage_options: StorageOptions = None,
+    *,
+    write_engine: constants.WriteEngineType = "default",
 ):
     return global_session.with_default_session(
         bigframes.session.Session.read_pickle,
         filepath_or_buffer=filepath_or_buffer,
         compression=compression,
         storage_options=storage_options,
+        write_engine=write_engine,
     )
 
 
 read_pickle.__doc__ = inspect.getdoc(bigframes.session.Session.read_pickle)
 
 
 def read_parquet(
-    path: str | IO["bytes"], *, engine: str = "auto"
+    path: str | IO["bytes"],
+    *,
+    engine: str = "auto",
+    write_engine: constants.WriteEngineType = "default",
 ) -> bigframes.dataframe.DataFrame:
     return global_session.with_default_session(
         bigframes.session.Session.read_parquet,
         path,
         engine=engine,
+        write_engine=write_engine,
    )
 
 
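read_pickle and read_parquet round out the set. A sketch with hypothetical local paths:

import bigframes.pandas as bpd

# Both entry points now accept keyword-only write_engine and forward it
# to the session along with their existing options.
df_pkl = bpd.read_pickle("frame.pkl", write_engine="default")
df_pq = bpd.read_parquet("data.parquet", engine="auto", write_engine="default")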