Skip to content

Commit d17b711

Browse files
authored
perf: remove an unnecessary extra dry_run query from read_gbq_table (#1972)
Also, removes some unnecessary warnings from SQL Cell code paths.
1 parent d38e42c commit d17b711

File tree

5 files changed

+178
-84
lines changed

5 files changed

+178
-84
lines changed

bigframes/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ class TimeTravelDisabledWarning(Warning):
7979
"""A query was reattempted without time travel."""
8080

8181

82+
class TimeTravelCacheWarning(Warning):
83+
"""Reads from the same table twice in the same session pull time travel from cache."""
84+
85+
8286
class AmbiguousWindowWarning(Warning):
8387
"""A query may produce nondeterministic results as the window may be ambiguously ordered."""
8488

bigframes/pandas/io/api.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
Tuple,
3434
Union,
3535
)
36+
import warnings
3637

3738
import bigframes_vendored.constants as constants
3839
import bigframes_vendored.pandas.io.gbq as vendored_pandas_gbq
@@ -348,7 +349,11 @@ def _read_gbq_colab(
348349
)
349350
_set_default_session_location_if_possible_deferred_query(create_query)
350351
if not config.options.bigquery._session_started:
351-
config.options.bigquery.enable_polars_execution = True
352+
with warnings.catch_warnings():
353+
# Don't warning about Polars in SQL cell.
354+
# Related to b/437090788.
355+
warnings.simplefilter("ignore", bigframes.exceptions.PreviewWarning)
356+
config.options.bigquery.enable_polars_execution = True
352357

