Skip to content

Commit 74ffc53

Browse files
refactor: Remove col_id_overrides from executor interfaces (#1438)
1 parent dd2f488 commit 74ffc53

File tree

5 files changed

+29
-50
lines changed

5 files changed

+29
-50
lines changed

bigframes/core/array_value.py

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import functools
1919
import io
2020
import typing
21-
from typing import Iterable, List, Optional, Sequence, Tuple
21+
from typing import Iterable, List, Mapping, Optional, Sequence, Tuple
2222
import warnings
2323

2424
import google.cloud.bigquery
@@ -349,6 +349,20 @@ def select_columns(self, column_ids: typing.Sequence[str]) -> ArrayValue:
349349
)
350350
)
351351

352+
def rename_columns(self, col_id_overrides: Mapping[str, str]) -> ArrayValue:
353+
if not col_id_overrides:
354+
return self
355+
output_ids = [col_id_overrides.get(id, id) for id in self.node.schema.names]
356+
return ArrayValue(
357+
nodes.SelectionNode(
358+
self.node,
359+
tuple(
360+
nodes.AliasedRef(ex.DerefOp(old_id), ids.ColumnId(out_id))
361+
for old_id, out_id in zip(self.node.ids, output_ids)
362+
),
363+
)
364+
)
365+
352366
def drop_columns(self, columns: Iterable[str]) -> ArrayValue:
353367
return self.select_columns(
354368
[col_id for col_id in self.column_ids if col_id not in columns]

bigframes/core/blocks.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2431,7 +2431,7 @@ def to_sql_query(
24312431
# implementation. It will reference cached tables instead of original data sources.
24322432
# Maybe should just compile raw BFET? Depends on user intent.
24332433
sql = self.session._executor.to_sql(
2434-
array_value, col_id_overrides=substitutions, enable_cache=enable_cache
2434+
array_value.rename_columns(substitutions), enable_cache=enable_cache
24352435
)
24362436
return (
24372437
sql,

bigframes/dataframe.py

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3606,8 +3606,7 @@ def to_csv(
36063606
"header": header,
36073607
}
36083608
query_job = self._session._executor.export_gcs(
3609-
export_array,
3610-
id_overrides,
3609+
export_array.rename_columns(id_overrides),
36113610
path_or_buf,
36123611
format="csv",
36133612
export_options=options,
@@ -3656,8 +3655,7 @@ def to_json(
36563655
ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
36573656
)
36583657
query_job = self._session._executor.export_gcs(
3659-
export_array,
3660-
id_overrides,
3658+
export_array.rename_columns(id_overrides),
36613659
path_or_buf,
36623660
format="json",
36633661
export_options={},
@@ -3736,9 +3734,8 @@ def to_gbq(
37363734
)
37373735
)
37383736
query_job = self._session._executor.export_gbq(
3739-
export_array,
3737+
export_array.rename_columns(id_overrides),
37403738
destination=destination,
3741-
col_id_overrides=id_overrides,
37423739
cluster_cols=clustering_fields,
37433740
if_exists=if_exists,
37443741
)
@@ -3814,8 +3811,7 @@ def to_parquet(
38143811
ordering_id=bigframes.session._io.bigquery.IO_ORDERING_ID,
38153812
)
38163813
query_job = self._session._executor.export_gcs(
3817-
export_array,
3818-
id_overrides,
3814+
export_array.rename_columns(id_overrides),
38193815
path,
38203816
format="parquet",
38213817
export_options=export_options,
@@ -4070,7 +4066,9 @@ def _prepare_export(
40704066
# the arbitrary unicode column labels feature in BigQuery, which is
40714067
# currently (June 2023) in preview.
40724068
id_overrides = {
4073-
col_id: col_label for col_id, col_label in zip(columns, column_labels)
4069+
col_id: col_label
4070+
for col_id, col_label in zip(columns, column_labels)
4071+
if (col_id != col_label)
40744072
}
40754073

40764074
if ordering_id is not None:

bigframes/session/executor.py

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import pyarrow
4141

4242
import bigframes.core
43-
from bigframes.core import expression
4443
import bigframes.core.compile
4544
import bigframes.core.guid
4645
import bigframes.core.identifiers
@@ -91,7 +90,6 @@ def to_sql(
9190
self,
9291
array_value: bigframes.core.ArrayValue,
9392
offset_column: Optional[str] = None,
94-
col_id_overrides: Mapping[str, str] = {},
9593
ordered: bool = False,
9694
enable_cache: bool = True,
9795
) -> str:
@@ -105,7 +103,6 @@ def execute(
105103
array_value: bigframes.core.ArrayValue,
106104
*,
107105
ordered: bool = True,
108-
col_id_overrides: Mapping[str, str] = {},
109106
use_explicit_destination: Optional[bool] = False,
110107
get_size_bytes: bool = False,
111108
page_size: Optional[int] = None,
@@ -119,7 +116,6 @@ def execute(
119116
def export_gbq(
120117
self,
121118
array_value: bigframes.core.ArrayValue,
122-
col_id_overrides: Mapping[str, str],
123119
destination: bigquery.TableReference,
124120
if_exists: Literal["fail", "replace", "append"] = "fail",
125121
cluster_cols: Sequence[str] = [],
@@ -132,7 +128,6 @@ def export_gbq(
132128
def export_gcs(
133129
self,
134130
array_value: bigframes.core.ArrayValue,
135-
col_id_overrides: Mapping[str, str],
136131
uri: str,
137132
format: Literal["json", "csv", "parquet"],
138133
export_options: Mapping[str, Union[bool, str]],
@@ -220,29 +215,23 @@ def to_sql(
220215
self,
221216
array_value: bigframes.core.ArrayValue,
222217
offset_column: Optional[str] = None,
223-
col_id_overrides: Mapping[str, str] = {},
224218
ordered: bool = False,
225219
enable_cache: bool = True,
226220
) -> str:
227221
if offset_column:
228222
array_value, internal_offset_col = array_value.promote_offsets()
229-
col_id_overrides = dict(col_id_overrides)
230-
col_id_overrides[internal_offset_col] = offset_column
231223
node = (
232224
self.replace_cached_subtrees(array_value.node)
233225
if enable_cache
234226
else array_value.node
235227
)
236-
if col_id_overrides:
237-
node = override_ids(node, col_id_overrides)
238228
return self.compiler.compile(node, ordered=ordered)
239229

240230
def execute(
241231
self,
242232
array_value: bigframes.core.ArrayValue,
243233
*,
244234
ordered: bool = True,
245-
col_id_overrides: Mapping[str, str] = {},
246235
use_explicit_destination: Optional[bool] = False,
247236
get_size_bytes: bool = False,
248237
page_size: Optional[int] = None,
@@ -254,15 +243,12 @@ def execute(
254243
if bigframes.options.compute.enable_multi_query_execution:
255244
self._simplify_with_caching(array_value)
256245

257-
sql = self.to_sql(
258-
array_value, ordered=ordered, col_id_overrides=col_id_overrides
259-
)
260-
adjusted_schema = array_value.schema.rename(col_id_overrides)
246+
sql = self.to_sql(array_value, ordered=ordered)
261247
job_config = bigquery.QueryJobConfig()
262248
# Use explicit destination to avoid 10GB limit of temporary table
263249
if use_explicit_destination:
264250
destination_table = self.storage_manager.create_temp_table(
265-
adjusted_schema.to_bigquery(), cluster_cols=[]
251+
array_value.schema.to_bigquery(), cluster_cols=[]
266252
)
267253
job_config.destination = destination_table
268254
# TODO(swast): plumb through the api_name of the user-facing api that
@@ -293,12 +279,12 @@ def iterator_supplier():
293279
)
294280
# Runs strict validations to ensure internal type predictions and ibis are completely in sync
295281
# Do not execute these validations outside of testing suite.
296-
if "PYTEST_CURRENT_TEST" in os.environ and len(col_id_overrides) == 0:
282+
if "PYTEST_CURRENT_TEST" in os.environ:
297283
self._validate_result_schema(array_value, iterator.schema)
298284

299285
return ExecuteResult(
300286
arrow_batches=iterator_supplier,
301-
schema=adjusted_schema,
287+
schema=array_value.schema,
302288
query_job=query_job,
303289
total_bytes=size_bytes,
304290
total_rows=iterator.total_rows,
@@ -307,7 +293,6 @@ def iterator_supplier():
307293
def export_gbq(
308294
self,
309295
array_value: bigframes.core.ArrayValue,
310-
col_id_overrides: Mapping[str, str],
311296
destination: bigquery.TableReference,
312297
if_exists: Literal["fail", "replace", "append"] = "fail",
313298
cluster_cols: Sequence[str] = [],
@@ -323,7 +308,7 @@ def export_gbq(
323308
"replace": bigquery.WriteDisposition.WRITE_TRUNCATE,
324309
"append": bigquery.WriteDisposition.WRITE_APPEND,
325310
}
326-
sql = self.to_sql(array_value, ordered=False, col_id_overrides=col_id_overrides)
311+
sql = self.to_sql(array_value, ordered=False)
327312
job_config = bigquery.QueryJobConfig(
328313
write_disposition=dispositions[if_exists],
329314
destination=destination,
@@ -340,15 +325,13 @@ def export_gbq(
340325
def export_gcs(
341326
self,
342327
array_value: bigframes.core.ArrayValue,
343-
col_id_overrides: Mapping[str, str],
344328
uri: str,
345329
format: Literal["json", "csv", "parquet"],
346330
export_options: Mapping[str, Union[bool, str]],
347331
):
348332
query_job = self.execute(
349333
array_value,
350334
ordered=False,
351-
col_id_overrides=col_id_overrides,
352335
use_explicit_destination=True,
353336
).query_job
354337
result_table = query_job.destination
@@ -678,18 +661,3 @@ def generate_head_plan(node: nodes.BigFrameNode, n: int):
678661

679662
def generate_row_count_plan(node: nodes.BigFrameNode):
680663
return nodes.RowCountNode(node)
681-
682-
683-
def override_ids(
684-
node: nodes.BigFrameNode, col_id_overrides: Mapping[str, str]
685-
) -> nodes.SelectionNode:
686-
output_ids = [col_id_overrides.get(id, id) for id in node.schema.names]
687-
return nodes.SelectionNode(
688-
node,
689-
tuple(
690-
nodes.AliasedRef(
691-
expression.DerefOp(old_id), bigframes.core.identifiers.ColumnId(out_id)
692-
)
693-
for old_id, out_id in zip(node.ids, output_ids)
694-
),
695-
)

tests/unit/polars_session.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# limitations under the License.
1414

1515
import dataclasses
16-
from typing import Mapping, Optional, Union
16+
from typing import Optional, Union
1717
import weakref
1818

1919
import polars
@@ -39,7 +39,6 @@ def execute(
3939
array_value: bigframes.core.ArrayValue,
4040
*,
4141
ordered: bool = True,
42-
col_id_overrides: Mapping[str, str] = {},
4342
use_explicit_destination: Optional[bool] = False,
4443
get_size_bytes: bool = False,
4544
page_size: Optional[int] = None,

0 commit comments

Comments (0)