66 changes: 55 additions & 11 deletions bigframes/core/blocks.py
@@ -69,7 +69,7 @@
import bigframes.exceptions as bfe
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
from bigframes.session import dry_runs
from bigframes.session import dry_runs, execution_spec
from bigframes.session import executor as executors

# Type constraint for wherever column labels are used
@@ -257,7 +257,10 @@ def shape(self) -> typing.Tuple[int, int]:
except Exception:
pass

row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar()
row_count = self.session._executor.execute(
self.expr.row_count(),
execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False),
).to_py_scalar()
return (row_count, len(self.value_columns))

@property
@@ -557,8 +560,17 @@ def to_arrow(
allow_large_results: Optional[bool] = None,
) -> Tuple[pa.Table, Optional[bigquery.QueryJob]]:
"""Run query and download results as a pyarrow Table."""
under_10gb = (
(not allow_large_results)
if (allow_large_results is not None)
else not bigframes.options._allow_large_results
)
execute_result = self.session._executor.execute(
self.expr, ordered=ordered, use_explicit_destination=allow_large_results
self.expr,
execution_spec.ExecutionSpec(
promise_under_10gb=under_10gb,
ordered=ordered,
),
)
pa_table = execute_result.to_arrow_table()

@@ -647,8 +659,15 @@ def try_peek(
self, n: int = 20, force: bool = False, allow_large_results=None
) -> typing.Optional[pd.DataFrame]:
if force or self.expr.supports_fast_peek:
result = self.session._executor.peek(
self.expr, n, use_explicit_destination=allow_large_results
# Ideally we would just block unreasonably large peek values and always assume <10gb
under_10gb = (
(not allow_large_results)
if (allow_large_results is not None)
else not bigframes.options._allow_large_results
)
result = self.session._executor.execute(
self.expr,
execution_spec.ExecutionSpec(promise_under_10gb=under_10gb, peek=n),
)
df = result.to_pandas()
return self._copy_index_to_pandas(df)
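
The comment above suggests capping peek sizes so that the under-10 GB promise can be assumed unconditionally. A minimal sketch of such a guard, assuming a hypothetical threshold and helper name (neither is part of this change):

# Hypothetical guard, not part of this diff: reject oversized peeks so that
# ExecutionSpec(promise_under_10gb=True, peek=n) is always safe to emit.
_MAX_PEEK_ROWS = 100_000  # illustrative threshold, not a real bigframes constant


def _validate_peek_size(n: int) -> None:
    """Raise if a peek is too large to be treated as a small (<10 GB) result."""
    if n > _MAX_PEEK_ROWS:
        raise ValueError(
            f"peek of {n} rows exceeds the supported maximum of {_MAX_PEEK_ROWS}; "
            "use to_pandas() or to_pandas_batches() instead."
        )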
@@ -665,10 +684,18 @@ def to_pandas_batches(

page_size and max_results determine the size and number of batches,
see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result"""

under_10gb = (
(not allow_large_results)
if (allow_large_results is not None)
else not bigframes.options._allow_large_results
)
execute_result = self.session._executor.execute(
self.expr,
ordered=True,
use_explicit_destination=allow_large_results,
execution_spec.ExecutionSpec(
promise_under_10gb=under_10gb,
ordered=True,
),
)

# To reduce the number of edge cases to consider when working with the
@@ -714,10 +741,17 @@ def _materialize_local(
) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]:
"""Run query and download results as a pandas DataFrame. Return the total number of results as well."""
# TODO(swast): Allow for dry run and timeout.
under_10gb = (
(not materialize_options.allow_large_results)
if (materialize_options.allow_large_results is not None)
else (not bigframes.options._allow_large_results)
)
execute_result = self.session._executor.execute(
self.expr,
ordered=materialize_options.ordered,
use_explicit_destination=materialize_options.allow_large_results,
execution_spec.ExecutionSpec(
promise_under_10gb=under_10gb,
ordered=materialize_options.ordered,
),
)
sample_config = materialize_options.downsampling
if execute_result.total_bytes is not None:
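
The same allow_large_results → promise_under_10gb resolution now appears in to_arrow, try_peek, to_pandas_batches, and _materialize_local. A small helper could remove the duplication; a minimal sketch with a hypothetical name, assuming bigframes.options._allow_large_results behaves as it is used above:

# Hypothetical helper, not part of this diff: one place for the repeated
# allow_large_results -> promise_under_10gb fallback logic.
from typing import Optional

import bigframes


def _promise_under_10gb(allow_large_results: Optional[bool]) -> bool:
    """Assume a small (<10 GB) result unless large results were requested.

    Falls back to the session-wide option when the per-call argument is None.
    """
    if allow_large_results is not None:
        return not allow_large_results
    return not bigframes.options._allow_large_results

Each call site above would then reduce to promise_under_10gb=_promise_under_10gb(allow_large_results).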
@@ -1598,9 +1632,19 @@ def retrieve_repr_request_results(
config=executors.CacheConfig(optimize_for="head", if_cached="reuse-strict"),
)
head_result = self.session._executor.execute(
self.expr.slice(start=None, stop=max_results, step=None)
self.expr.slice(start=None, stop=max_results, step=None),
execution_spec.ExecutionSpec(
promise_under_10gb=True,
ordered=True,
),
)
row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar()
row_count = self.session._executor.execute(
self.expr.row_count(),
execution_spec.ExecutionSpec(
promise_under_10gb=True,
ordered=False,
),
).to_py_scalar()

head_df = head_result.to_pandas()
return self._copy_index_to_pandas(head_df), row_count, head_result.query_job
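
The definition of execution_spec.ExecutionSpec itself is not shown in this diff. Judging only from the call sites above (promise_under_10gb, ordered, peek, plus the positional output specs used in bigframes/dataframe.py below), it is presumably something close to the following sketch; field names, defaults, and types are inferred rather than confirmed:

# Speculative reconstruction of bigframes/session/execution_spec.py, based only
# on the call sites in this PR; the real module may differ.
import dataclasses
from typing import Optional


@dataclasses.dataclass(frozen=True)
class ExecutionSpec:
    # Where results go: a GcsOutputSpec or TableOutputSpec (see the
    # dataframe.py changes below), or None to return results to the client.
    destination_spec: Optional[object] = None
    ordered: bool = False  # whether row order must be preserved
    peek: Optional[int] = None  # if set, only this many arbitrary rows are needed
    promise_under_10gb: bool = False  # caller promises the result is small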
11 changes: 8 additions & 3 deletions bigframes/core/indexes/base.py
@@ -38,6 +38,7 @@
import bigframes.operations as ops
import bigframes.operations.aggregations as agg_ops
import bigframes.series
import bigframes.session.execution_spec as ex_spec

if typing.TYPE_CHECKING:
import bigframes.dataframe
@@ -283,8 +284,9 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]:
# Check if key exists at all by counting
count_agg = ex.UnaryAggregation(agg_ops.count_op, ex.deref(offsets_id))
count_result = filtered_block._expr.aggregate([(count_agg, "count")])

count_scalar = self._block.session._executor.execute(
count_result
count_result, ex_spec.ExecutionSpec(promise_under_10gb=True)
).to_py_scalar()

if count_scalar == 0:
@@ -295,7 +297,7 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]:
min_agg = ex.UnaryAggregation(agg_ops.min_op, ex.deref(offsets_id))
position_result = filtered_block._expr.aggregate([(min_agg, "position")])
position_scalar = self._block.session._executor.execute(
position_result
position_result, ex_spec.ExecutionSpec(promise_under_10gb=True)
).to_py_scalar()
return int(position_scalar)

@@ -326,7 +328,10 @@ def _get_monotonic_slice(self, filtered_block, offsets_id: str) -> slice:
combined_result = filtered_block._expr.aggregate(min_max_aggs)

# Execute query and extract positions
result_df = self._block.session._executor.execute(combined_result).to_pandas()
result_df = self._block.session._executor.execute(
combined_result,
execution_spec=ex_spec.ExecutionSpec(promise_under_10gb=True),
).to_pandas()
min_pos = int(result_df["min_pos"].iloc[0])
max_pos = int(result_df["max_pos"].iloc[0])

56 changes: 34 additions & 22 deletions bigframes/dataframe.py
@@ -85,6 +85,7 @@
import bigframes.operations.structs
import bigframes.series
import bigframes.session._io.bigquery
import bigframes.session.execution_spec as ex_spec

if typing.TYPE_CHECKING:
from _typeshed import SupportsRichComparison
@@ -4271,17 +4272,19 @@ def to_csv(
index=index and self._has_index,
ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
)
options = {
options: dict[str, Union[bool, str]] = {
"field_delimiter": sep,
"header": header,
}
query_job = self._session._executor.export_gcs(
result = self._session._executor.execute(
export_array.rename_columns(id_overrides),
path_or_buf,
format="csv",
export_options=options,
ex_spec.ExecutionSpec(
ex_spec.GcsOutputSpec(
uri=path_or_buf, format="csv", export_options=tuple(options.items())
)
),
)
self._set_internal_query_job(query_job)
self._set_internal_query_job(result.query_job)
return None

def to_json(
Expand Down Expand Up @@ -4324,13 +4327,13 @@ def to_json(
index=index and self._has_index,
ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
)
query_job = self._session._executor.export_gcs(
result = self._session._executor.execute(
export_array.rename_columns(id_overrides),
path_or_buf,
format="json",
export_options={},
ex_spec.ExecutionSpec(
ex_spec.GcsOutputSpec(uri=path_or_buf, format="json", export_options=())
),
)
self._set_internal_query_job(query_job)
self._set_internal_query_job(result.query_job)
return None

def to_gbq(
Expand Down Expand Up @@ -4403,16 +4406,21 @@ def to_gbq(
)
)

query_job = self._session._executor.export_gbq(
result = self._session._executor.execute(
export_array.rename_columns(id_overrides),
destination=destination,
cluster_cols=clustering_fields,
if_exists=if_exists,
ex_spec.ExecutionSpec(
ex_spec.TableOutputSpec(
destination,
cluster_cols=tuple(clustering_fields),
if_exists=if_exists,
)
),
)
self._set_internal_query_job(query_job)
assert result.query_job is not None
self._set_internal_query_job(result.query_job)

# The query job should have finished, so there should always be a result table.
result_table = query_job.destination
result_table = result.query_job.destination
assert result_table is not None

if temp_table_ref:
@@ -4480,13 +4488,17 @@ def to_parquet(
index=index and self._has_index,
ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
)
query_job = self._session._executor.export_gcs(
result = self._session._executor.execute(
export_array.rename_columns(id_overrides),
path,
format="parquet",
export_options=export_options,
ex_spec.ExecutionSpec(
ex_spec.GcsOutputSpec(
uri=path,
format="parquet",
export_options=tuple(export_options.items()),
)
),
)
self._set_internal_query_job(query_job)
self._set_internal_query_job(result.query_job)
return None

def to_dict(
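
As with ExecutionSpec, the output-spec types are not defined in this diff. From the to_csv, to_json, to_gbq, and to_parquet call sites above, they presumably look roughly like the sketch below; everything beyond the field names visible in those calls (types, defaults, the choice of frozen=True) is an assumption:

# Speculative sketch of the GCS and table output specs used above; inferred
# from call sites only, not from the real bigframes/session/execution_spec.py.
import dataclasses
from typing import Tuple, Union

from google.cloud import bigquery


@dataclasses.dataclass(frozen=True)
class GcsOutputSpec:
    uri: str  # gs:// wildcard path, as passed by to_csv/to_json/to_parquet
    format: str  # "csv", "json", or "parquet"
    # A tuple of (name, value) pairs rather than a dict, matching the
    # tuple(options.items()) conversions in the call sites above.
    export_options: Tuple[Tuple[str, Union[bool, str]], ...] = ()


@dataclasses.dataclass(frozen=True)
class TableOutputSpec:
    table: bigquery.TableReference  # destination table, as passed by to_gbq
    cluster_cols: Tuple[str, ...] = ()
    if_exists: str = "fail"

Converting export_options to a tuple of pairs keeps the spec hashable and immutable, which is consistent with the frozen=True guess above, though that is an inference from tuple(options.items()) rather than something the diff states.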