353358
return global_session.with_default_session(
354359
bigframes.session.Session._read_gbq_colab,

bigframes/session/_io/bigquery/read_gbq_table.py

Lines changed: 123 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -54,26 +54,43 @@ def get_table_metadata(
5454

5555
cached_table = cache.get(table_ref)
5656
if use_cache and cached_table is not None:
57-
snapshot_timestamp, _ = cached_table
58-
59-
# Cache hit could be unexpected. See internal issue 329545805.
60-
# Raise a warning with more information about how to avoid the
61-
# problems with the cache.
62-
msg = bfe.format_message(
63-
f"Reading cached table from {snapshot_timestamp} to avoid "
64-
"incompatibilies with previous reads of this table. To read "
65-
"the latest version, set `use_cache=False` or close the "
66-
"current session with Session.close() or "
67-
"bigframes.pandas.close_session()."
68-
)
69-
# There are many layers before we get to (possibly) the user's code:
70-
# pandas.read_gbq_table
71-
# -> with_default_session
72-
# -> Session.read_gbq_table
73-
# -> _read_gbq_table
74-
# -> _get_snapshot_sql_and_primary_key
75-
# -> get_snapshot_datetime_and_table_metadata
76-
warnings.warn(msg, stacklevel=7)
57+
snapshot_timestamp, table = cached_table
58+
59+
if is_time_travel_eligible(
60+
bqclient=bqclient,
61+
table=table,
62+
columns=None,
63+
snapshot_time=snapshot_timestamp,
64+
filter_str=None,
65+
# Don't warn, because that will already have been taken care of.
66+
should_warn=False,
67+
should_dry_run=False,
68+
):
69+
# This warning should only happen if the cached snapshot_time will
70+
# have any effect on bigframes (b/437090788). For example, with
71+
# cached query results, such as after re-running a query, time
72+
# travel won't be applied and thus this check is irrelevent.
73+
#
74+
# In other cases, such as an explicit read_gbq_table(), Cache hit
75+
# could be unexpected. See internal issue 329545805. Raise a
76+
# warning with more information about how to avoid the problems
77+
# with the cache.
78+
msg = bfe.format_message(
79+
f"Reading cached table from {snapshot_timestamp} to avoid "
80+
"incompatibilies with previous reads of this table. To read "
81+
"the latest version, set `use_cache=False` or close the "
82+
"current session with Session.close() or "
83+
"bigframes.pandas.close_session()."
84+
)
85+
# There are many layers before we get to (possibly) the user's code:
86+
# pandas.read_gbq_table
87+
# -> with_default_session
88+
# -> Session.read_gbq_table
89+
# -> _read_gbq_table
90+
# -> _get_snapshot_sql_and_primary_key
91+
# -> get_snapshot_datetime_and_table_metadata
92+
warnings.warn(msg, category=bfe.TimeTravelCacheWarning, stacklevel=7)
93+
7794
return cached_table
7895

7996
table = bqclient.get_table(table_ref)
@@ -88,77 +105,114 @@ def get_table_metadata(
88105
return cached_table
89106

90107

91-
def validate_table(
108+
def is_time_travel_eligible(
92109
bqclient: bigquery.Client,
93110
table: bigquery.table.Table,
94111
columns: Optional[Sequence[str]],
95112
snapshot_time: datetime.datetime,
96113
filter_str: Optional[str] = None,
97-
) -> bool:
98-
"""Validates that the table can be read, returns True iff snapshot is supported."""
114+
*,
115+
should_warn: bool,
116+
should_dry_run: bool,
117+
):
118+
"""Check if a table is eligible to use time-travel.
119+
120+
121+
Args:
122+
table: BigQuery table to check.
123+
should_warn:
124+
If true, raises a warning when time travel is disabled and the
125+
underlying table is likely mutable.
126+
127+
Return:
128+
bool:
129+
True if there is a chance that time travel may be supported on this
130+
table. If ``should_dry_run`` is True, then this is validated with a
131+
``dry_run`` query.
132+
"""
133+
134+
# user code
135+
# -> pandas.read_gbq_table
136+
# -> with_default_session
137+
# -> session.read_gbq_table
138+
# -> session._read_gbq_table
139+
# -> loader.read_gbq_table
140+
# -> is_time_travel_eligible
141+
stacklevel = 7
99142

100-
time_travel_not_found = False
101143
# Anonymous dataset, does not support snapshot ever
102144
if table.dataset_id.startswith("_"):
103-
pass
145+
return False
104146

105147
# Only true tables support time travel
106-
elif table.table_id.endswith("*"):
107-
msg = bfe.format_message(
108-
"Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. "
109-
"Attempting query without time travel. Be aware that "
110-
"modifications to the underlying data may result in errors or "
111-
"unexpected behavior."
112-
)
113-
warnings.warn(msg, category=bfe.TimeTravelDisabledWarning)
114-
elif table.table_type != "TABLE":
115-
if table.table_type == "MATERIALIZED_VIEW":
148+
if table.table_id.endswith("*"):
149+
if should_warn:
116150
msg = bfe.format_message(
117-
"Materialized views do not support FOR SYSTEM_TIME AS OF queries. "
118-
"Attempting query without time travel. Be aware that as materialized views "
119-
"are updated periodically, modifications to the underlying data in the view may "
120-
"result in errors or unexpected behavior."
151+
"Wildcard tables do not support FOR SYSTEM_TIME AS OF queries. "
152+
"Attempting query without time travel. Be aware that "
153+
"modifications to the underlying data may result in errors or "
154+
"unexpected behavior."
121155
)
122-
warnings.warn(msg, category=bfe.TimeTravelDisabledWarning)
123-
else:
124-
# table might support time travel, lets do a dry-run query with time travel
156+
warnings.warn(
157+
msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel
158+
)
159+
return False
160+
elif table.table_type != "TABLE":
161+
if table.table_type == "MATERIALIZED_VIEW":
162+
if should_warn:
163+
msg = bfe.format_message(
164+
"Materialized views do not support FOR SYSTEM_TIME AS OF queries. "
165+
"Attempting query without time travel. Be aware that as materialized views "
166+
"are updated periodically, modifications to the underlying data in the view may "
167+
"result in errors or unexpected behavior."
168+
)
169+
warnings.warn(
170+
msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel
171+
)
172+
return False
173+
174+
# table might support time travel, lets do a dry-run query with time travel
175+
if should_dry_run:
125176
snapshot_sql = bigframes.session._io.bigquery.to_query(
126177
query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}",
127178
columns=columns or (),
128179
sql_predicate=filter_str,
129180
time_travel_timestamp=snapshot_time,
130181
)
131182
try:
132-
# If this succeeds, we don't need to query without time travel, that would surely succeed
133-
bqclient.query_and_wait(
134-
snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
183+
# If this succeeds, we know that time travel will for sure work.
184+
bigframes.session._io.bigquery.start_query_with_client(
185+
bq_client=bqclient,
186+
sql=snapshot_sql,
187+
job_config=bigquery.QueryJobConfig(dry_run=True),
188+
location=None,
189+
project=None,
190+
timeout=None,
191+
metrics=None,
192+
query_with_job=False,
135193
)
136194
return True
195+
137196
except google.api_core.exceptions.NotFound:
138-
# note that a notfound caused by a simple typo will be
139-
# caught above when the metadata is fetched, not here
140-
time_travel_not_found = True
141-
142-
# At this point, time travel is known to fail, but can we query without time travel?
143-
snapshot_sql = bigframes.session._io.bigquery.to_query(
144-
query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}",
145-
columns=columns or (),
146-
sql_predicate=filter_str,
147-
time_travel_timestamp=None,
148-
)
149-
# Any errors here should just be raised to user
150-
bqclient.query_and_wait(
151-
snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
152-
)
153-
if time_travel_not_found:
154-
msg = bfe.format_message(
155-
"NotFound error when reading table with time travel."
156-
" Attempting query without time travel. Warning: Without"
157-
" time travel, modifications to the underlying table may"
158-
" result in errors or unexpected behavior."
159-
)
160-
warnings.warn(msg, category=bfe.TimeTravelDisabledWarning)
161-
return False
197+
# If system time isn't supported, it returns NotFound error?
198+
# Note that a notfound caused by a simple typo will be
199+
# caught above when the metadata is fetched, not here.
200+
if should_warn:
201+
msg = bfe.format_message(
202+
"NotFound error when reading table with time travel."
203+
" Attempting query without time travel. Warning: Without"
204+
" time travel, modifications to the underlying table may"
205+
" result in errors or unexpected behavior."
206+
)
207+
warnings.warn(
208+
msg, category=bfe.TimeTravelDisabledWarning, stacklevel=stacklevel
209+
)
210+
211+
# If we make it to here, we know for sure that time travel won't work.
212+
return False
213+
else:
214+
# We haven't validated it, but there's a chance that time travel could work.
215+
return True
162216

163217

164218
def infer_unique_columns(

bigframes/session/loader.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -744,18 +744,15 @@ def read_gbq_table(
744744
else (*columns, *[col for col in index_cols if col not in columns])
745745
)
746746

747-
try:
748-
enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table(
749-
self._bqclient,
750-
table,
751-
all_columns,
752-
time_travel_timestamp,
753-
filter_str,
754-
)
755-
except google.api_core.exceptions.Forbidden as ex:
756-
if "Drive credentials" in ex.message:
757-
ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
758-
raise
747+
enable_snapshot = enable_snapshot and bf_read_gbq_table.is_time_travel_eligible(
748+
self._bqclient,
749+
table,
750+
all_columns,
751+
time_travel_timestamp,
752+
filter_str,
753+
should_warn=True,
754+
should_dry_run=True,
755+
)
759756

760757
# ----------------------------
761758
# Create ordering and validate

tests/unit/session/test_session.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,12 +252,46 @@ def test_read_gbq_cached_table():
252252
)
253253
session.bqclient.get_table.return_value = table
254254

255-
with pytest.warns(UserWarning, match=re.escape("use_cache=False")):
255+
with pytest.warns(
256+
bigframes.exceptions.TimeTravelCacheWarning, match=re.escape("use_cache=False")
257+
):
256258
df = session.read_gbq("my-project.my_dataset.my_table")
257259

258260
assert "1999-01-02T03:04:05.678901" in df.sql
259261

260262

263+
def test_read_gbq_cached_table_doesnt_warn_for_anonymous_tables_and_doesnt_include_time_travel():
264+
session = mocks.create_bigquery_session()
265+
table_ref = google.cloud.bigquery.TableReference(
266+
google.cloud.bigquery.DatasetReference("my-project", "_anonymous_dataset"),
267+
"my_table",
268+
)
269+
table = google.cloud.bigquery.Table(
270+
table_ref, (google.cloud.bigquery.SchemaField("col", "INTEGER"),)
271+
)
272+
table._properties["location"] = session._location
273+
table._properties["numRows"] = "1000000000"
274+
table._properties["location"] = session._location
275+
table._properties["type"] = "TABLE"
276+
session._loader._df_snapshot[table_ref] = (
277+
datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
278+
table,
279+
)
280+
281+
session.bqclient.query_and_wait = mock.MagicMock(
282+
return_value=({"total_count": 3, "distinct_count": 2},)
283+
)
284+
session.bqclient.get_table.return_value = table
285+
286+
with warnings.catch_warnings():
287+
warnings.simplefilter(
288+
"error", category=bigframes.exceptions.TimeTravelCacheWarning
289+
)
290+
df = session.read_gbq("my-project._anonymous_dataset.my_table")
291+
292+
assert "1999-01-02T03:04:05.678901" not in df.sql
293+
294+
261295
@pytest.mark.parametrize("table", CLUSTERED_OR_PARTITIONED_TABLES)
262296
def test_default_index_warning_raised_by_read_gbq(table):
263297
"""Because of the windowing operation to create a default index, row
@@ -474,7 +508,7 @@ def get_table_mock(table_ref):
474508
google.api_core.exceptions.Forbidden,
475509
match="Check https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions.",
476510
):
477-
api(query_or_table)
511+
api(query_or_table).to_pandas()
478512

479513

480514
@mock.patch.dict(os.environ, {}, clear=True)

0 commit comments

Comments
 (0)