
Commit 83c2ef5

Merge branch 'pangea-v1alpha' into feat-b358215039-add-attrs-to-schemafield
2 parents: 808a925 + 74beca6

File tree: 5 files changed (+215 lines, −64 lines)

google/cloud/bigquery/schema.py

Lines changed: 16 additions & 58 deletions
@@ -24,7 +24,6 @@
 from google.cloud.bigquery import standard_sql
 from google.cloud.bigquery._helpers import (
     _isinstance_or_raise,
-    _from_api_repr,
     _get_sub_prop,
 )
 from google.cloud.bigquery.enums import StandardSqlTypeNames, RoundingMode
@@ -548,6 +547,7 @@ def _to_schema_fields(schema):
            sequence is not a :class:`~google.cloud.bigquery.schema.SchemaField`
            instance or a compatible mapping representation of the field.
     """
+
     for field in schema:
         if not isinstance(field, (SchemaField, collections.abc.Mapping)):
             raise ValueError(
@@ -645,61 +645,6 @@ def to_api_repr(self) -> dict:
         return answer


-class TableSchema:
-    """Schema of a table
-
-    Args:
-        fields (Optional[list]): Describes the fields in a table.
-        foreignTypeInfo (Optional[str]): Specifies metadata of the foreign data type
-            definition in field schema.
-    """
-
-    def __init__(
-        self, fields: Optional[list] = None, foreign_type_info: Optional[str] = None
-    ):
-        self._properties: Dict[str, Any] = {}
-        self.fields = fields
-        self.foreign_type_info = foreign_type_info
-
-    @property
-    def fields(self) -> Any:
-        """Describes the fields in a table."""
-
-        return self._properties.get("fields")
-
-    @fields.setter
-    def fields(self, value: list, dtype: str) -> None:
-        value = _isinstance_or_raise(value, list, none_allowed=True)
-        self._properties["fields"] = value
-
-    @property
-    def foreign_type_info(self) -> Any:
-        """Optional. Specifies metadata of the foreign data type definition in
-        field schema (TableFieldSchema.foreign_type_definition)."""
-
-        return self._properties.get("foreignTypeInfo")
-
-    @foreign_type_info.setter
-    def foreign_type_info(self, value: str, dtype: str) -> None:
-        if not isinstance(value, str):
-            raise ValueError(
-                f"Pass {value} as a '{repr(dtype)}'." f"Got {type(value)}."
-            )
-        self._properties["foreignTypeInfo"] = value
-
-    def to_api_repr(self) -> dict:
-        """Build an API representation of this object.
-
-        Returns:
-            Dict[str, Any]:
-                A dictionary in the format used by the BigQuery API.
-        """
-        return copy.deepcopy(self._properties)
-
-    def from_api_repr(self, resource):
-        return _from_api_repr(self, resource)
-
-
 class ForeignTypeInfo:
     """Metadata about the foreign data type definition such as the system in which the
     type is defined.
@@ -734,8 +679,21 @@ def to_api_repr(self) -> dict:
         """
         return copy.deepcopy(self._properties)

-    def from_api_repr(self, resource):
-        return _from_api_repr(self, resource)
+    @classmethod
+    def from_api_repr(cls, resource):
+        """Factory: constructs an instance of the class (cls)
+        given its API representation.
+
+        Args:
+            resource (Dict[str, Any]):
+                API representation of the object to be instantiated.
+
+        Returns:
+            An instance of the class initialized with data from 'resource'.
+        """
+        config = cls()
+        config._properties = copy.deepcopy(resource)
+        return config


 class StorageDescriptor:
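Net effect in schema.py: the placeholder TableSchema class and the shared `_from_api_repr` helper are removed, and ForeignTypeInfo gains a conventional `from_api_repr` classmethod. A minimal round-trip sketch of the new factory, modeled directly on the unit test added in this commit (the resource dict and its `typeSystem` key come from that test):

    from google.cloud.bigquery.schema import ForeignTypeInfo

    # API resource dict -> ForeignTypeInfo -> API resource dict round trip.
    resource = {"typeSystem": "TYPE_SYSTEM_UNSPECIFIED"}
    info = ForeignTypeInfo.from_api_repr(resource)

    # from_api_repr() deep-copies the resource into the new instance's
    # _properties, so to_api_repr() reproduces the original dict.
    assert info.to_api_repr() == resource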

google/cloud/bigquery/table.py

Lines changed: 44 additions & 0 deletions
@@ -1837,6 +1837,7 @@ def to_arrow_iterable(
         self,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """[Beta] Create an iterable of class:`pyarrow.RecordBatch`, to process the table as a stream.

@@ -1861,6 +1862,22 @@
                created by the server. If ``max_queue_size`` is :data:`None`, the queue
                size is infinite.

+            max_stream_count (Optional[int]):
+                The maximum number of parallel download streams when using
+                the BigQuery Storage API. Ignored if the BigQuery Storage API
+                is not used.
+
+                This setting also has no effect if the query result
+                is deterministically ordered with ORDER BY,
+                in which case the number of download streams is always 1.
+
+                If set to 0 or None (the default), the number of download
+                streams is determined by the BigQuery server. However, this
+                behaviour can require a lot of memory to store temporary
+                download results, especially with very large queries. In that
+                case, setting this parameter to a value > 0 can help reduce
+                system resource consumption.
+
         Returns:
             pyarrow.RecordBatch:
                 A generator of :class:`~pyarrow.RecordBatch`.
@@ -1877,6 +1894,7 @@
             preserve_order=self._preserve_order,
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
+            max_stream_count=max_stream_count,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
@@ -2003,6 +2021,7 @@ def to_dataframe_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
+        max_stream_count: Optional[int] = None,
     ) -> "pandas.DataFrame":
         """Create an iterable of pandas DataFrames, to process the table as a stream.

@@ -2033,6 +2052,22 @@ def to_dataframe_iterable(

                .. versionadded:: 2.14.0

+            max_stream_count (Optional[int]):
+                The maximum number of parallel download streams when using
+                the BigQuery Storage API. Ignored if the BigQuery Storage API
+                is not used.
+
+                This setting also has no effect if the query result
+                is deterministically ordered with ORDER BY,
+                in which case the number of download streams is always 1.
+
+                If set to 0 or None (the default), the number of download
+                streams is determined by the BigQuery server. However, this
+                behaviour can require a lot of memory to store temporary
+                download results, especially with very large queries. In that
+                case, setting this parameter to a value > 0 can help reduce
+                system resource consumption.
+
         Returns:
             pandas.DataFrame:
                 A generator of :class:`~pandas.DataFrame`.
@@ -2059,6 +2094,7 @@ def to_dataframe_iterable(
             preserve_order=self._preserve_order,
             selected_fields=self._selected_fields,
             max_queue_size=max_queue_size,
+            max_stream_count=max_stream_count,
         )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_dataframe_row_iterator,
@@ -2715,6 +2751,7 @@ def to_dataframe_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: Optional[int] = None,
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pandas.DataFrame"]:
         """Create an iterable of pandas DataFrames, to process the table as a stream.

@@ -2730,6 +2767,9 @@ def to_dataframe_iterable(
            max_queue_size:
                Ignored. Added for compatibility with RowIterator.

+            max_stream_count:
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pandas.DataFrame`.

@@ -2744,6 +2784,7 @@ def to_arrow_iterable(
         self,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: Optional[int] = None,
+        max_stream_count: Optional[int] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """Create an iterable of pandas DataFrames, to process the table as a stream.

@@ -2756,6 +2797,9 @@ def to_arrow_iterable(
            max_queue_size:
                Ignored. Added for compatibility with RowIterator.

+            max_stream_count:
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pyarrow.RecordBatch`.
         """

samples/geography/requirements.txt

Lines changed: 1 addition & 1 deletion
@@ -24,7 +24,7 @@ google-crc32c==1.6.0; python_version >= '3.9'
 google-resumable-media==2.7.2
 googleapis-common-protos==1.66.0
 grpcio===1.62.2; python_version == '3.7'
-grpcio==1.67.1; python_version >= '3.8'
+grpcio==1.68.0; python_version >= '3.8'
 idna==3.10
 munch==4.0.0
 mypy-extensions==1.0.0

tests/unit/test_schema.py

Lines changed: 85 additions & 5 deletions
@@ -17,7 +17,7 @@
 from google.cloud.bigquery.standard_sql import StandardSqlStructType
 from google.cloud.bigquery.schema import (
     PolicyTagList,
-    # ForeignTypeInfo,
+    ForeignTypeInfo,
     StorageDescriptor,
     SerDeInfo,
 )
@@ -1158,8 +1158,6 @@ class TestForeignTypeInfo:

     @staticmethod
     def _get_target_class():
-        from google.cloud.bigquery.schema import ForeignTypeInfo
-
         return ForeignTypeInfo

     def _make_one(self, *args, **kw):
@@ -1197,6 +1195,39 @@ def test_to_api_repr(self, type_system, expected):
         result = self._make_one(type_system=type_system)
         assert result.to_api_repr() == expected

+    def test_from_api_repr(self):
+        """GIVEN an API representation of a ForeignTypeInfo object (i.e. resource)
+        WHEN converted into a ForeignTypeInfo object using from_api_repr() and
+        displayed as a dict
+        THEN it will have the same representation as a ForeignTypeInfo object created
+        directly (via _make_one()) and displayed as a dict.
+        """
+        resource = {"typeSystem": "TYPE_SYSTEM_UNSPECIFIED"}
+
+        expected = self._make_one(type_system="TYPE_SYSTEM_UNSPECIFIED")
+
+        klass = self._get_target_class()
+        result = klass.from_api_repr(resource)
+
+        assert result.to_api_repr() == expected.to_api_repr()
+
+
+@pytest.fixture
+def _make_storage_descriptor():
+    serdeinfo = SerDeInfo(
+        serialization_library="testpath.to.LazySimpleSerDe",
+        name="serde_lib_name",
+        parameters={"key": "value"},
+    )
+
+    obj = StorageDescriptor(
+        input_format="testpath.to.OrcInputFormat",
+        location_uri="gs://test/path/",
+        output_format="testpath.to.OrcOutputFormat",
+        serde_info=serdeinfo,
+    )
+    return obj
+

 class TestStorageDescriptor:
     """Tests for the StorageDescriptor class."""
@@ -1288,7 +1319,34 @@ def test_to_api_repr(self):

         assert storage_descriptor.to_api_repr() == expected_repr

-    # TODO: needs a from_api_repr() test.
+    SERDEINFO = SerDeInfo(
+        serialization_library="testpath.to.LazySimpleSerDe",
+        name="serde_lib_name",
+        parameters={"key": "value"},
+    )
+
+    API_REPR = {
+        "inputFormat": "testpath.to.OrcInputFormat",
+        "locationUri": "gs://test/path/",
+        "outputFormat": "testpath.to.OrcOutputFormat",
+        "serDeInfo": SERDEINFO.to_api_repr(),
+    }
+
+    def test_from_api_repr(self, _make_storage_descriptor):
+        """GIVEN an API representation of a StorageDescriptor (i.e. API_REPR)
+        WHEN converted into a StorageDescriptor using from_api_repr() and
+        displayed as a dict
+        THEN it will have the same representation as a StorageDescriptor created
+        directly (via the fixture) and displayed as a dict.
+        """
+        # generate via fixture
+        expected = _make_storage_descriptor
+        resource = self.API_REPR
+        klass = self._get_target_class()
+        # generate via API_REPR
+        result = klass.from_api_repr(resource)
+
+        assert result.to_api_repr() == expected.to_api_repr()


 class TestSerDeInfo:
@@ -1352,4 +1410,26 @@ def test_to_api_repr(self):
         }
         assert serde_info.to_api_repr() == expected_repr

-    # TODO: needs a from_api_repr() test.
+    def test_from_api_repr(self, _make_storage_descriptor):
+        """GIVEN an API representation of a SerDeInfo object (i.e. resource)
+        WHEN converted into a SerDeInfo using from_api_repr() and
+        displayed as a dict
+        THEN it will have the same representation as a SerDeInfo object created
+        directly (via _make_one()) and displayed as a dict.
+        """
+        resource = {
+            "serializationLibrary": "testpath.to.LazySimpleSerDe",
+            "name": "serde_name",
+            "parameters": {"key": "value"},
+        }
+
+        expected = self._make_one(
+            serialization_library="testpath.to.LazySimpleSerDe",
+            name="serde_name",
+            parameters={"key": "value"},
+        )
+
+        klass = self._get_target_class()
+        result = klass.from_api_repr(resource)
+
+        assert result.to_api_repr() == expected.to_api_repr()
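All three new tests follow the same round-trip pattern: construct the object directly, construct it again from an API resource dict via `from_api_repr()`, and compare the `to_api_repr()` output of both. Outside pytest, the SerDeInfo case from the test above reduces to roughly this sketch (resource keys and constructor arguments copied from the test; it assumes SerDeInfo exposes the same classmethod the test exercises):

    from google.cloud.bigquery.schema import SerDeInfo

    resource = {
        "serializationLibrary": "testpath.to.LazySimpleSerDe",
        "name": "serde_name",
        "parameters": {"key": "value"},
    }

    # Building from the API representation should be equivalent to building
    # with keyword arguments and serializing back out.
    from_resource = SerDeInfo.from_api_repr(resource)
    direct = SerDeInfo(
        serialization_library="testpath.to.LazySimpleSerDe",
        name="serde_name",
        parameters={"key": "value"},
    )
    assert from_resource.to_api_repr() == direct.to_api_repr()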
