Commit 74beca6

Merge branch 'main' into pangea-v1alpha
2 parents: 8162a2c + d461297

File tree

3 files changed: +114 -1 lines changed

google/cloud/bigquery/table.py

Lines changed: 44 additions & 0 deletions
@@ -1837,6 +1837,7 @@ def to_arrow_iterable(
         self,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """[Beta] Create an iterable of :class:`pyarrow.RecordBatch`, to process the table as a stream.

@@ -1861,6 +1862,22 @@ def to_arrow_iterable(
                 created by the server. If ``max_queue_size`` is :data:`None`, the queue
                 size is infinite.

+            max_stream_count (Optional[int]):
+                The maximum number of parallel download streams when
+                using the BigQuery Storage API. Ignored if the
+                BigQuery Storage API is not used.
+
+                This setting also has no effect if the query result
+                is deterministically ordered with ORDER BY, in which
+                case the number of download streams is always 1.
+
+                If set to 0 or None (the default), the number of download
+                streams is determined by the BigQuery server. However, this
+                behaviour can require a lot of memory to store temporary
+                download results, especially with very large queries. In
+                that case, setting this parameter to a value > 0 can help
+                reduce system resource consumption.
+
         Returns:
             pyarrow.RecordBatch:
                 A generator of :class:`~pyarrow.RecordBatch`.

@@ -1877,6 +1894,7 @@ def to_arrow_iterable(
             preserve_order=self._preserve_order,
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
+            max_stream_count=max_stream_count,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
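
Not part of the commit: a minimal usage sketch of the new parameter, assuming application default credentials and an existing table; the table ID below is a placeholder.

```python
# Hedged usage sketch, assuming credentials and an existing table are set up;
# "my-project.my_dataset.my_table" is a placeholder, not a real resource.
from google.cloud import bigquery
from google.cloud import bigquery_storage

client = bigquery.Client()
bqstorage_client = bigquery_storage.BigQueryReadClient()

rows = client.list_rows("my-project.my_dataset.my_table")

# Cap the Storage API at 4 parallel download streams to bound client-side
# memory; 0 or None (the default) lets the BigQuery server decide.
for record_batch in rows.to_arrow_iterable(
    bqstorage_client=bqstorage_client, max_stream_count=4
):
    print(record_batch.num_rows)
```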
@@ -2003,6 +2021,7 @@ def to_dataframe_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
+        max_stream_count: Optional[int] = None,
     ) -> "pandas.DataFrame":
         """Create an iterable of pandas DataFrames, to process the table as a stream.

@@ -2033,6 +2052,22 @@ def to_dataframe_iterable(

                 .. versionadded:: 2.14.0

+            max_stream_count (Optional[int]):
+                The maximum number of parallel download streams when
+                using the BigQuery Storage API. Ignored if the
+                BigQuery Storage API is not used.
+
+                This setting also has no effect if the query result
+                is deterministically ordered with ORDER BY, in which
+                case the number of download streams is always 1.
+
+                If set to 0 or None (the default), the number of download
+                streams is determined by the BigQuery server. However, this
+                behaviour can require a lot of memory to store temporary
+                download results, especially with very large queries. In
+                that case, setting this parameter to a value > 0 can help
+                reduce system resource consumption.
+
         Returns:
             pandas.DataFrame:
                 A generator of :class:`~pandas.DataFrame`.

@@ -2059,6 +2094,7 @@ def to_dataframe_iterable(
             preserve_order=self._preserve_order,
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
+            max_stream_count=max_stream_count,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_dataframe_row_iterator,
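
The DataFrame counterpart, under the same assumptions as the sketch above (placeholder table ID, configured credentials):

```python
# Hedged usage sketch for the DataFrame path; same placeholder table as above.
from google.cloud import bigquery
from google.cloud import bigquery_storage

client = bigquery.Client()
bqstorage_client = bigquery_storage.BigQueryReadClient()
rows = client.list_rows("my-project.my_dataset.my_table")

# Each yielded DataFrame is one chunk of the streamed result; the stream cap
# again bounds the memory used for buffered downloads.
for frame in rows.to_dataframe_iterable(
    bqstorage_client=bqstorage_client, max_stream_count=4
):
    print(len(frame))
```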
@@ -2715,6 +2751,7 @@ def to_dataframe_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: Optional[int] = None,
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pandas.DataFrame"]:
         """Create an iterable of pandas DataFrames, to process the table as a stream.

@@ -2730,6 +2767,9 @@ def to_dataframe_iterable(
             max_queue_size:
                 Ignored. Added for compatibility with RowIterator.

+            max_stream_count:
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pandas.DataFrame`.

@@ -2744,6 +2784,7 @@ def to_arrow_iterable(
         self,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: Optional[int] = None,
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """Create an iterable of :class:`pyarrow.RecordBatch`, to process the table as a stream.

@@ -2756,6 +2797,9 @@ def to_arrow_iterable(
             max_queue_size:
                 Ignored. Added for compatibility with RowIterator.

+            max_stream_count:
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pyarrow.RecordBatch`.
         """

samples/geography/requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ google-crc32c==1.6.0; python_version >= '3.9'
 google-resumable-media==2.7.2
 googleapis-common-protos==1.66.0
 grpcio===1.62.2; python_version == '3.7'
-grpcio==1.67.1; python_version >= '3.8'
+grpcio==1.68.0; python_version >= '3.8'
 idna==3.10
 munch==4.0.0
 mypy-extensions==1.0.0

tests/unit/test_table.py

Lines changed: 69 additions & 0 deletions
@@ -5927,3 +5927,72 @@ def test_external_catalog_table_options_setter(
     # Confirm that the api_repr of the ecto_output matches the inputs
     result = table.to_api_repr()
     assert result == expected
+
+@pytest.mark.parametrize("preserve_order", [True, False])
+def test_to_arrow_iterable_w_bqstorage_max_stream_count(preserve_order):
+    pytest.importorskip("pandas")
+    pytest.importorskip("google.cloud.bigquery_storage")
+    from google.cloud.bigquery import schema
+    from google.cloud.bigquery import table as mut
+    from google.cloud import bigquery_storage
+
+    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
+    session = bigquery_storage.types.ReadSession()
+    bqstorage_client.create_read_session.return_value = session
+
+    row_iterator = mut.RowIterator(
+        _mock_client(),
+        api_request=None,
+        path=None,
+        schema=[
+            schema.SchemaField("colA", "INTEGER"),
+        ],
+        table=mut.TableReference.from_string("proj.dset.tbl"),
+    )
+    row_iterator._preserve_order = preserve_order
+
+    max_stream_count = 132
+    result_iterable = row_iterator.to_arrow_iterable(
+        bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
+    )
+    list(result_iterable)
+    bqstorage_client.create_read_session.assert_called_once_with(
+        parent=mock.ANY,
+        read_session=mock.ANY,
+        max_stream_count=max_stream_count if not preserve_order else 1,
+    )
+
+
+@pytest.mark.parametrize("preserve_order", [True, False])
+def test_to_dataframe_iterable_w_bqstorage_max_stream_count(preserve_order):
+    pytest.importorskip("pandas")
+    pytest.importorskip("google.cloud.bigquery_storage")
+    from google.cloud.bigquery import schema
+    from google.cloud.bigquery import table as mut
+    from google.cloud import bigquery_storage
+
+    bqstorage_client = mock.create_autospec(bigquery_storage.BigQueryReadClient)
+    session = bigquery_storage.types.ReadSession()
+    bqstorage_client.create_read_session.return_value = session
+
+    row_iterator = mut.RowIterator(
+        _mock_client(),
+        api_request=None,
+        path=None,
+        schema=[
+            schema.SchemaField("colA", "INTEGER"),
+        ],
+        table=mut.TableReference.from_string("proj.dset.tbl"),
+    )
+    row_iterator._preserve_order = preserve_order
+
+    max_stream_count = 132
+    result_iterable = row_iterator.to_dataframe_iterable(
+        bqstorage_client=bqstorage_client, max_stream_count=max_stream_count
+    )
+    list(result_iterable)
+    bqstorage_client.create_read_session.assert_called_once_with(
+        parent=mock.ANY,
+        read_session=mock.ANY,
+        max_stream_count=max_stream_count if not preserve_order else 1,
+    )
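
The assertion `max_stream_count=max_stream_count if not preserve_order else 1` pins down the contract: an ordered (`preserve_order`) result always requests a single stream from `create_read_session`, so row ordering stays deterministic; otherwise the caller's cap passes through. A standalone sketch of that selection logic follows (the helper name `effective_max_stream_count` is hypothetical, not the library's internal code):

```python
# Hypothetical helper mirroring the behaviour the tests assert above;
# an illustration only, not the library's implementation.
from typing import Optional


def effective_max_stream_count(
    preserve_order: bool, max_stream_count: Optional[int]
) -> Optional[int]:
    # Ordered results must be read from a single stream to stay deterministic;
    # otherwise the caller's cap (or None for server-chosen) passes through.
    return 1 if preserve_order else max_stream_count


assert effective_max_stream_count(True, 132) == 1
assert effective_max_stream_count(False, 132) == 132
```

These tests can be run selectively with `pytest tests/unit/test_table.py -k max_stream_count`.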
