Skip to content

Commit 4a1a1e0

Browse files
refactor: refactor block materialization (#306)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://togithub.com/googleapis/python-bigquery-dataframes/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> 🦕
1 parent 8922e5e commit 4a1a1e0

File tree

6 files changed

+113
-93
lines changed

6 files changed

+113
-93
lines changed

bigframes/_config/sampling_options.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
"""Options for downsampling."""
1616

17+
from __future__ import annotations
18+
1719
import dataclasses
1820
from typing import Literal, Optional
1921

@@ -25,6 +27,28 @@ class SamplingOptions:
2527
__doc__ = vendored_pandas_config.sampling_options_doc
2628

2729
max_download_size: Optional[int] = 500
30+
# Enable downsampling
2831
enable_downsampling: bool = False
2932
sampling_method: Literal["head", "uniform"] = "uniform"
3033
random_state: Optional[int] = None
34+
35+
def with_max_download_size(self, max_rows: Optional[int]) -> SamplingOptions:
36+
return SamplingOptions(
37+
max_rows, self.enable_downsampling, self.sampling_method, self.random_state
38+
)
39+
40+
def with_method(self, method: Literal["head", "uniform"]) -> SamplingOptions:
41+
return SamplingOptions(self.max_download_size, True, method, self.random_state)
42+
43+
def with_random_state(self, state: Optional[int]) -> SamplingOptions:
44+
return SamplingOptions(
45+
self.max_download_size,
46+
self.enable_downsampling,
47+
self.sampling_method,
48+
state,
49+
)
50+
51+
def with_disabled(self) -> SamplingOptions:
52+
return SamplingOptions(
53+
self.max_download_size, False, self.sampling_method, self.random_state
54+
)

bigframes/core/__init__.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,7 @@
2121
import ibis.expr.types as ibis_types
2222
import pandas
2323

24-
import bigframes.core.compile.compiled as compiled
25-
import bigframes.core.compile.compiler as compiler
24+
import bigframes.core.compile as compiling
2625
import bigframes.core.expression as expressions
2726
import bigframes.core.guid
2827
import bigframes.core.nodes as nodes
@@ -104,11 +103,11 @@ def _try_evaluate_local(self):
104103
def get_column_type(self, key: str) -> bigframes.dtypes.Dtype:
105104
return self._compile_ordered().get_column_type(key)
106105

107-
def _compile_ordered(self) -> compiled.OrderedIR:
108-
return compiler.compile_ordered(self.node)
106+
def _compile_ordered(self) -> compiling.OrderedIR:
107+
return compiling.compile_ordered(self.node)
109108

110-
def _compile_unordered(self) -> compiled.UnorderedIR:
111-
return compiler.compile_unordered(self.node)
109+
def _compile_unordered(self) -> compiling.UnorderedIR:
110+
return compiling.compile_unordered(self.node)
112111

113112
def row_count(self) -> ArrayValue:
114113
"""Get number of rows in ArrayValue as a single-entry ArrayValue."""

bigframes/core/blocks.py

Lines changed: 79 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
from __future__ import annotations
2323

24+
import dataclasses
2425
import functools
2526
import itertools
2627
import random
@@ -31,6 +32,7 @@
3132
import google.cloud.bigquery as bigquery
3233
import pandas as pd
3334

35+
import bigframes._config.sampling_options as sampling_options
3436
import bigframes.constants as constants
3537
import bigframes.core as core
3638
import bigframes.core.guid as guid
@@ -80,6 +82,14 @@ def _get_block(self) -> Block:
8082
"""Get the underlying block value of the object"""
8183

8284

85+
@dataclasses.dataclass()
86+
class MaterializationOptions:
87+
downsampling: sampling_options.SamplingOptions = dataclasses.field(
88+
default_factory=sampling_options.SamplingOptions
89+
)
90+
ordered: bool = True
91+
92+
8393
class Block:
8494
"""A immutable 2D data structure."""
8595

@@ -395,23 +405,31 @@ def _to_dataframe(self, result) -> pd.DataFrame:
395405

396406
def to_pandas(
397407
self,
398-
value_keys: Optional[Iterable[str]] = None,
399-
max_results: Optional[int] = None,
400408
max_download_size: Optional[int] = None,
401409
sampling_method: Optional[str] = None,
402410
random_state: Optional[int] = None,
403411
*,
404412
ordered: bool = True,
405413
) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
406414
"""Run query and download results as a pandas DataFrame."""
415+
if (sampling_method is not None) and (sampling_method not in _SAMPLING_METHODS):
416+
raise NotImplementedError(
417+
f"The downsampling method {sampling_method} is not implemented, "
418+
f"please choose from {','.join(_SAMPLING_METHODS)}."
419+
)
407420

408-
df, _, query_job = self._compute_and_count(
409-
value_keys=value_keys,
410-
max_results=max_results,
411-
max_download_size=max_download_size,
412-
sampling_method=sampling_method,
413-
random_state=random_state,
414-
ordered=ordered,
421+
sampling = bigframes.options.sampling.with_max_download_size(max_download_size)
422+
if sampling_method is not None:
423+
sampling = sampling.with_method(sampling_method).with_random_state( # type: ignore
424+
random_state
425+
)
426+
else:
427+
sampling = sampling.with_disabled()
428+
429+
df, query_job = self._materialize_local(
430+
materialize_options=MaterializationOptions(
431+
downsampling=sampling, ordered=ordered
432+
)
415433
)
416434
return df, query_job
417435

@@ -439,57 +457,29 @@ def _copy_index_to_pandas(self, df: pd.DataFrame):
439457
# See: https://github.com/pandas-dev/pandas-stubs/issues/804
440458
df.index.names = self.index.names # type: ignore
441459

442-
def _compute_and_count(
443-
self,
444-
value_keys: Optional[Iterable[str]] = None,
445-
max_results: Optional[int] = None,
446-
max_download_size: Optional[int] = None,
447-
sampling_method: Optional[str] = None,
448-
random_state: Optional[int] = None,
449-
*,
450-
ordered: bool = True,
451-
) -> Tuple[pd.DataFrame, int, bigquery.QueryJob]:
460+
def _materialize_local(
461+
self, materialize_options: MaterializationOptions = MaterializationOptions()
462+
) -> Tuple[pd.DataFrame, bigquery.QueryJob]:
452463
"""Run query and download results as a pandas DataFrame. Return the total number of results as well."""
453464
# TODO(swast): Allow for dry run and timeout.
454-
enable_downsampling = (
455-
True
456-
if sampling_method is not None
457-
else bigframes.options.sampling.enable_downsampling
458-
)
459-
460-
max_download_size = (
461-
max_download_size or bigframes.options.sampling.max_download_size
462-
)
463-
464-
random_state = random_state or bigframes.options.sampling.random_state
465-
466-
if sampling_method is None:
467-
sampling_method = bigframes.options.sampling.sampling_method or _UNIFORM
468-
sampling_method = sampling_method.lower()
469-
470-
if sampling_method not in _SAMPLING_METHODS:
471-
raise NotImplementedError(
472-
f"The downsampling method {sampling_method} is not implemented, "
473-
f"please choose from {','.join(_SAMPLING_METHODS)}."
474-
)
475-
476-
expr = self._apply_value_keys_to_expr(value_keys=value_keys)
477-
478465
results_iterator, query_job = self.session._execute(
479-
expr, max_results=max_results, sorted=ordered
466+
self.expr, sorted=materialize_options.ordered
480467
)
481-
482468
table_size = (
483469
self.session._get_table_size(query_job.destination) / _BYTES_TO_MEGABYTES
484470
)
471+
sample_config = materialize_options.downsampling
472+
max_download_size = sample_config.max_download_size
485473
fraction = (
486474
max_download_size / table_size
487475
if (max_download_size is not None) and (table_size != 0)
488476
else 2
489477
)
490478

479+
# TODO: Maybe materialize before downsampling
480+
# Some downsampling methods
491481
if fraction < 1:
492-
if not enable_downsampling:
482+
if not sample_config.enable_downsampling:
493483
raise RuntimeError(
494484
f"The data size ({table_size:.2f} MB) exceeds the maximum download limit of "
495485
f"{max_download_size} MB. You can:\n\t* Enable downsampling in global options:\n"
@@ -507,42 +497,53 @@ def _compute_and_count(
507497
"\nPlease refer to the documentation for configuring the downloading limit.",
508498
UserWarning,
509499
)
510-
if sampling_method == _HEAD:
511-
total_rows = int(results_iterator.total_rows * fraction)
512-
results_iterator.max_results = total_rows
513-
df = self._to_dataframe(results_iterator)
514-
515-
if self.index_columns:
516-
df.set_index(list(self.index_columns), inplace=True)
517-
df.index.names = self.index.names # type: ignore
518-
elif (sampling_method == _UNIFORM) and (random_state is None):
519-
filtered_expr = self.expr._uniform_sampling(fraction)
520-
block = Block(
521-
filtered_expr,
522-
index_columns=self.index_columns,
523-
column_labels=self.column_labels,
524-
index_labels=self.index.names,
525-
)
526-
df, total_rows, _ = block._compute_and_count(max_download_size=None)
527-
elif sampling_method == _UNIFORM:
528-
block = self._split(
529-
fracs=(max_download_size / table_size,),
530-
random_state=random_state,
531-
preserve_order=True,
532-
)[0]
533-
df, total_rows, _ = block._compute_and_count(max_download_size=None)
534-
else:
535-
# This part should never be called, just in case.
536-
raise NotImplementedError(
537-
f"The downsampling method {sampling_method} is not implemented, "
538-
f"please choose from {','.join(_SAMPLING_METHODS)}."
539-
)
500+
total_rows = results_iterator.total_rows
501+
# Remove downsampling config from subsequent invocations, as otherwise could result in many
502+
# iterations if downsampling undershoots
503+
return self._downsample(
504+
total_rows=total_rows,
505+
sampling_method=sample_config.sampling_method,
506+
fraction=fraction,
507+
random_state=sample_config.random_state,
508+
)._materialize_local(
509+
MaterializationOptions(ordered=materialize_options.ordered)
510+
)
540511
else:
541512
total_rows = results_iterator.total_rows
542513
df = self._to_dataframe(results_iterator)
543514
self._copy_index_to_pandas(df)
544515

545-
return df, total_rows, query_job
516+
return df, query_job
517+
518+
def _downsample(
519+
self, total_rows: int, sampling_method: str, fraction: float, random_state
520+
) -> Block:
521+
# either selecting fraction or number of rows
522+
if sampling_method == _HEAD:
523+
filtered_block = self.slice(stop=int(total_rows * fraction))
524+
return filtered_block
525+
elif (sampling_method == _UNIFORM) and (random_state is None):
526+
filtered_expr = self.expr._uniform_sampling(fraction)
527+
block = Block(
528+
filtered_expr,
529+
index_columns=self.index_columns,
530+
column_labels=self.column_labels,
531+
index_labels=self.index.names,
532+
)
533+
return block
534+
elif sampling_method == _UNIFORM:
535+
block = self._split(
536+
fracs=(fraction,),
537+
random_state=random_state,
538+
preserve_order=True,
539+
)[0]
540+
return block
541+
else:
542+
# This part should never be called, just in case.
543+
raise NotImplementedError(
544+
f"The downsampling method {sampling_method} is not implemented, "
545+
f"please choose from {','.join(_SAMPLING_METHODS)}."
546+
)
546547

547548
def _split(
548549
self,
@@ -1209,10 +1210,9 @@ def retrieve_repr_request_results(
12091210
count = self.shape[0]
12101211
if count > max_results:
12111212
head_block = self.slice(0, max_results)
1212-
computed_df, query_job = head_block.to_pandas(max_results=max_results)
12131213
else:
12141214
head_block = self
1215-
computed_df, query_job = head_block.to_pandas()
1215+
computed_df, query_job = head_block.to_pandas()
12161216
formatted_df = computed_df.set_axis(self.column_labels, axis=1)
12171217
# we reset the axis and substitute the bf index name for the default
12181218
formatted_df.index.name = self.index.name

bigframes/core/compile/row_identity.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
import bigframes.constants as constants
2626
import bigframes.core.compile.compiled as compiled
27-
import bigframes.core.joins.name_resolution as naming
27+
import bigframes.core.joins as joining
2828
import bigframes.core.ordering as orderings
2929

3030
SUPPORTED_ROW_IDENTITY_HOW = {"outer", "left", "inner"}
@@ -68,7 +68,7 @@ def join_by_row_identity_unordered(
6868
right_mask = right_relative_predicates if how in ["left", "outer"] else None
6969

7070
# Public mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result
71-
map_left_id, map_right_id = naming.JOIN_NAME_REMAPPER(
71+
map_left_id, map_right_id = joining.JOIN_NAME_REMAPPER(
7272
left.column_ids, right.column_ids
7373
)
7474
joined_columns = [
@@ -125,10 +125,10 @@ def join_by_row_identity_ordered(
125125
right_mask = right_relative_predicates if how in ["left", "outer"] else None
126126

127127
# Public mapping must use JOIN_NAME_REMAPPER to stay in sync with consumers of join result
128-
lpublicmapping, rpublicmapping = naming.JOIN_NAME_REMAPPER(
128+
lpublicmapping, rpublicmapping = joining.JOIN_NAME_REMAPPER(
129129
left.column_ids, right.column_ids
130130
)
131-
lhiddenmapping, rhiddenmapping = naming.JoinNameRemapper(namespace="hidden")(
131+
lhiddenmapping, rhiddenmapping = joining.JoinNameRemapper(namespace="hidden")(
132132
left._hidden_column_ids, right._hidden_column_ids
133133
)
134134
map_left_id = {**lpublicmapping, **lhiddenmapping}

bigframes/series.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -305,14 +305,13 @@ def to_pandas(
305305
is not exceeded; otherwise, a pandas Series with downsampled rows of the DataFrame.
306306
"""
307307
df, query_job = self._block.to_pandas(
308-
(self._value_column,),
309308
max_download_size=max_download_size,
310309
sampling_method=sampling_method,
311310
random_state=random_state,
312311
ordered=ordered,
313312
)
314313
self._set_internal_query_job(query_job)
315-
series = df[self._value_column]
314+
series = df.squeeze(axis=1)
316315
series.name = self._name
317316
return series
318317

bigframes/session/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1497,7 +1497,6 @@ def _execute(
14971497
self,
14981498
array_value: core.ArrayValue,
14991499
job_config: Optional[bigquery.job.QueryJobConfig] = None,
1500-
max_results: Optional[int] = None,
15011500
*,
15021501
sorted: bool = True,
15031502
dry_run=False,
@@ -1507,7 +1506,6 @@ def _execute(
15071506
return self._start_query(
15081507
sql=sql,
15091508
job_config=job_config,
1510-
max_results=max_results,
15111509
)
15121510

15131511
def _to_sql(

0 commit comments

Comments (0)