Skip to content

Commit 0afbcec

Browse files
refactor: create internal session ordering mode flag (#772)
1 parent 2de4129 commit 0afbcec

File tree

10 files changed

+87
-33
lines changed

10 files changed

+87
-33
lines changed

bigframes/_config/bigquery_options.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ def __init__(
7070
application_name: Optional[str] = None,
7171
kms_key_name: Optional[str] = None,
7272
skip_bq_connection_check: bool = False,
73+
*,
74+
_strictly_ordered: bool = True,
7375
):
7476
self._credentials = credentials
7577
self._project = project
@@ -80,6 +82,8 @@ def __init__(
8082
self._kms_key_name = kms_key_name
8183
self._skip_bq_connection_check = skip_bq_connection_check
8284
self._session_started = False
85+
# Determines the ordering strictness for the session. For internal use only.
86+
self._strictly_ordered_internal = _strictly_ordered
8387

8488
@property
8589
def application_name(self) -> Optional[str]:
@@ -235,3 +239,8 @@ def kms_key_name(self, value: str):
235239
raise ValueError(SESSION_STARTED_MESSAGE.format(attribute="kms_key_name"))
236240

237241
self._kms_key_name = value
242+
243+
@property
244+
def _strictly_ordered(self) -> bool:
245+
"""Internal use only. Controls whether total row order is always maintained for DataFrame/Series."""
246+
return self._strictly_ordered_internal

bigframes/core/blocks.py

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ def shape(self) -> typing.Tuple[int, int]:
209209
except Exception:
210210
pass
211211

212-
iter, _ = self.session._execute(row_count_expr, sorted=False)
212+
iter, _ = self.session._execute(row_count_expr, ordered=False)
213213
row_count = next(iter)[0]
214214
return (row_count, len(self.value_columns))
215215

@@ -518,7 +518,7 @@ def to_pandas_batches(
518518
dtypes = dict(zip(self.index_columns, self.index.dtypes))
519519
dtypes.update(zip(self.value_columns, self.dtypes))
520520
_, query_job = self.session._query_to_destination(
521-
self.session._to_sql(self.expr, sorted=True),
521+
self.session._to_sql(self.expr, ordered=self.session._strictly_ordered),
522522
list(self.index_columns),
523523
api_name="cached",
524524
do_clustering=False,
@@ -553,7 +553,7 @@ def _materialize_local(
553553
"""Run query and download results as a pandas DataFrame. Return the total number of results as well."""
554554
# TODO(swast): Allow for dry run and timeout.
555555
_, query_job = self.session._query_to_destination(
556-
self.session._to_sql(self.expr, sorted=materialize_options.ordered),
556+
self.session._to_sql(self.expr, ordered=materialize_options.ordered),
557557
list(self.index_columns),
558558
api_name="cached",
559559
do_clustering=False,
@@ -1736,7 +1736,7 @@ def transpose(
17361736
original_row_index = (
17371737
original_row_index
17381738
if original_row_index is not None
1739-
else self.index.to_pandas()
1739+
else self.index.to_pandas(ordered=True)
17401740
)
17411741
original_row_count = len(original_row_index)
17421742
if original_row_count > bigframes.constants.MAX_COLUMNS:
@@ -2507,7 +2507,7 @@ def column_ids(self) -> Sequence[str]:
25072507
"""Column(s) to use as row labels."""
25082508
return self._block._index_columns
25092509

2510-
def to_pandas(self) -> pd.Index:
2510+
def to_pandas(self, *, ordered: Optional[bool] = None) -> pd.Index:
25112511
"""Executes deferred operations and downloads the results."""
25122512
if len(self.column_ids) == 0:
25132513
raise bigframes.exceptions.NullIndexError(
@@ -2517,7 +2517,12 @@ def to_pandas(self) -> pd.Index:
25172517
index_columns = list(self._block.index_columns)
25182518
dtypes = dict(zip(index_columns, self.dtypes))
25192519
expr = self._expr.select_columns(index_columns)
2520-
results, _ = self.session._execute(expr)
2520+
results, _ = self.session._execute(
2521+
expr,
2522+
ordered=ordered
2523+
if (ordered is not None)
2524+
else self.session._strictly_ordered,
2525+
)
25212526
df = expr.session._rows_to_dataframe(results, dtypes)
25222527
df = df.set_index(index_columns)
25232528
index = df.index

bigframes/core/compile/api.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def compile_ordered(
4040
) -> str:
4141
"""Compile node into sql where rows are sorted with ORDER BY."""
4242
return compiler.compile_ordered_ir(node).to_sql(
43-
col_id_overrides=col_id_overrides, sorted=True
43+
col_id_overrides=col_id_overrides, ordered=True
4444
)
4545

4646

bigframes/core/compile/compiled.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -257,9 +257,9 @@ def to_sql(
257257
self,
258258
offset_column: typing.Optional[str] = None,
259259
col_id_overrides: typing.Mapping[str, str] = {},
260-
sorted: bool = False,
260+
ordered: bool = False,
261261
) -> str:
262-
if offset_column or sorted:
262+
if offset_column or ordered:
263263
raise ValueError("Cannot produce sorted sql in unordered mode")
264264
sql = ibis_bigquery.Backend().compile(
265265
self._to_ibis_expr(
@@ -890,9 +890,9 @@ def _reproject_to_table(self) -> OrderedIR:
890890
def to_sql(
891891
self,
892892
col_id_overrides: typing.Mapping[str, str] = {},
893-
sorted: bool = False,
893+
ordered: bool = False,
894894
) -> str:
895-
if sorted:
895+
if ordered:
896896
# Need to bake ordering expressions into the selected column in order for our ordering clause builder to work.
897897
baked_ir = self._bake_ordering()
898898
sql = ibis_bigquery.Backend().compile(

bigframes/core/indexes/base.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,9 @@ def to_pandas(self) -> pandas.Index:
481481
pandas.Index:
482482
A pandas Index with all of the labels from this Index.
483483
"""
484-
return self._block.index.to_pandas()
484+
return self._block.index.to_pandas(
485+
ordered=self._block.session._strictly_ordered
486+
)
485487

486488
def to_numpy(self, dtype=None, **kwargs) -> np.ndarray:
487489
return self.to_pandas().to_numpy(dtype, **kwargs)

bigframes/dataframe.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1189,7 +1189,7 @@ def to_pandas(
11891189
sampling_method: Optional[str] = None,
11901190
random_state: Optional[int] = None,
11911191
*,
1192-
ordered: bool = True,
1192+
ordered: Optional[bool] = None,
11931193
) -> pandas.DataFrame:
11941194
"""Write DataFrame to pandas DataFrame.
11951195
@@ -1209,9 +1209,10 @@ def to_pandas(
12091209
The seed for the uniform downsampling algorithm. If provided, the uniform method may
12101210
take longer to execute and require more computation. If set to a value other than
12111211
None, this will supersede the global config.
1212-
ordered (bool, default True):
1212+
ordered (bool, default None):
12131213
Determines whether the resulting pandas dataframe will be deterministically ordered.
1214-
In some cases, unordered may result in a faster-executing query.
1214+
In some cases, unordered may result in a faster-executing query. If set to a value
1215+
other than None, will override Session default.
12151216
12161217
Returns:
12171218
pandas.DataFrame: A pandas DataFrame with all rows and columns of this DataFrame if the
@@ -1224,7 +1225,7 @@ def to_pandas(
12241225
max_download_size=max_download_size,
12251226
sampling_method=sampling_method,
12261227
random_state=random_state,
1227-
ordered=ordered,
1228+
ordered=ordered if ordered is not None else self._session._strictly_ordered,
12281229
)
12291230
self._set_internal_query_job(query_job)
12301231
return df.set_axis(self._block.column_labels, axis=1, copy=False)
@@ -3339,7 +3340,7 @@ def _run_io_query(
33393340
_, query_job = session._execute(
33403341
export_array,
33413342
job_config=job_config,
3342-
sorted=False,
3343+
ordered=False,
33433344
col_id_overrides=id_overrides,
33443345
)
33453346
self._set_internal_query_job(query_job)

bigframes/series.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -323,7 +323,7 @@ def to_pandas(
323323
sampling_method: Optional[str] = None,
324324
random_state: Optional[int] = None,
325325
*,
326-
ordered: bool = True,
326+
ordered: Optional[bool] = None,
327327
) -> pandas.Series:
328328
"""Writes Series to pandas Series.
329329
@@ -343,9 +343,10 @@ def to_pandas(
343343
The seed for the uniform downsampling algorithm. If provided, the uniform method may
344344
take longer to execute and require more computation. If set to a value other than
345345
None, this will supersede the global config.
346-
ordered (bool, default True):
346+
ordered (bool, default None):
347347
Determines whether the resulting pandas series will be deterministically ordered.
348-
In some cases, unordered may result in a faster-executing query.
348+
In some cases, unordered may result in a faster-executing query. If set to a value
349+
other than None, will override Session default.
349350
350351
351352
Returns:
@@ -357,7 +358,7 @@ def to_pandas(
357358
max_download_size=max_download_size,
358359
sampling_method=sampling_method,
359360
random_state=random_state,
360-
ordered=ordered,
361+
ordered=ordered if ordered is not None else self._session._strictly_ordered,
361362
)
362363
self._set_internal_query_job(query_job)
363364
series = df.squeeze(axis=1)

bigframes/session/__init__.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -297,7 +297,13 @@ def __init__(
297297
self._execution_count = 0
298298
# Whether this session treats objects as totally ordered.
299299
# Will expose as feature later, only False for internal testing
300-
self._strictly_ordered = True
300+
self._strictly_ordered: bool = context._strictly_ordered
301+
# Sequential index needs total ordering to generate, so use null index with unstrict ordering.
302+
self._default_index_type: bigframes.enums.DefaultIndexKind = (
303+
bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
304+
if context._strictly_ordered
305+
else bigframes.enums.DefaultIndexKind.NULL
306+
)
301307

302308
@property
303309
def bqclient(self):
@@ -882,11 +888,11 @@ def _read_gbq_table(
882888
# Create Default Sequential Index if still have no index
883889
# ----------------------------------------------------
884890

885-
# If no index columns provided or found, fall back to sequential index
891+
# If no index columns provided or found, fall back to session default
886892
if (index_col != bigframes.enums.DefaultIndexKind.NULL) and len(
887893
index_cols
888894
) == 0:
889-
index_col = bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64
895+
index_col = self._default_index_type
890896

891897
index_names: Sequence[Hashable] = index_cols
892898
if index_col == bigframes.enums.DefaultIndexKind.SEQUENTIAL_INT64:
@@ -1935,14 +1941,14 @@ def _execute(
19351941
array_value: core.ArrayValue,
19361942
job_config: Optional[bigquery.job.QueryJobConfig] = None,
19371943
*,
1938-
sorted: bool = True,
1944+
ordered: bool = True,
19391945
dry_run=False,
19401946
col_id_overrides: Mapping[str, str] = {},
19411947
) -> tuple[bigquery.table.RowIterator, bigquery.QueryJob]:
19421948
if not dry_run:
19431949
self._add_execution(1)
19441950
sql = self._to_sql(
1945-
array_value, sorted=sorted, col_id_overrides=col_id_overrides
1951+
array_value, ordered=ordered, col_id_overrides=col_id_overrides
19461952
) # type:ignore
19471953
if job_config is None:
19481954
job_config = bigquery.QueryJobConfig(dry_run=dry_run)
@@ -1977,12 +1983,12 @@ def _to_sql(
19771983
array_value: core.ArrayValue,
19781984
offset_column: typing.Optional[str] = None,
19791985
col_id_overrides: typing.Mapping[str, str] = {},
1980-
sorted: bool = False,
1986+
ordered: bool = False,
19811987
) -> str:
19821988
if offset_column:
19831989
array_value = array_value.promote_offsets(offset_column)
19841990
node_w_cached = self._with_cached_executions(array_value.node)
1985-
if sorted:
1991+
if ordered:
19861992
return bigframes.core.compile.compile_ordered(
19871993
node_w_cached, col_id_overrides=col_id_overrides
19881994
)

tests/system/conftest.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,11 +141,8 @@ def session() -> Generator[bigframes.Session, None, None]:
141141

142142
@pytest.fixture(scope="session")
143143
def unordered_session() -> Generator[bigframes.Session, None, None]:
144-
context = bigframes.BigQueryOptions(
145-
location="US",
146-
)
144+
context = bigframes.BigQueryOptions(location="US", _strictly_ordered=False)
147145
session = bigframes.Session(context=context)
148-
session._strictly_ordered = False
149146
yield session
150147
session.close() # close generated session at cleanup type
151148

tests/system/small/test_unordered.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,10 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414
import pandas as pd
15+
import pyarrow as pa
1516

1617
import bigframes.pandas as bpd
17-
from tests.system.utils import assert_pandas_df_equal
18+
from tests.system.utils import assert_pandas_df_equal, skip_legacy_pandas
1819

1920

2021
def test_unordered_mode_cache_aggregate(unordered_session):
@@ -26,3 +27,35 @@ def test_unordered_mode_cache_aggregate(unordered_session):
2627
pd_result = pd_df - pd_df.mean()
2728

2829
assert_pandas_df_equal(bf_result, pd_result, ignore_order=True)
30+
31+
32+
@skip_legacy_pandas
33+
def test_unordered_mode_read_gbq(unordered_session):
34+
df = unordered_session.read_gbq(
35+
"""SELECT
36+
[1, 3, 2] AS array_column,
37+
STRUCT(
38+
"a" AS string_field,
39+
1.2 AS float_field) AS struct_column"""
40+
)
41+
expected = pd.DataFrame(
42+
{
43+
"array_column": pd.Series(
44+
[[1, 3, 2]],
45+
dtype=(pd.ArrowDtype(pa.list_(pa.int64()))),
46+
),
47+
"struct_column": pd.Series(
48+
[{"string_field": "a", "float_field": 1.2}],
49+
dtype=pd.ArrowDtype(
50+
pa.struct(
51+
[
52+
("string_field", pa.string()),
53+
("float_field", pa.float64()),
54+
]
55+
)
56+
),
57+
),
58+
}
59+
)
60+
# Don't need ignore_order as there is only 1 row
61+
assert_pandas_df_equal(df.to_pandas(), expected)

0 commit comments

Comments
 (0)