Skip to content

Commit 6ad38e8

Browse files
perf: Directly read gbq table for simple plans (#1607)
1 parent aee4159 commit 6ad38e8

File tree

11 files changed

+849
-604
lines changed

11 files changed

+849
-604
lines changed

bigframes/core/nodes.py

Lines changed: 51 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,16 @@
2020
import functools
2121
import itertools
2222
import typing
23-
from typing import Callable, cast, Iterable, Mapping, Optional, Sequence, Tuple
23+
from typing import (
24+
AbstractSet,
25+
Callable,
26+
cast,
27+
Iterable,
28+
Mapping,
29+
Optional,
30+
Sequence,
31+
Tuple,
32+
)
2433

2534
import google.cloud.bigquery as bq
2635

@@ -572,8 +581,39 @@ def with_id(self, id: identifiers.ColumnId) -> ScanItem:
572581

573582
@dataclasses.dataclass(frozen=True)
574583
class ScanList:
584+
"""
585+
Defines the set of columns to scan from a source, along with the variable to bind the columns to.
586+
"""
587+
575588
items: typing.Tuple[ScanItem, ...]
576589

590+
def filter_cols(
591+
self,
592+
ids: AbstractSet[identifiers.ColumnId],
593+
) -> ScanList:
594+
"""Drop columns from the scan that except those in the 'ids' arg."""
595+
result = ScanList(tuple(item for item in self.items if item.id in ids))
596+
if len(result.items) == 0:
597+
# We need to select something, or sql syntax breaks
598+
result = ScanList(self.items[:1])
599+
return result
600+
601+
def project(
602+
self,
603+
selections: Mapping[identifiers.ColumnId, identifiers.ColumnId],
604+
) -> ScanList:
605+
"""Project given ids from the scanlist, dropping previous bindings."""
606+
by_id = {item.id: item for item in self.items}
607+
result = ScanList(
608+
tuple(
609+
by_id[old_id].with_id(new_id) for old_id, new_id in selections.items()
610+
)
611+
)
612+
if len(result.items) == 0:
613+
# We need to select something, or sql syntax breaks
614+
result = ScanList((self.items[:1]))
615+
return result
616+
577617

578618
@dataclasses.dataclass(frozen=True, eq=False)
579619
class ReadLocalNode(LeafNode):
@@ -675,6 +715,11 @@ def from_table(table: bq.Table, columns: Sequence[str] = ()) -> GbqTable:
675715
else tuple(table.clustering_fields),
676716
)
677717

718+
def get_table_ref(self) -> bq.TableReference:
719+
return bq.TableReference(
720+
bq.DatasetReference(self.project_id, self.dataset_id), self.table_id
721+
)
722+
678723
@property
679724
@functools.cache
680725
def schema_by_id(self):
@@ -1068,6 +1113,11 @@ def variables_introduced(self) -> int:
10681113
# This operation only renames variables, doesn't actually create new ones
10691114
return 0
10701115

1116+
@property
1117+
def has_multi_referenced_ids(self) -> bool:
1118+
referenced = tuple(ref.ref.id for ref in self.input_output_pairs)
1119+
return len(referenced) != len(set(referenced))
1120+
10711121
# TODO: Reuse parent namespace
10721122
# Currently, Selection node allows renaming an reusing existing names, so it must establish a
10731123
# new namespace.

bigframes/core/rewrite/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from bigframes.core.rewrite.legacy_align import legacy_join_as_projection
1818
from bigframes.core.rewrite.order import pull_up_order
1919
from bigframes.core.rewrite.pruning import column_pruning
20+
from bigframes.core.rewrite.scan_reduction import try_reduce_to_table_scan
2021
from bigframes.core.rewrite.slices import pullup_limit_from_slice, rewrite_slice
2122
from bigframes.core.rewrite.timedeltas import rewrite_timedelta_expressions
2223
from bigframes.core.rewrite.windows import rewrite_range_rolling
@@ -31,4 +32,5 @@
3132
"pull_up_order",
3233
"column_pruning",
3334
"rewrite_range_rolling",
35+
"try_reduce_to_table_scan",
3436
]

