diff --git a/bigframes/core/blocks.py b/bigframes/core/blocks.py index 283f56fd39..f1aab473c7 100644 --- a/bigframes/core/blocks.py +++ b/bigframes/core/blocks.py @@ -69,7 +69,7 @@ import bigframes.exceptions as bfe import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops -from bigframes.session import dry_runs +from bigframes.session import dry_runs, execution_spec from bigframes.session import executor as executors # Type constraint for wherever column labels are used @@ -257,7 +257,10 @@ def shape(self) -> typing.Tuple[int, int]: except Exception: pass - row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar() + row_count = self.session._executor.execute( + self.expr.row_count(), + execution_spec.ExecutionSpec(promise_under_10gb=True, ordered=False), + ).to_py_scalar() return (row_count, len(self.value_columns)) @property @@ -557,8 +560,17 @@ def to_arrow( allow_large_results: Optional[bool] = None, ) -> Tuple[pa.Table, Optional[bigquery.QueryJob]]: """Run query and download results as a pyarrow Table.""" + under_10gb = ( + (not allow_large_results) + if (allow_large_results is not None) + else not bigframes.options._allow_large_results + ) execute_result = self.session._executor.execute( - self.expr, ordered=ordered, use_explicit_destination=allow_large_results + self.expr, + execution_spec.ExecutionSpec( + promise_under_10gb=under_10gb, + ordered=ordered, + ), ) pa_table = execute_result.to_arrow_table() @@ -647,8 +659,15 @@ def try_peek( self, n: int = 20, force: bool = False, allow_large_results=None ) -> typing.Optional[pd.DataFrame]: if force or self.expr.supports_fast_peek: - result = self.session._executor.peek( - self.expr, n, use_explicit_destination=allow_large_results + # really, we should just block insane peek values and always assume <10gb + under_10gb = ( + (not allow_large_results) + if (allow_large_results is not None) + else not bigframes.options._allow_large_results + ) + result = self.session._executor.execute( + self.expr, + execution_spec.ExecutionSpec(promise_under_10gb=under_10gb, peek=n), ) df = result.to_pandas() return self._copy_index_to_pandas(df) @@ -665,10 +684,18 @@ def to_pandas_batches( page_size and max_results determine the size and number of batches, see https://cloud.google.com/python/docs/reference/bigquery/latest/google.cloud.bigquery.job.QueryJob#google_cloud_bigquery_job_QueryJob_result""" + + under_10gb = ( + (not allow_large_results) + if (allow_large_results is not None) + else not bigframes.options._allow_large_results + ) execute_result = self.session._executor.execute( self.expr, - ordered=True, - use_explicit_destination=allow_large_results, + execution_spec.ExecutionSpec( + promise_under_10gb=under_10gb, + ordered=True, + ), ) # To reduce the number of edge cases to consider when working with the @@ -714,10 +741,17 @@ def _materialize_local( ) -> Tuple[pd.DataFrame, Optional[bigquery.QueryJob]]: """Run query and download results as a pandas DataFrame. Return the total number of results as well.""" # TODO(swast): Allow for dry run and timeout. 
+ under_10gb = ( + (not materialize_options.allow_large_results) + if (materialize_options.allow_large_results is not None) + else (not bigframes.options._allow_large_results) + ) execute_result = self.session._executor.execute( self.expr, - ordered=materialize_options.ordered, - use_explicit_destination=materialize_options.allow_large_results, + execution_spec.ExecutionSpec( + promise_under_10gb=under_10gb, + ordered=materialize_options.ordered, + ), ) sample_config = materialize_options.downsampling if execute_result.total_bytes is not None: @@ -1598,9 +1632,19 @@ def retrieve_repr_request_results( config=executors.CacheConfig(optimize_for="head", if_cached="reuse-strict"), ) head_result = self.session._executor.execute( - self.expr.slice(start=None, stop=max_results, step=None) + self.expr.slice(start=None, stop=max_results, step=None), + execution_spec.ExecutionSpec( + promise_under_10gb=True, + ordered=True, + ), ) - row_count = self.session._executor.execute(self.expr.row_count()).to_py_scalar() + row_count = self.session._executor.execute( + self.expr.row_count(), + execution_spec.ExecutionSpec( + promise_under_10gb=True, + ordered=False, + ), + ).to_py_scalar() head_df = head_result.to_pandas() return self._copy_index_to_pandas(head_df), row_count, head_result.query_job diff --git a/bigframes/core/indexes/base.py b/bigframes/core/indexes/base.py index e022b3f151..f8ec38621d 100644 --- a/bigframes/core/indexes/base.py +++ b/bigframes/core/indexes/base.py @@ -38,6 +38,7 @@ import bigframes.operations as ops import bigframes.operations.aggregations as agg_ops import bigframes.series +import bigframes.session.execution_spec as ex_spec if typing.TYPE_CHECKING: import bigframes.dataframe @@ -283,8 +284,9 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]: # Check if key exists at all by counting count_agg = ex.UnaryAggregation(agg_ops.count_op, ex.deref(offsets_id)) count_result = filtered_block._expr.aggregate([(count_agg, "count")]) + count_scalar = self._block.session._executor.execute( - count_result + count_result, ex_spec.ExecutionSpec(promise_under_10gb=True) ).to_py_scalar() if count_scalar == 0: @@ -295,7 +297,7 @@ def get_loc(self, key) -> typing.Union[int, slice, "bigframes.series.Series"]: min_agg = ex.UnaryAggregation(agg_ops.min_op, ex.deref(offsets_id)) position_result = filtered_block._expr.aggregate([(min_agg, "position")]) position_scalar = self._block.session._executor.execute( - position_result + position_result, ex_spec.ExecutionSpec(promise_under_10gb=True) ).to_py_scalar() return int(position_scalar) @@ -326,7 +328,10 @@ def _get_monotonic_slice(self, filtered_block, offsets_id: str) -> slice: combined_result = filtered_block._expr.aggregate(min_max_aggs) # Execute query and extract positions - result_df = self._block.session._executor.execute(combined_result).to_pandas() + result_df = self._block.session._executor.execute( + combined_result, + execution_spec=ex_spec.ExecutionSpec(promise_under_10gb=True), + ).to_pandas() min_pos = int(result_df["min_pos"].iloc[0]) max_pos = int(result_df["max_pos"].iloc[0]) diff --git a/bigframes/dataframe.py b/bigframes/dataframe.py index 85760d94bc..d6bfe61d43 100644 --- a/bigframes/dataframe.py +++ b/bigframes/dataframe.py @@ -85,6 +85,7 @@ import bigframes.operations.structs import bigframes.series import bigframes.session._io.bigquery +import bigframes.session.execution_spec as ex_spec if typing.TYPE_CHECKING: from _typeshed import SupportsRichComparison @@ -4271,17 +4272,19 @@ def to_csv( 
index=index and self._has_index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID, ) - options = { + options: dict[str, Union[bool, str]] = { "field_delimiter": sep, "header": header, } - query_job = self._session._executor.export_gcs( + result = self._session._executor.execute( export_array.rename_columns(id_overrides), - path_or_buf, - format="csv", - export_options=options, + ex_spec.ExecutionSpec( + ex_spec.GcsOutputSpec( + uri=path_or_buf, format="csv", export_options=tuple(options.items()) + ) + ), ) - self._set_internal_query_job(query_job) + self._set_internal_query_job(result.query_job) return None def to_json( @@ -4324,13 +4327,13 @@ def to_json( index=index and self._has_index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID, ) - query_job = self._session._executor.export_gcs( + result = self._session._executor.execute( export_array.rename_columns(id_overrides), - path_or_buf, - format="json", - export_options={}, + ex_spec.ExecutionSpec( + ex_spec.GcsOutputSpec(uri=path_or_buf, format="json", export_options=()) + ), ) - self._set_internal_query_job(query_job) + self._set_internal_query_job(result.query_job) return None def to_gbq( @@ -4403,16 +4406,21 @@ def to_gbq( ) ) - query_job = self._session._executor.export_gbq( + result = self._session._executor.execute( export_array.rename_columns(id_overrides), - destination=destination, - cluster_cols=clustering_fields, - if_exists=if_exists, + ex_spec.ExecutionSpec( + ex_spec.TableOutputSpec( + destination, + cluster_cols=tuple(clustering_fields), + if_exists=if_exists, + ) + ), ) - self._set_internal_query_job(query_job) + assert result.query_job is not None + self._set_internal_query_job(result.query_job) # The query job should have finished, so there should be always be a result table. 
- result_table = query_job.destination + result_table = result.query_job.destination assert result_table is not None if temp_table_ref: @@ -4480,13 +4488,17 @@ def to_parquet( index=index and self._has_index, ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID, ) - query_job = self._session._executor.export_gcs( + result = self._session._executor.execute( export_array.rename_columns(id_overrides), - path, - format="parquet", - export_options=export_options, + ex_spec.ExecutionSpec( + ex_spec.GcsOutputSpec( + uri=path, + format="parquet", + export_options=tuple(export_options.items()), + ) + ), ) - self._set_internal_query_job(query_job) + self._set_internal_query_job(result.query_job) return None def to_dict( diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py index a970e75a0f..b428cd646c 100644 --- a/bigframes/session/bq_caching_executor.py +++ b/bigframes/session/bq_caching_executor.py @@ -14,11 +14,9 @@ from __future__ import annotations -import dataclasses import math -import os import threading -from typing import cast, Literal, Mapping, Optional, Sequence, Tuple, Union +from typing import Literal, Mapping, Optional, Sequence, Tuple import warnings import weakref @@ -35,12 +33,12 @@ from bigframes.core import compile, local_data, rewrite import bigframes.core.compile.sqlglot.sqlglot_ir as sqlglot_ir import bigframes.core.guid +import bigframes.core.identifiers import bigframes.core.nodes as nodes import bigframes.core.ordering as order import bigframes.core.schema as schemata import bigframes.core.tree_properties as tree_properties import bigframes.dtypes -import bigframes.features from bigframes.session import ( executor, loader, @@ -49,6 +47,7 @@ semi_executor, ) import bigframes.session._io.bigquery as bq_io +import bigframes.session.execution_spec as ex_spec import bigframes.session.metrics import bigframes.session.planner import bigframes.session.temporary_storage @@ -61,21 +60,6 @@ MAX_SMALL_RESULT_BYTES = 10 * 1024 * 1024 * 1024 # 10G -@dataclasses.dataclass -class OutputSpec: - require_bq_table: bool - cluster_cols: tuple[str, ...] 
- - def with_require_table(self, value: bool) -> OutputSpec: - return dataclasses.replace(self, require_bq_table=value) - - -def _get_default_output_spec() -> OutputSpec: - return OutputSpec( - require_bq_table=bigframes.options._allow_large_results, cluster_cols=() - ) - - SourceIdMapping = Mapping[str, str] @@ -189,7 +173,11 @@ def to_sql( ) -> str: if offset_column: array_value, _ = array_value.promote_offsets() - node = self.logical_plan(array_value.node) if enable_cache else array_value.node + node = ( + self.prepare_plan(array_value.node, target="simplify") + if enable_cache + else array_value.node + ) node = self._substitute_large_local_sources(node) compiled = compile.compile_sql(compile.CompileRequest(node, sort_rows=ordered)) return compiled.sql @@ -197,86 +185,113 @@ def to_sql( def execute( self, array_value: bigframes.core.ArrayValue, - *, - ordered: bool = True, - use_explicit_destination: Optional[bool] = None, + execution_spec: ex_spec.ExecutionSpec, ) -> executor.ExecuteResult: - if bigframes.options.compute.enable_multi_query_execution: - self._simplify_with_caching(array_value) - - output_spec = _get_default_output_spec() - if use_explicit_destination is not None: - output_spec = output_spec.with_require_table(use_explicit_destination) - - plan = self.logical_plan(array_value.node) - return self._execute_plan( - plan, - ordered=ordered, - output_spec=output_spec, - ) + # TODO: Support export jobs in combination with semi executors + if execution_spec.destination_spec is None: + plan = self.prepare_plan(array_value.node, target="simplify") + for exec in self._semi_executors: + maybe_result = exec.execute( + plan, ordered=execution_spec.ordered, peek=execution_spec.peek + ) + if maybe_result: + return maybe_result - def peek( - self, - array_value: bigframes.core.ArrayValue, - n_rows: int, - use_explicit_destination: Optional[bool] = None, - ) -> executor.ExecuteResult: - """ - A 'peek' efficiently accesses a small number of rows in the dataframe. 
- """ - plan = self.logical_plan(array_value.node) - if not tree_properties.can_fast_peek(plan): - msg = bfe.format_message("Peeking this value cannot be done efficiently.") - warnings.warn(msg) + if isinstance(execution_spec.destination_spec, ex_spec.TableOutputSpec): + if execution_spec.peek or execution_spec.ordered: + raise NotImplementedError( + "Ordering and peeking not supported for gbq export" + ) + # separate path for export_gbq, as it has all sorts of annoying logic, such as possibly running as dml + return self._export_gbq(array_value, execution_spec.destination_spec) + + result = self._execute_plan_gbq( + array_value.node, + ordered=execution_spec.ordered, + peek=execution_spec.peek, + cache_spec=execution_spec.destination_spec + if isinstance(execution_spec.destination_spec, ex_spec.CacheSpec) + else None, + must_create_table=not execution_spec.promise_under_10gb, + ) + # post steps: export + if isinstance(execution_spec.destination_spec, ex_spec.GcsOutputSpec): + self._export_result_gcs(result, execution_spec.destination_spec) - output_spec = _get_default_output_spec() - if use_explicit_destination is not None: - output_spec = output_spec.with_require_table(use_explicit_destination) + return result - return self._execute_plan( - plan, ordered=False, output_spec=output_spec, peek=n_rows + def _export_result_gcs( + self, result: executor.ExecuteResult, gcs_export_spec: ex_spec.GcsOutputSpec + ): + query_job = result.query_job + assert query_job is not None + result_table = query_job.destination + assert result_table is not None + export_data_statement = bq_io.create_export_data_statement( + f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}", + uri=gcs_export_spec.uri, + format=gcs_export_spec.format, + export_options=dict(gcs_export_spec.export_options), + ) + bq_io.start_query_with_client( + self.bqclient, + export_data_statement, + job_config=bigquery.QueryJobConfig(), + metrics=self.metrics, + project=None, + location=None, + timeout=None, + query_with_job=True, ) - def export_gbq( - self, - array_value: bigframes.core.ArrayValue, - destination: bigquery.TableReference, - if_exists: Literal["fail", "replace", "append"] = "fail", - cluster_cols: Sequence[str] = [], - ): + def _maybe_find_existing_table( + self, spec: ex_spec.TableOutputSpec + ) -> Optional[bigquery.Table]: + # validate destination table + try: + table = self.bqclient.get_table(spec.table) + if spec.if_exists == "fail": + raise ValueError(f"Table already exists: {spec.table.__str__()}") + + if len(spec.cluster_cols) != 0: + if (table.clustering_fields is None) or ( + tuple(table.clustering_fields) != spec.cluster_cols + ): + raise ValueError( + "Table clustering fields cannot be changed after the table has " + f"been created. Requested clustering fields: {spec.cluster_cols}, existing clustering fields: {table.clustering_fields}" + ) + return table + except google.api_core.exceptions.NotFound: + return None + + def _export_gbq( + self, array_value: bigframes.core.ArrayValue, spec: ex_spec.TableOutputSpec + ) -> executor.ExecuteResult: """ Export the ArrayValue to an existing BigQuery table. 
""" - if bigframes.options.compute.enable_multi_query_execution: - self._simplify_with_caching(array_value) + plan = self.prepare_plan(array_value.node, target="bq_execution") - table_exists = True - try: - table = self.bqclient.get_table(destination) - if if_exists == "fail": - raise ValueError(f"Table already exists: {destination.__str__()}") - except google.api_core.exceptions.NotFound: - table_exists = False + # validate destination table + existing_table = self._maybe_find_existing_table(spec) - if len(cluster_cols) != 0: - if table_exists and table.clustering_fields != cluster_cols: - raise ValueError( - "Table clustering fields cannot be changed after the table has " - f"been created. Existing clustering fields: {table.clustering_fields}" - ) + compiled = compile.compile_sql(compile.CompileRequest(plan, sort_rows=False)) + sql = compiled.sql - sql = self.to_sql(array_value, ordered=False) - if table_exists and _if_schema_match(table.schema, array_value.schema): + if (existing_table is not None) and _if_schema_match( + existing_table.schema, array_value.schema + ): # b/409086472: Uses DML for table appends and replacements to avoid # BigQuery `RATE_LIMIT_EXCEEDED` errors, as per quota limits: # https://cloud.google.com/bigquery/quotas#standard_tables job_config = bigquery.QueryJobConfig() ir = sqlglot_ir.SQLGlotIR.from_query_string(sql) - if if_exists == "append": - sql = ir.insert(destination) + if spec.if_exists == "append": + sql = ir.insert(spec.table) else: # for "replace" - assert if_exists == "replace" - sql = ir.replace(destination) + assert spec.if_exists == "replace" + sql = ir.replace(spec.table) else: dispositions = { "fail": bigquery.WriteDisposition.WRITE_EMPTY, @@ -284,14 +299,14 @@ def export_gbq( "append": bigquery.WriteDisposition.WRITE_APPEND, } job_config = bigquery.QueryJobConfig( - write_disposition=dispositions[if_exists], - destination=destination, - clustering_fields=cluster_cols if cluster_cols else None, + write_disposition=dispositions[spec.if_exists], + destination=spec.table, + clustering_fields=spec.cluster_cols if spec.cluster_cols else None, ) # TODO(swast): plumb through the api_name of the user-facing api that # caused this query. - _, query_job = self._run_execute_query( + row_iter, query_job = self._run_execute_query( sql=sql, job_config=job_config, ) @@ -300,48 +315,16 @@ def export_gbq( t == bigframes.dtypes.TIMEDELTA_DTYPE for t in array_value.schema.dtypes ) - if if_exists != "append" and has_timedelta_col: + if spec.if_exists != "append" and has_timedelta_col: # Only update schema if this is not modifying an existing table, and the # new table contains timedelta columns. 
- table = self.bqclient.get_table(destination) + table = self.bqclient.get_table(spec.table) table.schema = array_value.schema.to_bigquery() self.bqclient.update_table(table, ["schema"]) - return query_job - - def export_gcs( - self, - array_value: bigframes.core.ArrayValue, - uri: str, - format: Literal["json", "csv", "parquet"], - export_options: Mapping[str, Union[bool, str]], - ): - query_job = self.execute( - array_value, - ordered=False, - use_explicit_destination=True, - ).query_job - assert query_job is not None - result_table = query_job.destination - assert result_table is not None - export_data_statement = bq_io.create_export_data_statement( - f"{result_table.project}.{result_table.dataset_id}.{result_table.table_id}", - uri=uri, - format=format, - export_options=dict(export_options), - ) - - bq_io.start_query_with_client( - self.bqclient, - export_data_statement, - job_config=bigquery.QueryJobConfig(), - metrics=self.metrics, - project=None, - location=None, - timeout=None, - query_with_job=True, + return executor.ExecuteResult( + row_iter.to_arrow_iterable(), array_value.schema, query_job ) - return query_job def dry_run( self, array_value: bigframes.core.ArrayValue, ordered: bool = True @@ -446,59 +429,56 @@ def _is_trivially_executable(self, array_value: bigframes.core.ArrayValue): # Once rewriting is available, will want to rewrite before # evaluating execution cost. return tree_properties.is_trivially_executable( - self.logical_plan(array_value.node) + self.prepare_plan(array_value.node) ) - def logical_plan(self, root: nodes.BigFrameNode) -> nodes.BigFrameNode: + def prepare_plan( + self, + plan: nodes.BigFrameNode, + target: Literal["simplify", "bq_execution"] = "simplify", + ) -> nodes.BigFrameNode: """ - Apply universal logical simplifications that are helpful regardless of engine. + Prepare the plan by simplifying it with caches and removing unused operators. Has modes for different contexts. + + "simplify" removes unused operations and substitutes subtrees with their previously cached equivalents. + "bq_execution" is the heaviest option, preparing the plan for bq execution by also caching subtrees and uploading large local sources. """ - plan = self.replace_cached_subtrees(root) + # TODO: We should model plan decomposition and data uploading as work steps rather than as plan preparation.
+ if ( + target == "bq_execution" + and bigframes.options.compute.enable_multi_query_execution + ): + self._simplify_with_caching(plan) + + plan = self.replace_cached_subtrees(plan) plan = rewrite.column_pruning(plan) plan = plan.top_down(rewrite.fold_row_counts) + + if target == "bq_execution": + plan = self._substitute_large_local_sources(plan) + return plan def _cache_with_cluster_cols( self, array_value: bigframes.core.ArrayValue, cluster_cols: Sequence[str] ): """Executes the query and uses the resulting table to rewrite future executions.""" - plan = self.logical_plan(array_value.node) - plan = self._substitute_large_local_sources(plan) - compiled = compile.compile_sql( - compile.CompileRequest( - plan, sort_rows=False, materialize_all_order_keys=True - ) - ) - tmp_table_ref, num_rows = self._sql_as_cached_temp_table( - compiled.sql, - compiled.sql_schema, - cluster_cols=bq_io.select_cluster_cols(compiled.sql_schema, cluster_cols), + execution_spec = ex_spec.ExecutionSpec( + destination_spec=ex_spec.CacheSpec(cluster_cols=tuple(cluster_cols)) ) - tmp_table = self.bqclient.get_table(tmp_table_ref) - assert compiled.row_order is not None - self.cache.cache_results_table( - array_value.node, tmp_table, compiled.row_order, num_rows=num_rows + self.execute( + array_value, + execution_spec=execution_spec, ) def _cache_with_offsets(self, array_value: bigframes.core.ArrayValue): """Executes the query and uses the resulting table to rewrite future executions.""" - offset_column = bigframes.core.guid.generate_guid("bigframes_offsets") - w_offsets, offset_column = array_value.promote_offsets() - compiled = compile.compile_sql( - compile.CompileRequest( - self.logical_plan(self._substitute_large_local_sources(w_offsets.node)), - sort_rows=False, - ) + execution_spec = ex_spec.ExecutionSpec( + destination_spec=ex_spec.CacheSpec(cluster_cols=tuple()) ) - tmp_table_ref, num_rows = self._sql_as_cached_temp_table( - compiled.sql, - compiled.sql_schema, - cluster_cols=[offset_column], - ) - tmp_table = self.bqclient.get_table(tmp_table_ref) - assert compiled.row_order is not None - self.cache.cache_results_table( - array_value.node, tmp_table, compiled.row_order, num_rows=num_rows + self.execute( + array_value, + execution_spec=execution_spec, ) def _cache_with_session_awareness( @@ -520,17 +500,17 @@ def _cache_with_session_awareness( else: self._cache_with_cluster_cols(bigframes.core.ArrayValue(target), []) - def _simplify_with_caching(self, array_value: bigframes.core.ArrayValue): + def _simplify_with_caching(self, plan: nodes.BigFrameNode): """Attempts to handle the complexity by caching duplicated subtrees and breaking the query into pieces.""" # Apply existing caching first for _ in range(MAX_SUBTREE_FACTORINGS): if ( - self.logical_plan(array_value.node).planning_complexity + self.prepare_plan(plan, "simplify").planning_complexity < QUERY_COMPLEXITY_LIMIT ): return - did_cache = self._cache_most_complex_subtree(array_value.node) + did_cache = self._cache_most_complex_subtree(plan) if not did_cache: return @@ -552,52 +532,6 @@ def _cache_most_complex_subtree(self, node: nodes.BigFrameNode) -> bool: self._cache_with_cluster_cols(bigframes.core.ArrayValue(selection), []) return True - def _sql_as_cached_temp_table( - self, - sql: str, - schema: Sequence[bigquery.SchemaField], - cluster_cols: Sequence[str], - ) -> tuple[bigquery.TableReference, Optional[int]]: - assert len(cluster_cols) <= _MAX_CLUSTER_COLUMNS - temp_table = self.storage_manager.create_temp_table(schema, cluster_cols) - - # TODO: 
Get default job config settings - job_config = cast( - bigquery.QueryJobConfig, - bigquery.QueryJobConfig.from_api_repr({}), - ) - job_config.destination = temp_table - _, query_job = self._run_execute_query( - sql, - job_config=job_config, - ) - assert query_job is not None - iter = query_job.result() - return query_job.destination, iter.total_rows - - def _validate_result_schema( - self, - array_value: bigframes.core.ArrayValue, - bq_schema: list[bigquery.SchemaField], - ): - actual_schema = _sanitize(tuple(bq_schema)) - ibis_schema = compile.test_only_ibis_inferred_schema( - self.logical_plan(array_value.node) - ).to_bigquery() - internal_schema = _sanitize(array_value.schema.to_bigquery()) - if not bigframes.features.PANDAS_VERSIONS.is_arrow_list_dtype_usable: - return - - if internal_schema != actual_schema: - raise ValueError( - f"This error should only occur while testing. BigFrames internal schema: {internal_schema} does not match actual schema: {actual_schema}" - ) - - if ibis_schema != actual_schema: - raise ValueError( - f"This error should only occur while testing. Ibis schema: {ibis_schema} does not match actual schema: {actual_schema}" - ) - def _substitute_large_local_sources(self, original_root: nodes.BigFrameNode): """ Replace large local sources with the uploaded version of those datasources. @@ -646,52 +580,80 @@ def _upload_local_data(self, local_table: local_data.ManagedArrowTable): ) self.cache.cache_remote_replacement(local_table, uploaded) - def _execute_plan( + def _execute_plan_gbq( self, plan: nodes.BigFrameNode, ordered: bool, - output_spec: OutputSpec, peek: Optional[int] = None, + cache_spec: Optional[ex_spec.CacheSpec] = None, + must_create_table: bool = True, ) -> executor.ExecuteResult: """Just execute whatever plan as is, without further caching or decomposition.""" - # First try to execute fast-paths - if not output_spec.require_bq_table: - for exec in self._semi_executors: - maybe_result = exec.execute(plan, ordered=ordered, peek=peek) - if maybe_result: - return maybe_result + # TODO(swast): plumb through the api_name of the user-facing api that + # caused this query. + + og_plan = plan + og_schema = plan.schema + + plan = self.prepare_plan(plan, target="bq_execution") + create_table = must_create_table + cluster_cols: Sequence[str] = [] + if cache_spec is not None: + if peek is not None: + raise ValueError("peek is not compatible with caching.") + + create_table = True + if not cache_spec.cluster_cols: + assert len(cache_spec.cluster_cols) <= _MAX_CLUSTER_COLUMNS + offsets_id = bigframes.core.identifiers.ColumnId( + bigframes.core.guid.generate_guid() + ) + plan = nodes.PromoteOffsetsNode(plan, offsets_id) + cluster_cols = [offsets_id.sql] + else: + cluster_cols = cache_spec.cluster_cols - # Use explicit destination to avoid 10GB limit of temporary table - destination_table = ( - self.storage_manager.create_temp_table( - plan.schema.to_bigquery(), cluster_cols=output_spec.cluster_cols + compiled = compile.compile_sql( + compile.CompileRequest( + plan, + sort_rows=ordered, + peek_count=peek, + materialize_all_order_keys=(cache_spec is not None), ) - if output_spec.require_bq_table - else None ) + # might have more columns than og schema, for hidden ordering columns + compiled_schema = compiled.sql_schema + + destination_table: Optional[bigquery.TableReference] = None - # TODO(swast): plumb through the api_name of the user-facing api that - # caused this query. 
job_config = bigquery.QueryJobConfig() - # Use explicit destination to avoid 10GB limit of temporary table - if destination_table is not None: + if create_table: + destination_table = self.storage_manager.create_temp_table( + compiled_schema, cluster_cols + ) job_config.destination = destination_table - plan = self._substitute_large_local_sources(plan) - compiled = compile.compile_sql( - compile.CompileRequest(plan, sort_rows=ordered, peek_count=peek) - ) iterator, query_job = self._run_execute_query( sql=compiled.sql, job_config=job_config, query_with_job=(destination_table is not None), ) - if query_job: - size_bytes = self.bqclient.get_table(query_job.destination).num_bytes + table_info: Optional[bigquery.Table] = None + if query_job and query_job.destination: + table_info = self.bqclient.get_table(query_job.destination) + size_bytes = table_info.num_bytes else: size_bytes = None + # we could actually cache even when caching is not explicitly requested, but being conservative for now + if cache_spec is not None: + assert table_info is not None + assert compiled.row_order is not None + self.cache.cache_results_table( + og_plan, table_info, compiled.row_order, num_rows=table_info.num_rows + ) + if size_bytes is not None and size_bytes >= MAX_SMALL_RESULT_BYTES: msg = bfe.format_message( "The query result size has exceeded 10 GB. In BigFrames 2.0 and " @@ -700,18 +662,12 @@ def _execute_plan( "`bigframes.options.compute.allow_large_results=True`." ) warnings.warn(msg, FutureWarning) - # Runs strict validations to ensure internal type predictions and ibis are completely in sync - # Do not execute these validations outside of testing suite. - if "PYTEST_CURRENT_TEST" in os.environ: - self._validate_result_schema( - bigframes.core.ArrayValue(plan), iterator.schema - ) return executor.ExecuteResult( _arrow_batches=iterator.to_arrow_iterable( bqstorage_client=self.bqstoragereadclient ), - schema=plan.schema, + schema=og_schema, query_job=query_job, total_bytes=size_bytes, total_rows=iterator.total_rows, @@ -731,19 +687,3 @@ def _if_schema_match( ): return False return True - - -def _sanitize( - schema: Tuple[bigquery.SchemaField, ...] -) -> Tuple[bigquery.SchemaField, ...]: - # Schema inferred from SQL strings and Ibis expressions contain only names, types and modes, - # so we disregard other fields (e.g timedelta description for timedelta columns) for validations. - return tuple( - bigquery.SchemaField( - f.name, - f.field_type, - f.mode, # type:ignore - fields=_sanitize(f.fields), - ) - for f in schema - ) diff --git a/bigframes/session/execution_spec.py b/bigframes/session/execution_spec.py new file mode 100644 index 0000000000..c9431dbd11 --- /dev/null +++ b/bigframes/session/execution_spec.py @@ -0,0 +1,53 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from __future__ import annotations + +import dataclasses +from typing import Literal, Optional, Union + +from google.cloud import bigquery + + +@dataclasses.dataclass(frozen=True) +class ExecutionSpec: + destination_spec: Union[TableOutputSpec, GcsOutputSpec, CacheSpec, None] = None + peek: Optional[int] = None + ordered: bool = ( + False # for bq execution, ordered and promise_under_10gb must be set together + ) + # This is an optimization flag for gbq execution; it doesn't change semantics, but errors may occur if the promise is violated + promise_under_10gb: bool = False + + +# This is temporary; in the future, caching will not be done through immediate execution, but will instead label nodes +# to be cached only when a super-tree is executed +@dataclasses.dataclass(frozen=True) +class CacheSpec: + cluster_cols: tuple[str, ...] + + +@dataclasses.dataclass(frozen=True) +class TableOutputSpec: + table: bigquery.TableReference + cluster_cols: tuple[str, ...] + if_exists: Literal["fail", "replace", "append"] = "fail" + + +@dataclasses.dataclass(frozen=True) +class GcsOutputSpec: + uri: str + format: Literal["json", "csv", "parquet"] + # sequence of (option, value) pairs + export_options: tuple[tuple[str, Union[bool, str]], ...] diff --git a/bigframes/session/executor.py b/bigframes/session/executor.py index cc8f086f9f..748b10647a 100644 --- a/bigframes/session/executor.py +++ b/bigframes/session/executor.py @@ -18,7 +18,7 @@ import dataclasses import functools import itertools -from typing import Iterator, Literal, Mapping, Optional, Sequence, Union +from typing import Iterator, Literal, Optional, Union from google.cloud import bigquery import pandas as pd @@ -29,6 +29,7 @@ from bigframes.core import pyarrow_utils import bigframes.core.schema import bigframes.session._io.pandas as io_pandas +import bigframes.session.execution_spec as ex_spec _ROW_LIMIT_EXCEEDED_TEMPLATE = ( "Execution has downloaded {result_rows} rows so far, which exceeds the " @@ -147,41 +148,16 @@ def to_sql( """ raise NotImplementedError("to_sql not implemented for this executor") + @abc.abstractmethod def execute( self, array_value: bigframes.core.ArrayValue, - *, - ordered: bool = True, - use_explicit_destination: Optional[bool] = False, + execution_spec: ex_spec.ExecutionSpec, ) -> ExecuteResult: """ - Execute the ArrayValue, storing the result to a temporary session-owned table. - """ - raise NotImplementedError("execute not implemented for this executor") - - def export_gbq( - self, - array_value: bigframes.core.ArrayValue, - destination: bigquery.TableReference, - if_exists: Literal["fail", "replace", "append"] = "fail", - cluster_cols: Sequence[str] = [], - ) -> bigquery.QueryJob: - """ - Export the ArrayValue to an existing BigQuery table. + Execute the ArrayValue. """ - raise NotImplementedError("export_gbq not implemented for this executor") - - def export_gcs( - self, - array_value: bigframes.core.ArrayValue, - uri: str, - format: Literal["json", "csv", "parquet"], - export_options: Mapping[str, Union[bool, str]], - ) -> bigquery.QueryJob: - """ - Export the ArrayValue to gcs. - """ - raise NotImplementedError("export_gcs not implemented for this executor") + ...
def dry_run( self, array_value: bigframes.core.ArrayValue, ordered: bool = True @@ -193,17 +169,6 @@ def dry_run( """ raise NotImplementedError("dry_run not implemented for this executor") - def peek( - self, - array_value: bigframes.core.ArrayValue, - n_rows: int, - use_explicit_destination: Optional[bool] = False, - ) -> ExecuteResult: - """ - A 'peek' efficiently accesses a small number of rows in the dataframe. - """ - raise NotImplementedError("peek not implemented for this executor") - def cached( self, array_value: bigframes.core.ArrayValue, diff --git a/bigframes/testing/compiler_session.py b/bigframes/testing/compiler_session.py index 35114d95d0..289b2600fd 100644 --- a/bigframes/testing/compiler_session.py +++ b/bigframes/testing/compiler_session.py @@ -41,3 +41,10 @@ def to_sql( return self.compiler.SQLGlotCompiler().compile( array_value.node, ordered=ordered ) + + def execute( + self, + array_value, + execution_spec, + ): + raise NotImplementedError("SQLCompilerExecutor.execute not implemented") diff --git a/bigframes/testing/polars_session.py b/bigframes/testing/polars_session.py index 3710c40eae..29eae20b7a 100644 --- a/bigframes/testing/polars_session.py +++ b/bigframes/testing/polars_session.py @@ -13,7 +13,7 @@ # limitations under the License. import dataclasses -from typing import Optional, Union +from typing import Union import weakref import pandas @@ -23,48 +23,31 @@ import bigframes.core.blocks import bigframes.core.compile.polars import bigframes.dataframe +import bigframes.session.execution_spec import bigframes.session.executor import bigframes.session.metrics -# Does not support to_sql, export_gbq, export_gcs, dry_run, peek, head, get_row_count, cached +# Does not support to_sql, dry_run, peek, cached @dataclasses.dataclass class TestExecutor(bigframes.session.executor.Executor): compiler = bigframes.core.compile.polars.PolarsCompiler() - def peek( - self, - array_value: bigframes.core.ArrayValue, - n_rows: int, - use_explicit_destination: Optional[bool] = False, - ): - """ - A 'peek' efficiently accesses a small number of rows in the dataframe. - """ - lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node) - pa_table = lazy_frame.collect().limit(n_rows).to_arrow() - # Currently, pyarrow types might not quite be exactly the ones in the bigframes schema. - # Nullability may be different, and might use large versions of list, string datatypes. - return bigframes.session.executor.ExecuteResult( - _arrow_batches=pa_table.to_batches(), - schema=array_value.schema, - total_bytes=pa_table.nbytes, - total_rows=pa_table.num_rows, - ) - def execute( self, array_value: bigframes.core.ArrayValue, - *, - ordered: bool = True, - use_explicit_destination: Optional[bool] = False, - page_size: Optional[int] = None, - max_results: Optional[int] = None, + execution_spec: bigframes.session.execution_spec.ExecutionSpec, ): """ Execute the ArrayValue, storing the result to a temporary session-owned table. """ + if execution_spec.destination_spec is not None: + raise ValueError( + f"TestExecutor does not support destination spec: {execution_spec.destination_spec}" + ) lazy_frame: polars.LazyFrame = self.compiler.compile(array_value.node) + if execution_spec.peek is not None: + lazy_frame = lazy_frame.limit(execution_spec.peek) pa_table = lazy_frame.collect().to_arrow() # Currently, pyarrow types might not quite be exactly the ones in the bigframes schema. # Nullability may be different, and might use large versions of list, string datatypes. 
diff --git a/tests/system/small/test_session.py b/tests/system/small/test_session.py index f0a6302c7b..08c3359b00 100644 --- a/tests/system/small/test_session.py +++ b/tests/system/small/test_session.py @@ -36,6 +36,7 @@ import bigframes.dataframe import bigframes.dtypes import bigframes.ml.linear_model +import bigframes.session.execution_spec from bigframes.testing import utils all_write_engines = pytest.mark.parametrize( @@ -113,7 +114,10 @@ def test_read_gbq_tokyo( # use_explicit_destination=True, otherwise might use path with no query_job exec_result = session_tokyo._executor.execute( - df._block.expr, use_explicit_destination=True + df._block.expr, + bigframes.session.execution_spec.ExecutionSpec( + bigframes.session.execution_spec.CacheSpec(()), promise_under_10gb=False + ), ) assert exec_result.query_job is not None assert exec_result.query_job.location == tokyo_location @@ -896,7 +900,10 @@ def test_read_pandas_tokyo( expected = scalars_pandas_df_index result = session_tokyo._executor.execute( - df._block.expr, use_explicit_destination=True + df._block.expr, + bigframes.session.execution_spec.ExecutionSpec( + bigframes.session.execution_spec.CacheSpec(()), promise_under_10gb=False + ), ) assert result.query_job is not None assert result.query_job.location == tokyo_location
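A minimal sketch of the call pattern this change introduces, mirroring the call sites updated above. The `session`, `array_value`, and `destination_table_ref` names are placeholders for illustration, not part of this diff.

import bigframes.session.execution_spec as ex_spec

# In-memory result, unordered, with a promise that the result stays under the 10GB temp-table limit.
result = session._executor.execute(
    array_value,
    ex_spec.ExecutionSpec(promise_under_10gb=True, ordered=False),
)

# GCS export: the destination spec replaces the removed export_gcs() method.
session._executor.execute(
    array_value,
    ex_spec.ExecutionSpec(
        ex_spec.GcsOutputSpec(uri="gs://bucket/out-*.csv", format="csv", export_options=()),
    ),
)

# BigQuery table output: replaces the removed export_gbq() method.
session._executor.execute(
    array_value,
    ex_spec.ExecutionSpec(
        ex_spec.TableOutputSpec(destination_table_ref, cluster_cols=(), if_exists="replace"),
    ),
)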