Commit f7e4354

perf: Reduce dry runs from read_gbq with table (#1129)
* perf: Reduce dry runs from read_gbq with table
* add check for table creation time to not travel before that
* handle table.created is None
1 parent 8033dc5 commit f7e4354
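The creation-time check called out in the commit message guards against clock skew: the locally captured BigQuery timestamp can lag the server because of network latency, so if the table was created after that timestamp (for example, immediately before being read), the creation time is used instead. A minimal standalone sketch of that clamp, with a made-up helper name rather than the library's own code:

import datetime
from typing import Optional


def clamp_to_creation_time(
    bq_time: datetime.datetime,
    created: Optional[datetime.datetime],
) -> datetime.datetime:
    # Hypothetical helper: never time-travel to before the table existed.
    # `created` may be None for some tables, in which case the locally
    # derived timestamp is kept as-is.
    if created is not None and created > bq_time:
        return created
    return bq_time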

3 files changed: 58 additions, 46 deletions

bigframes/session/_io/bigquery/read_gbq_table.py

Lines changed: 45 additions & 38 deletions

@@ -77,6 +77,11 @@ def get_table_metadata(
         return cached_table
 
     table = bqclient.get_table(table_ref)
+    # Local time will lag a little bit due to network latency, so
+    # make sure it is at least the table creation time.
+    # This is relevant if the table was created immediately before loading it here.
+    if (table.created is not None) and (table.created > bq_time):
+        bq_time = table.created
 
     cached_table = (bq_time, table)
     cache[table_ref] = cached_table
@@ -85,64 +90,66 @@ def get_table_metadata(
 
 def validate_table(
     bqclient: bigquery.Client,
-    table_ref: bigquery.table.TableReference,
+    table: bigquery.table.Table,
     columns: Optional[Sequence[str]],
     snapshot_time: datetime.datetime,
-    table_type: str,
     filter_str: Optional[str] = None,
 ) -> bool:
     """Validates that the table can be read, returns True iff snapshot is supported."""
-    # First run without snapshot to verify table can be read
-    sql = bigframes.session._io.bigquery.to_query(
-        query_or_table=f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}",
-        columns=columns or (),
-        sql_predicate=filter_str,
-    )
-    dry_run_config = bigquery.QueryJobConfig()
-    dry_run_config.dry_run = True
-    try:
-        bqclient.query_and_wait(sql, job_config=dry_run_config)
-    except google.api_core.exceptions.Forbidden as ex:
-        if "Drive credentials" in ex.message:
-            ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
-        raise
 
+    time_travel_not_found = False
     # Anonymous dataset, does not support snapshot ever
-    if table_ref.dataset_id.startswith("_"):
-        return False
-
-    # Materialized views,does not support snapshot
-    if table_type == "MATERIALIZED_VIEW":
-        warnings.warn(
-            "Materialized views do not support FOR SYSTEM_TIME AS OF queries. "
-            "Attempting query without time travel. Be aware that as materialized views "
-            "are updated periodically, modifications to the underlying data in the view may "
-            "result in errors or unexpected behavior.",
-            category=bigframes.exceptions.TimeTravelDisabledWarning,
+    if table.dataset_id.startswith("_"):
+        pass
+    # Only true tables support time travel
+    elif table.table_type != "TABLE":
+        if table.table_type == "MATERIALIZED_VIEW":
+            warnings.warn(
+                "Materialized views do not support FOR SYSTEM_TIME AS OF queries. "
+                "Attempting query without time travel. Be aware that as materialized views "
+                "are updated periodically, modifications to the underlying data in the view may "
+                "result in errors or unexpected behavior.",
+                category=bigframes.exceptions.TimeTravelDisabledWarning,
+            )
+    else:
+        # The table might support time travel; let's do a dry-run query with time travel.
+        snapshot_sql = bigframes.session._io.bigquery.to_query(
+            query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}",
+            columns=columns or (),
+            sql_predicate=filter_str,
+            time_travel_timestamp=snapshot_time,
         )
-        return False
+        try:
+            # If this succeeds, we don't need to query without time travel; that would surely succeed.
+            bqclient.query_and_wait(
+                snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
+            )
+            return True
+        except google.api_core.exceptions.NotFound:
+            # Note that a NotFound caused by a simple typo will be
+            # caught above when the metadata is fetched, not here.
+            time_travel_not_found = True
 
-    # Second, try with snapshot to verify table supports this feature
+    # At this point, time travel is known to fail, but can we query without time travel?
     snapshot_sql = bigframes.session._io.bigquery.to_query(
-        query_or_table=f"{table_ref.project}.{table_ref.dataset_id}.{table_ref.table_id}",
+        query_or_table=f"{table.reference.project}.{table.reference.dataset_id}.{table.reference.table_id}",
        columns=columns or (),
         sql_predicate=filter_str,
-        time_travel_timestamp=snapshot_time,
+        time_travel_timestamp=None,
     )
-    try:
-        bqclient.query_and_wait(snapshot_sql, job_config=dry_run_config)
-        return True
-    except google.api_core.exceptions.NotFound:
-        # note that a notfound caused by a simple typo will be
-        # caught above when the metadata is fetched, not here
+    # Any errors here should just be raised to the user.
+    bqclient.query_and_wait(
+        snapshot_sql, job_config=bigquery.QueryJobConfig(dry_run=True)
+    )
+    if time_travel_not_found:
         warnings.warn(
             "NotFound error when reading table with time travel."
             " Attempting query without time travel. Warning: Without"
             " time travel, modifications to the underlying table may"
             " result in errors or unexpected behavior.",
             category=bigframes.exceptions.TimeTravelDisabledWarning,
         )
-        return False
+    return False
 
 
 def are_index_cols_unique(
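The net effect of this hunk is that the common case now costs a single dry run: the time-travel dry run is attempted first, and the plain dry run only executes once time travel has been ruled out, whereas previously every ordinary table paid for both. A hedged sketch of that control flow, where dry_run_with_time_travel and dry_run_without_time_travel are hypothetical stand-ins for the two bqclient.query_and_wait(..., dry_run=True) calls:

import warnings
from typing import Callable

import google.api_core.exceptions


def supports_time_travel(
    dataset_id: str,
    table_type: str,
    dry_run_with_time_travel: Callable[[], None],
    dry_run_without_time_travel: Callable[[], None],
) -> bool:
    # Sketch of the reordered validation: at most one dry run in the common case.
    time_travel_not_found = False
    if dataset_id.startswith("_"):
        pass  # anonymous datasets never support time travel
    elif table_type != "TABLE":
        pass  # views and materialized views never support time travel
    else:
        try:
            dry_run_with_time_travel()  # the only dry run for most tables
            return True
        except google.api_core.exceptions.NotFound:
            time_travel_not_found = True  # fall back to a plain dry run
    # Only reached when time travel is unsupported or failed; any error from
    # the plain dry run is surfaced to the caller.
    dry_run_without_time_travel()
    if time_travel_not_found:
        warnings.warn("Time travel unavailable; querying without it.")
    return False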

bigframes/session/loader.py

Lines changed: 12 additions & 8 deletions

@@ -343,14 +343,18 @@ def read_gbq_table(
             else (*columns, *[col for col in index_cols if col not in columns])
         )
 
-        enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table(
-            self._bqclient,
-            table_ref,
-            all_columns,
-            time_travel_timestamp,
-            table.table_type,
-            filter_str,
-        )
+        try:
+            enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table(
+                self._bqclient,
+                table,
+                all_columns,
+                time_travel_timestamp,
+                filter_str,
+            )
+        except google.api_core.exceptions.Forbidden as ex:
+            if "Drive credentials" in ex.message:
+                ex.message += "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."
+            raise
 
         # ----------------------------
         # Create ordering and validate
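Design note: the Drive-credentials hint used to be attached inside validate_table, around only the first dry run; moving the try/except to the caller means a Forbidden error raised by either dry run now gets the hint before being re-raised. A small sketch of that wrapper pattern, assuming only the exception type and writable message attribute already used in the diff above:

import google.api_core.exceptions

DRIVE_HINT = "\nCheck https://cloud.google.com/bigquery/docs/query-drive-data#Google_Drive_permissions."


def with_drive_hint(validate, *args, **kwargs):
    # Sketch: decorate Forbidden errors from validation with a Drive permissions hint.
    try:
        return validate(*args, **kwargs)
    except google.api_core.exceptions.Forbidden as ex:
        if "Drive credentials" in ex.message:
            ex.message += DRIVE_HINT
        raise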

tests/unit/session/test_session.py

Lines changed: 1 addition & 0 deletions

@@ -181,6 +181,7 @@ def test_read_gbq_cached_table():
     table._properties["location"] = session._location
     table._properties["numRows"] = "1000000000"
     table._properties["location"] = session._location
+    table._properties["type"] = "TABLE"
     session._loader._df_snapshot[table_ref] = (
         datetime.datetime(1999, 1, 2, 3, 4, 5, 678901, tzinfo=datetime.timezone.utc),
         table,
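This one-line test change follows from the new validate_table signature: the function now receives the Table object and branches on table.table_type, which google-cloud-bigquery reads from the "type" field of the table resource, so the fake cached table in the test has to declare itself a plain "TABLE". A quick illustration of that mapping (the project, dataset, and table names are placeholders):

from google.cloud import bigquery

resource = {
    "tableReference": {
        "projectId": "my-project",  # placeholder
        "datasetId": "my_dataset",  # placeholder
        "tableId": "my_table",  # placeholder
    },
    "type": "TABLE",
}
table = bigquery.Table.from_api_repr(resource)
assert table.table_type == "TABLE"  # the branch that enables the time-travel dry run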