bigframes/core/rewrite/pruning.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ def prune_readlocal(
170170
node: bigframes.core.nodes.ReadLocalNode,
171171
selection: AbstractSet[identifiers.ColumnId],
172172
) -> bigframes.core.nodes.ReadLocalNode:
173-
new_scan_list = filter_scanlist(node.scan_list, selection)
173+
new_scan_list = node.scan_list.filter_cols(selection)
174174
return dataclasses.replace(
175175
node,
176176
scan_list=new_scan_list,
@@ -183,18 +183,5 @@ def prune_readtable(
183183
node: bigframes.core.nodes.ReadTableNode,
184184
selection: AbstractSet[identifiers.ColumnId],
185185
) -> bigframes.core.nodes.ReadTableNode:
186-
new_scan_list = filter_scanlist(node.scan_list, selection)
186+
new_scan_list = node.scan_list.filter_cols(selection)
187187
return dataclasses.replace(node, scan_list=new_scan_list)
188-
189-
190-
def filter_scanlist(
191-
scanlist: bigframes.core.nodes.ScanList,
192-
ids: AbstractSet[identifiers.ColumnId],
193-
):
194-
result = bigframes.core.nodes.ScanList(
195-
tuple(item for item in scanlist.items if item.id in ids)
196-
)
197-
if len(result.items) == 0:
198-
# We need to select something, or stuff breaks
199-
result = bigframes.core.nodes.ScanList(scanlist.items[:1])
200-
return result
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# Copyright 2025 Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
import dataclasses
15+
import functools
16+
from typing import Optional
17+
18+
from bigframes.core import nodes
19+
20+
21+
def try_reduce_to_table_scan(root: nodes.BigFrameNode) -> Optional[nodes.ReadTableNode]:
22+
for node in root.unique_nodes():
23+
if not isinstance(node, (nodes.ReadTableNode, nodes.SelectionNode)):
24+
return None
25+
result = root.bottom_up(merge_scan)
26+
if isinstance(result, nodes.ReadTableNode):
27+
return result
28+
return None
29+
30+
31+
@functools.singledispatch
32+
def merge_scan(node: nodes.BigFrameNode) -> nodes.BigFrameNode:
33+
return node
34+
35+
36+
@merge_scan.register
37+
def _(node: nodes.SelectionNode) -> nodes.BigFrameNode:
38+
if not isinstance(node.child, nodes.ReadTableNode):
39+
return node
40+
if node.has_multi_referenced_ids:
41+
return node
42+
43+
selection = {
44+
aliased_ref.ref.id: aliased_ref.id for aliased_ref in node.input_output_pairs
45+
}
46+
new_scan_list = node.child.scan_list.project(selection)
47+
return dataclasses.replace(node.child, scan_list=new_scan_list)

bigframes/session/__init__.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,10 @@
6464
# to register new and replacement ops with the Ibis BigQuery backend.
6565
import bigframes.functions._function_session as bff_session
6666
import bigframes.functions.function as bff
67-
from bigframes.session import bigquery_session
67+
from bigframes.session import bigquery_session, bq_caching_executor, executor
6868
import bigframes.session._io.bigquery as bf_io_bigquery
6969
import bigframes.session.anonymous_dataset
7070
import bigframes.session.clients
71-
import bigframes.session.executor
7271
import bigframes.session.loader
7372
import bigframes.session.metrics
7473
import bigframes.session.validation
@@ -245,14 +244,12 @@ def __init__(
245244
self._temp_storage_manager = (
246245
self._session_resource_manager or self._anon_dataset_manager
247246
)
248-
self._executor: bigframes.session.executor.Executor = (
249-
bigframes.session.executor.BigQueryCachingExecutor(
250-
bqclient=self._clients_provider.bqclient,
251-
bqstoragereadclient=self._clients_provider.bqstoragereadclient,
252-
storage_manager=self._temp_storage_manager,
253-
strictly_ordered=self._strictly_ordered,
254-
metrics=self._metrics,
255-
)
247+
self._executor: executor.Executor = bq_caching_executor.BigQueryCachingExecutor(
248+
bqclient=self._clients_provider.bqclient,
249+
bqstoragereadclient=self._clients_provider.bqstoragereadclient,
250+
storage_manager=self._temp_storage_manager,
251+
strictly_ordered=self._strictly_ordered,
252+
metrics=self._metrics,
256253
)
257254
self._loader = bigframes.session.loader.GbqDataLoader(
258255
session=self,

0 commit comments

Comments
 (0)