Skip to content

Commit e084e54

Browse files
authored
fix: include index_col when selecting columns and filters in read_gbq_table (#648)
* fix: include `index_col` when selecting `columns` and `filters` in `read_gbq_table` Fixes internal issue 339430305 * feat: warn with a more specific `DefaultLocationWarning` category when no location can be detected (#648) test: refactor `read_gbq` / `read_gbq_table` tests to test with all parameters combined (#648) refactor: move query generation code to BigQuery I/O module (#648)
1 parent bcc054b commit e084e54

File tree

8 files changed

+349
-223
lines changed

8 files changed

+349
-223
lines changed

bigframes/exceptions.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
# NOTE: This module should not depend on any others in the package.
1818

1919

20+
# Uses UserWarning for backwards compatibility with warning without a category
# set.
class DefaultLocationWarning(UserWarning):
    """No location was specified, so using a default one.

    Subclasses ``UserWarning`` so that warning filters written before this
    category existed (when the warning was raised without an explicit
    category) continue to match it.
    """
24+
25+
2026
class UnknownLocationWarning(Warning):
2127
"""The location is set to an unknown value."""
2228

bigframes/pandas/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import bigframes.operations as ops
6868
import bigframes.series
6969
import bigframes.session
70+
import bigframes.session._io.bigquery
7071
import bigframes.session.clients
7172

7273

@@ -391,7 +392,7 @@ def _set_default_session_location_if_possible(query):
391392

392393
bqclient = clients_provider.bqclient
393394

394-
if bigframes.session._is_query(query):
395+
if bigframes.session._io.bigquery.is_query(query):
395396
job = bqclient.query(query, bigquery.QueryJobConfig(dry_run=True))
396397
options.bigquery.location = job.location
397398
else:

bigframes/session/__init__.py

Lines changed: 49 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
import datetime
2121
import logging
2222
import os
23-
import re
2423
import secrets
2524
import typing
2625
from typing import (
@@ -86,10 +85,11 @@
8685
import bigframes.core.tree_properties as tree_properties
8786
import bigframes.core.utils as utils
8887
import bigframes.dtypes
88+
import bigframes.exceptions
8989
import bigframes.formatting_helpers as formatting_helpers
9090
from bigframes.functions.remote_function import read_gbq_function as bigframes_rgf
9191
from bigframes.functions.remote_function import remote_function as bigframes_rf
92-
import bigframes.session._io.bigquery as bigframes_io
92+
import bigframes.session._io.bigquery as bf_io_bigquery
9393
import bigframes.session._io.bigquery.read_gbq_table as bf_read_gbq_table
9494
import bigframes.session.clients
9595
import bigframes.version
@@ -145,14 +145,18 @@
145145
)
146146

147147

148-
def _is_query(query_or_table: str) -> bool:
149-
"""Determine if `query_or_table` is a table ID or a SQL string"""
150-
return re.search(r"\s", query_or_table.strip(), re.MULTILINE) is not None
148+
def _to_index_cols(
    index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
) -> List[str]:
    """Normalize an ``index_col`` argument into a list of column names.

    Args:
        index_col:
            A single column name, an iterable of column names, or a
            ``DefaultIndexKind`` value.

    Returns:
        An empty list for a ``DefaultIndexKind``, a one-element list for a
        single name, otherwise a new list with the names from the iterable.
    """
    # A DefaultIndexKind names no columns at all.
    if isinstance(index_col, bigframes.enums.DefaultIndexKind):
        return []

    # Check for str before falling through to list(): iterating a str would
    # split the column name into individual characters.
    if isinstance(index_col, str):
        return [index_col]

    return list(index_col)
156160

157161

158162
class Session(
@@ -181,12 +185,26 @@ def __init__(
181185
if context is None:
182186
context = bigquery_options.BigQueryOptions()
183187

184-
# TODO(swast): Get location from the environment.
185188
if context.location is None:
186189
self._location = "US"
187190
warnings.warn(
188191
f"No explicit location is set, so using location {self._location} for the session.",
189-
stacklevel=2,
192+
# User's code
193+
# -> get_global_session()
194+
# -> connect()
195+
# -> Session()
196+
#
197+
# Note: We could also have:
198+
# User's code
199+
# -> read_gbq()
200+
# -> with_default_session()
201+
# -> get_global_session()
202+
# -> connect()
203+
# -> Session()
204+
# but we currently have no way to disambiguate these
205+
# situations.
206+
stacklevel=4,
207+
category=bigframes.exceptions.DefaultLocationWarning,
190208
)
191209
else:
192210
self._location = context.location
@@ -322,13 +340,19 @@ def read_gbq(
322340
columns = col_order
323341

324342
filters = list(filters)
325-
if len(filters) != 0 or _is_table_with_wildcard_suffix(query_or_table):
343+
if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(
344+
query_or_table
345+
):
326346
# TODO(b/338111344): index_cols are now passed to to_query, but no default
327347
# index is generated yet when index columns aren't specified.
328-
# TODO(b/338039517): Also, need to account for primary keys.
329-
query_or_table = self._to_query(query_or_table, columns, filters)
348+
# TODO(b/338039517): Refactor this to be called inside both
349+
# _read_gbq_query and _read_gbq_table (after detecting primary keys)
350+
# so we can make sure index_col/index_cols reflects primary keys.
351+
query_or_table = bf_io_bigquery.to_query(
352+
query_or_table, _to_index_cols(index_col), columns, filters
353+
)
330354

331-
if _is_query(query_or_table):
355+
if bf_io_bigquery.is_query(query_or_table):
332356
return self._read_gbq_query(
333357
query_or_table,
334358
index_col=index_col,
@@ -355,85 +379,6 @@ def read_gbq(
355379
use_cache=use_cache if use_cache is not None else True,
356380
)
357381

358-
def _to_query(
359-
self,
360-
query_or_table: str,
361-
columns: Iterable[str],
362-
filters: third_party_pandas_gbq.FiltersType,
363-
) -> str:
364-
"""Compile query_or_table with conditions(filters, wildcards) to query."""
365-
filters = list(filters)
366-
sub_query = (
367-
f"({query_or_table})"
368-
if _is_query(query_or_table)
369-
else f"`{query_or_table}`"
370-
)
371-
372-
# TODO(b/338111344): Generate an index based on DefaultIndexKind if we
373-
# don't have index columns specified.
374-
select_clause = "SELECT " + (
375-
", ".join(f"`{column}`" for column in columns) if columns else "*"
376-
)
377-
378-
where_clause = ""
379-
if filters:
380-
valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = {
381-
"in": "IN",
382-
"not in": "NOT IN",
383-
"LIKE": "LIKE",
384-
"==": "=",
385-
">": ">",
386-
"<": "<",
387-
">=": ">=",
388-
"<=": "<=",
389-
"!=": "!=",
390-
}
391-
392-
# If single layer filter, add another pseudo layer. So the single layer represents "and" logic.
393-
if isinstance(filters[0], tuple) and (
394-
len(filters[0]) == 0 or not isinstance(list(filters[0])[0], tuple)
395-
):
396-
filters = typing.cast(third_party_pandas_gbq.FiltersType, [filters])
397-
398-
or_expressions = []
399-
for group in filters:
400-
if not isinstance(group, Iterable):
401-
group = [group]
402-
403-
and_expressions = []
404-
for filter_item in group:
405-
if not isinstance(filter_item, tuple) or (len(filter_item) != 3):
406-
raise ValueError(
407-
f"Filter condition should be a tuple of length 3, {filter_item} is not valid."
408-
)
409-
410-
column, operator, value = filter_item
411-
412-
if not isinstance(column, str):
413-
raise ValueError(
414-
f"Column name should be a string, but received '{column}' of type {type(column).__name__}."
415-
)
416-
417-
if operator not in valid_operators:
418-
raise ValueError(f"Operator {operator} is not valid.")
419-
420-
operator_str = valid_operators[operator]
421-
422-
if operator_str in ["IN", "NOT IN"]:
423-
value_list = ", ".join([repr(v) for v in value])
424-
expression = f"`{column}` {operator_str} ({value_list})"
425-
else:
426-
expression = f"`{column}` {operator_str} {repr(value)}"
427-
and_expressions.append(expression)
428-
429-
or_expressions.append(" AND ".join(and_expressions))
430-
431-
if or_expressions:
432-
where_clause = " WHERE " + " OR ".join(or_expressions)
433-
434-
full_query = f"{select_clause} FROM {sub_query} AS sub{where_clause}"
435-
return full_query
436-
437382
def _query_to_destination(
438383
self,
439384
query: str,
@@ -610,12 +555,7 @@ def _read_gbq_query(
610555
True if use_cache is None else use_cache
611556
)
612557

613-
if isinstance(index_col, bigframes.enums.DefaultIndexKind):
614-
index_cols = []
615-
elif isinstance(index_col, str):
616-
index_cols = [index_col]
617-
else:
618-
index_cols = list(index_col)
558+
index_cols = _to_index_cols(index_col)
619559

620560
destination, query_job = self._query_to_destination(
621561
query,
@@ -682,8 +622,13 @@ def read_gbq_table(
682622
columns = col_order
683623

684624
filters = list(filters)
685-
if len(filters) != 0 or _is_table_with_wildcard_suffix(query):
686-
query = self._to_query(query, columns, filters)
625+
if len(filters) != 0 or bf_io_bigquery.is_table_with_wildcard_suffix(query):
626+
# TODO(b/338039517): Refactor this to be called inside both
627+
# _read_gbq_query and _read_gbq_table (after detecting primary keys)
628+
# so we can make sure index_col/index_cols reflects primary keys.
629+
query = bf_io_bigquery.to_query(
630+
query, _to_index_cols(index_col), columns, filters
631+
)
687632

688633
return self._read_gbq_query(
689634
query,
@@ -838,12 +783,7 @@ def _read_bigquery_load_job(
838783
index_col: Iterable[str] | str | bigframes.enums.DefaultIndexKind = (),
839784
columns: Iterable[str] = (),
840785
) -> dataframe.DataFrame:
841-
if isinstance(index_col, bigframes.enums.DefaultIndexKind):
842-
index_cols = []
843-
elif isinstance(index_col, str):
844-
index_cols = [index_col]
845-
else:
846-
index_cols = list(index_col)
786+
index_cols = _to_index_cols(index_col)
847787

848788
if not job_config.clustering_fields and index_cols:
849789
job_config.clustering_fields = index_cols[:_MAX_CLUSTER_COLUMNS]
@@ -1430,7 +1370,7 @@ def _create_empty_temp_table(
14301370
datetime.datetime.now(datetime.timezone.utc) + constants.DEFAULT_EXPIRATION
14311371
)
14321372

1433-
table = bigframes_io.create_temp_table(
1373+
table = bf_io_bigquery.create_temp_table(
14341374
self,
14351375
expiration,
14361376
schema=schema,

bigframes/session/_io/bigquery/__init__.py

Lines changed: 96 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,13 @@
1919
import datetime
2020
import itertools
2121
import os
22+
import re
2223
import textwrap
2324
import types
24-
from typing import Dict, Iterable, Optional, Sequence, Tuple, Union
25+
import typing
26+
from typing import Dict, Iterable, Mapping, Optional, Sequence, Tuple, Union
2527

28+
import bigframes_vendored.pandas.io.gbq as third_party_pandas_gbq
2629
import google.api_core.exceptions
2730
import google.cloud.bigquery as bigquery
2831

@@ -311,3 +314,95 @@ def create_bq_dataset_reference(
311314
query_destination.project,
312315
query_destination.dataset_id,
313316
)
317+
318+
319+
def is_query(query_or_table: str) -> bool:
    """Determine if `query_or_table` is a SQL string rather than a table ID.

    Leading and trailing whitespace is ignored. A value that still contains
    any whitespace is treated as SQL; anything else is treated as a table ID.
    """
    return re.search(r"\s", query_or_table.strip()) is not None


def is_table_with_wildcard_suffix(query_or_table: str) -> bool:
    """Determine if `query_or_table` is a table and contains a wildcard suffix."""
    return not is_query(query_or_table) and query_or_table.endswith("*")


def to_query(
    query_or_table: str,
    index_cols: Iterable[str],
    columns: Iterable[str],
    filters: third_party_pandas_gbq.FiltersType,
) -> str:
    """Compile query_or_table with conditions(filters, wildcards) to query.

    Args:
        query_or_table:
            A SQL string, which is wrapped in parentheses as a subquery, or
            a table ID, which is quoted with backticks.
        index_cols:
            Column names that must be selected whenever ``columns`` narrows
            the selection.
        columns:
            Column names to select. If empty, all columns are selected
            (``SELECT *``) and ``index_cols`` is not consulted.
        filters:
            Either a flat sequence of ``(column, operator, value)`` tuples,
            which are AND-ed together, or a sequence of such groups, which
            are OR-ed together.

    Returns:
        A ``SELECT ... FROM ... AS sub [WHERE ...]`` SQL string.

    Raises:
        ValueError:
            If a filter is not a 3-tuple, its column is not a string, or its
            operator is not supported.
    """
    # Materialize once: an iterator/generator is always truthy (even when it
    # yields nothing) and can only be consumed a single time.
    columns = list(columns)
    filters = list(filters)
    sub_query = (
        f"({query_or_table})" if is_query(query_or_table) else f"`{query_or_table}`"
    )

    # TODO(b/338111344): Generate an index based on DefaultIndexKind if we
    # don't have index columns specified.
    if columns:
        # We only reduce the selection if columns is set, but we always
        # want to make sure index_cols is also included. dict.fromkeys drops
        # repeats while keeping first-seen order, so a column listed in both
        # index_cols and columns is only selected once.
        selected = dict.fromkeys(itertools.chain(index_cols, columns))
        select_clause = "SELECT " + ", ".join(f"`{column}`" for column in selected)
    else:
        select_clause = "SELECT *"

    where_clause = ""
    if filters:
        valid_operators: Mapping[third_party_pandas_gbq.FilterOps, str] = {
            "in": "IN",
            "not in": "NOT IN",
            "LIKE": "LIKE",
            "==": "=",
            ">": ">",
            "<": "<",
            ">=": ">=",
            "<=": "<=",
            "!=": "!=",
        }

        # If single layer filter, add another pseudo layer. So the single
        # layer represents "and" logic.
        filter_groups: Iterable[typing.Any] = filters
        if isinstance(filters[0], tuple) and (
            len(filters[0]) == 0 or not isinstance(filters[0][0], tuple)
        ):
            filter_groups = [filters]

        or_expressions = []
        for group in filter_groups:
            if not isinstance(group, Iterable):
                group = [group]

            and_expressions = []
            for filter_item in group:
                if not isinstance(filter_item, tuple) or (len(filter_item) != 3):
                    raise ValueError(
                        f"Filter condition should be a tuple of length 3, {filter_item} is not valid."
                    )

                column, operator, value = filter_item

                if not isinstance(column, str):
                    raise ValueError(
                        f"Column name should be a string, but received '{column}' of type {type(column).__name__}."
                    )

                if operator not in valid_operators:
                    raise ValueError(f"Operator {operator} is not valid.")

                operator_str = valid_operators[operator]

                # NOTE(review): values are rendered with repr(), so this
                # relies on the Python literal also being a valid SQL literal
                # (e.g. None or datetime values will not be) -- confirm the
                # supported value types with callers.
                if operator_str in ["IN", "NOT IN"]:
                    value_list = ", ".join([repr(v) for v in value])
                    expression = f"`{column}` {operator_str} ({value_list})"
                else:
                    expression = f"`{column}` {operator_str} {repr(value)}"
                and_expressions.append(expression)

            or_expressions.append(" AND ".join(and_expressions))

        if or_expressions:
            where_clause = " WHERE " + " OR ".join(or_expressions)

    return f"{select_clause} FROM {sub_query} AS sub{where_clause}"

0 commit comments

Comments
 (0)