Commit cbbbce3
refactor: Unify bigquery execution paths (#2007)
1 parent a7963fe commit cbbbce3

9 files changed: +375 additions, -359 deletions
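
The new bigframes/session/execution_spec module is one of the nine files changed, but its contents are not shown in this excerpt. Inferred purely from the call sites below, a minimal sketch might look like this; every field name, type, and default here is an assumption, not the committed API.

import dataclasses
from typing import Literal, Optional, Tuple, Union

from google.cloud import bigquery


@dataclasses.dataclass(frozen=True)
class GcsOutputSpec:
    # Write results to Cloud Storage (used by to_csv/to_json/to_parquet below).
    uri: str
    format: Literal["csv", "json", "parquet"]
    # Tuples of items rather than a dict, so the frozen spec stays hashable.
    export_options: Tuple[Tuple[str, Union[bool, str]], ...] = ()


@dataclasses.dataclass(frozen=True)
class TableOutputSpec:
    # Write results to a BigQuery table (used by to_gbq below); the exact
    # destination type is an assumption.
    table: bigquery.TableReference
    cluster_cols: Tuple[str, ...] = ()
    if_exists: str = "fail"


@dataclasses.dataclass(frozen=True)
class ExecutionSpec:
    # The single options bundle accepted by executor.execute() after this
    # refactor. The first positional argument at the export call sites is
    # the destination spec.
    destination_spec: Optional[Union[GcsOutputSpec, TableOutputSpec]] = None
    promise_under_10gb: bool = False
    ordered: bool = True
    peek: Optional[int] = None  # fetch at most this many rows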

bigframes/core/blocks.py

Lines changed: 55 additions & 11 deletions
@@ -69,7 +69,7 @@
 import bigframes.exceptions as bfe
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
-from bigframes.session import dry_runs
+from bigframes.session import dry_runs, execution_spec
 from bigframes.session import executor as executors
 
 # Type constraint for wherever column labels are used
@@ -257,7 +257,10 @@ def shape(self) -> typing.Tuple[int, int]:
         except Exception:
             pass
 
-        row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar()
+        row_count = self.session._executor.execute(
+            self.expr.row_count(),
+            execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False),
+        ).to_py_scalar()
         return (row_count, len(self.value_columns))
 
     @property
@@ -557,8 +560,17 @@ def to_arrow(
         allow_large_results: Optional[bool] = None,
     ) -> Tuple[pa.Table, Optional[bigquery.QueryJob]]:
         """Run query and download results as a pyarrow Table."""
+        under_10gb = (
+            (not allow_large_results)
+            if (allow_large_results is not None)
+            else not bigframes.options._allow_large_results
+        )
         execute_result = self.session._executor.execute(
-            self.expr, ordered=ordered, use_explicit_destination=allow_large_results
+            self.expr,
+            execution_spec.ExecutionSpec(
+                promise_under_10gb=under_10gb,
+                ordered=ordered,
+            ),
         )
         pa_table = execute_result.to_arrow_table()

@@ -647,8 +659,15 @@ def try_peek(
         self, n: int = 20, force: bool = False, allow_large_results=None
     ) -> typing.Optional[pd.DataFrame]:
         if force or self.expr.supports_fast_peek:
-            result = self.session._executor.peek(
-                self.expr, n, use_explicit_destination=allow_large_results
+            # Ideally we would cap peek values and always assume results are <10gb
+            under_10gb = (
+                (not allow_large_results)
+                if (allow_large_results is not None)
+                else not bigframes.options._allow_large_results
+            )
+            result = self.session._executor.execute(
+                self.expr,
+                execution_spec.ExecutionSpec(promise_under_10gb=under_10gb, peek=n),
             )
             df = result.to_pandas()
             return self._copy_index_to_pandas(df)
@@ -665,10 +684,18 @@ def to_pandas_batches(
 
         page_size and max_results determine the size and number of batches,
         see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result"""
+
+        under_10gb = (
+            (not allow_large_results)
+            if (allow_large_results is not None)
+            else not bigframes.options._allow_large_results
+        )
         execute_result = self.session._executor.execute(
             self.expr,
-            ordered=True,
-            use_explicit_destination=allow_large_results,
+            execution_spec.ExecutionSpec(
+                promise_under_10gb=under_10gb,
+                ordered=True,
+            ),
         )
 
         # To reduce the number of edge cases to consider when working with the
@@ -714,10 +741,17 @@ def _materialize_local(
     ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
         """Run query and download results as a pandas DataFrame. Return the total number of results as well."""
         # TODO(swast): Allow for dry run and timeout.
+        under_10gb = (
+            (not materialize_options.allow_large_results)
+            if (materialize_options.allow_large_results is not None)
+            else (not bigframes.options._allow_large_results)
+        )
         execute_result = self.session._executor.execute(
             self.expr,
-            ordered=materialize_options.ordered,
-            use_explicit_destination=materialize_options.allow_large_results,
+            execution_spec.ExecutionSpec(
+                promise_under_10gb=under_10gb,
+                ordered=materialize_options.ordered,
+            ),
         )
         sample_config = materialize_options.downsampling
         if execute_result.total_bytes is not None:
@@ -1598,9 +1632,19 @@ def retrieve_repr_request_results(
             config=executors.CacheConfig(optimize_for="head", if_cached="reuse-strict"),
         )
         head_result = self.session._executor.execute(
-            self.expr.slice(start=None, stop=max_results, step=None)
+            self.expr.slice(start=None, stop=max_results, step=None),
+            execution_spec.ExecutionSpec(
+                promise_under_10gb=True,
+                ordered=True,
+            ),
         )
-        row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar()
+        row_count = self.session._executor.execute(
+            self.expr.row_count(),
+            execution_spec.ExecutionSpec(
+                promise_under_10gb=True,
+                ordered=False,
+            ),
+        ).to_py_scalar()
 
         head_df = head_result.to_pandas()
         return self._copy_index_to_pandas(head_df), row_count, head_result.query_job
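
The three-line under_10gb conditional is duplicated almost verbatim in to_arrow, try_peek, to_pandas_batches, and _materialize_local above. A hypothetical helper, not part of this commit, could centralize the override-versus-global resolution:

from typing import Optional

import bigframes


def _resolve_under_10gb(allow_large_results: Optional[bool]) -> bool:
    # An explicit per-call override wins; otherwise fall back to the inverse
    # of the session-wide _allow_large_results option.
    if allow_large_results is not None:
        return not allow_large_results
    return not bigframes.options._allow_large_results

Each call site would then shrink to execution_spec.ExecutionSpec(promise_under_10gb=_resolve_under_10gb(allow_large_results), ordered=...).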

bigframes/core/indexes/base.py

Lines changed: 8 additions & 3 deletions
@@ -38,6 +38,7 @@
 import bigframes.operations as ops
 import bigframes.operations.aggregations as agg_ops
 import bigframes.series
+import bigframes.session.execution_spec as ex_spec
 
 if typing.TYPE_CHECKING:
     import bigframes.dataframe
@@ -283,8 +284,9 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]:
         # Check if key exists at all by counting
         count_agg = ex.UnaryAggregation(agg_ops.count_op, ex.deref(offsets_id))
         count_result = filtered_block._expr.aggregate([(count_agg, "count")])
+
         count_scalar = self._block.session._executor.execute(
-            count_result
+            count_result, ex_spec.ExecutionSpec(promise_under_10gb=True)
         ).to_py_scalar()
 
         if count_scalar == 0:
@@ -295,7 +297,7 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]:
         min_agg = ex.UnaryAggregation(agg_ops.min_op, ex.deref(offsets_id))
         position_result = filtered_block._expr.aggregate([(min_agg, "position")])
         position_scalar = self._block.session._executor.execute(
-            position_result
+            position_result, ex_spec.ExecutionSpec(promise_under_10gb=True)
         ).to_py_scalar()
         return int(position_scalar)

@@ -326,7 +328,10 @@ def _get_monotonic_slice(self, filtered_block, offsets_id: str) -> slice:
         combined_result = filtered_block._expr.aggregate(min_max_aggs)
 
         # Execute query and extract positions
-        result_df = self._block.session._executor.execute(combined_result).to_pandas()
+        result_df = self._block.session._executor.execute(
+            combined_result,
+            execution_spec=ex_spec.ExecutionSpec(promise_under_10gb=True),
+        ).to_pandas()
         min_pos = int(result_df["min_pos"].iloc[0])
         max_pos = int(result_df["max_pos"].iloc[0])
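
All three call sites updated in this file run single-row aggregations (a count, a min, or a min/max pair), so promising an under-10GB result is trivially safe. Note the small inconsistency: _get_monotonic_slice passes the spec via the execution_spec= keyword while get_loc passes it positionally. A shared constant, shown here purely as an illustration, would make these scalar lookups uniform:

import bigframes.session.execution_spec as ex_spec

# Illustrative only: one spec reused by every query known to return one row.
_SCALAR_SPEC = ex_spec.ExecutionSpec(promise_under_10gb=True)


def _fetch_scalar(executor, expr):
    # Execute a single-row aggregation and unwrap the lone value.
    return executor.execute(expr, _SCALAR_SPEC).to_py_scalar()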

bigframes/dataframe.py

Lines changed: 34 additions & 22 deletions
@@ -86,6 +86,7 @@
 import bigframes.operations.structs
 import bigframes.series
 import bigframes.session._io.bigquery
+import bigframes.session.execution_spec as ex_spec
 
 if typing.TYPE_CHECKING:
     from _typeshed import SupportsRichComparison
@@ -4268,17 +4269,19 @@ def to_csv(
             index=index and self._has_index,
             ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
         )
-        options = {
+        options: dict[str, Union[bool, str]] = {
             "field_delimiter": sep,
             "header": header,
         }
-        query_job = self._session._executor.export_gcs(
+        result = self._session._executor.execute(
             export_array.rename_columns(id_overrides),
-            path_or_buf,
-            format="csv",
-            export_options=options,
+            ex_spec.ExecutionSpec(
+                ex_spec.GcsOutputSpec(
+                    uri=path_or_buf, format="csv", export_options=tuple(options.items())
+                )
+            ),
         )
-        self._set_internal_query_job(query_job)
+        self._set_internal_query_job(result.query_job)
         return None
 
     def to_json(
@@ -4321,13 +4324,13 @@ def to_json(
             index=index and self._has_index,
             ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
         )
-        query_job = self._session._executor.export_gcs(
+        result = self._session._executor.execute(
             export_array.rename_columns(id_overrides),
-            path_or_buf,
-            format="json",
-            export_options={},
+            ex_spec.ExecutionSpec(
+                ex_spec.GcsOutputSpec(uri=path_or_buf, format="json", export_options=())
+            ),
         )
-        self._set_internal_query_job(query_job)
+        self._set_internal_query_job(result.query_job)
         return None
 
     def to_gbq(
@@ -4400,16 +4403,21 @@ def to_gbq(
             )
         )
 
-        query_job = self._session._executor.export_gbq(
+        result = self._session._executor.execute(
             export_array.rename_columns(id_overrides),
-            destination=destination,
-            cluster_cols=clustering_fields,
-            if_exists=if_exists,
+            ex_spec.ExecutionSpec(
+                ex_spec.TableOutputSpec(
+                    destination,
+                    cluster_cols=tuple(clustering_fields),
+                    if_exists=if_exists,
+                )
+            ),
         )
-        self._set_internal_query_job(query_job)
+        assert result.query_job is not None
+        self._set_internal_query_job(result.query_job)
 
         # The query job should have finished, so there should always be a result table.
-        result_table = query_job.destination
+        result_table = result.query_job.destination
         assert result_table is not None
 
         if temp_table_ref:
@@ -4477,13 +4485,17 @@ def to_parquet(
             index=index and self._has_index,
             ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
         )
-        query_job = self._session._executor.export_gcs(
+        result = self._session._executor.execute(
             export_array.rename_columns(id_overrides),
-            path,
-            format="parquet",
-            export_options=export_options,
+            ex_spec.ExecutionSpec(
+                ex_spec.GcsOutputSpec(
+                    uri=path,
+                    format="parquet",
+                    export_options=tuple(export_options.items()),
+                )
+            ),
         )
-        self._set_internal_query_job(query_job)
+        self._set_internal_query_job(result.query_job)
         return None
 
     def to_dict(
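
One pattern recurs in every export rewrite above: dict-valued export_options are converted to tuples of items before going into the spec, presumably so the spec objects stay immutable and hashable. A short sketch of the round trip, assuming the GcsOutputSpec fields inferred earlier:

import bigframes.session.execution_spec as ex_spec

options = {"field_delimiter": ",", "header": True}

# Freeze the dict so the frozen spec can be hashed and compared; the executor
# side can rebuild a mapping with dict(spec.export_options) when it runs the
# export.
spec = ex_spec.GcsOutputSpec(
    uri="gs://my-bucket/out/*.csv",  # hypothetical destination URI
    format="csv",
    export_options=tuple(options.items()),
)
assert dict(spec.export_options) == options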
