Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.routine import RoutineType
from google.cloud.bigquery.routine import RemoteFunctionOptions
from google.cloud.bigquery.routine import ExternalRuntimeOptions
from google.cloud.bigquery.schema import PolicyTagList
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.schema import FieldElementType
Expand Down Expand Up @@ -181,6 +182,7 @@
"RoutineArgument",
"RoutineReference",
"RemoteFunctionOptions",
"ExternalRuntimeOptions",
# Shared helpers
"SchemaField",
"FieldElementType",
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigquery/routine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from google.cloud.bigquery.routine.routine import RoutineReference
from google.cloud.bigquery.routine.routine import RoutineType
from google.cloud.bigquery.routine.routine import RemoteFunctionOptions
from google.cloud.bigquery.routine.routine import ExternalRuntimeOptions


__all__ = (
Expand All @@ -30,4 +31,5 @@
"RoutineReference",
"RoutineType",
"RemoteFunctionOptions",
"ExternalRuntimeOptions",
)
180 changes: 180 additions & 0 deletions google/cloud/bigquery/routine/routine.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ class Routine(object):
"determinism_level": "determinismLevel",
"remote_function_options": "remoteFunctionOptions",
"data_governance_type": "dataGovernanceType",
"external_runtime_options": "externalRuntimeOptions",
}

def __init__(self, routine_ref, **kwargs) -> None:
Expand Down Expand Up @@ -349,6 +350,37 @@ def data_governance_type(self, value):
)
self._properties[self._PROPERTY_TO_API_FIELD["data_governance_type"]] = value

@property
def external_runtime_options(self):
"""Optional[google.cloud.bigquery.routine.ExternalRuntimeOptions]:
Configures the external runtime options for a routine.

Raises:
ValueError:
If the value is not
:class:`~google.cloud.bigquery.routine.ExternalRuntimeOptions` or
:data:`None`.
"""
prop = self._properties.get(
self._PROPERTY_TO_API_FIELD["external_runtime_options"]
)
if prop is not None:
return ExternalRuntimeOptions.from_api_repr(prop)

@external_runtime_options.setter
def external_runtime_options(self, value):
api_repr = value
if isinstance(value, ExternalRuntimeOptions):
api_repr = value.to_api_repr()
elif value is not None:
raise ValueError(
"value must be google.cloud.bigquery.routine.ExternalRuntimeOptions "
"or None"
)
self._properties[
self._PROPERTY_TO_API_FIELD["external_runtime_options"]
] = api_repr

@classmethod
def from_api_repr(cls, resource: dict) -> "Routine":
"""Factory: construct a routine given its API representation.
Expand Down Expand Up @@ -736,3 +768,151 @@ def __repr__(self):
for property_name in sorted(self._PROPERTY_TO_API_FIELD)
]
return "RemoteFunctionOptions({})".format(", ".join(all_properties))


class ExternalRuntimeOptions(object):
"""Options for the runtime of the external system.
Args:
container_memory (str):
Optional. Amount of memory provisioned for a Python UDF container
instance. Format: {number}{unit} where unit is one of "M", "G", "Mi"
and "Gi" (e.g. 1G, 512Mi). If not specified, the default value is
512Mi. For more information, see `Configure container limits for
Python UDFs <https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits>`_
container_cpu (int):
Optional. Amount of CPU provisioned for a Python UDF container
instance. For more information, see `Configure container limits
for Python UDFs <https://cloud.google.com/bigquery/docs/user-defined-functions-python#configure-container-limits>`_
runtime_connection (str):
Optional. Fully qualified name of the connection whose service account
will be used to execute the code in the container. Format:
"projects/{projectId}/locations/{locationId}/connections/{connectionId}"
max_batching_rows (int):
Optional. Maximum number of rows in each batch sent to the external
runtime. If absent or if 0, BigQuery dynamically decides the number of
rows in a batch.
runtime_version (str):
Optional. Language runtime version. Example: python-3.11.
"""

_PROPERTY_TO_API_FIELD = {
"container_memory": "containerMemory",
"container_cpu": "containerCpu",
"runtime_connection": "runtimeConnection",
"max_batching_rows": "maxBatchingRows",
"runtime_version": "runtimeVersion",
}

def __init__(
self,
container_memory: Optional[str] = None,
container_cpu: Optional[int] = None,
runtime_connection: Optional[str] = None,
max_batching_rows: Optional[int] = None,
runtime_version: Optional[str] = None,
_properties: Optional[Dict] = None,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this is necessary?

) -> None:
if _properties is None:
_properties = {}
self._properties = _properties

if container_memory is not None:
self.container_memory = container_memory
if container_cpu is not None:
self.container_cpu = container_cpu
if runtime_connection is not None:
self.runtime_connection = runtime_connection
if max_batching_rows is not None:
self.max_batching_rows = max_batching_rows
if runtime_version is not None:
self.runtime_version = runtime_version

@property
def container_memory(self) -> Optional[str]:
"""Optional. Amount of memory provisioned for a Python UDF container instance."""
return self._properties.get("containerMemory")

@container_memory.setter
def container_memory(self, value: Optional[str]):
if value is not None and not isinstance(value, str):
raise ValueError("container_memory must be a string or None.")
self._properties["containerMemory"] = value

@property
def container_cpu(self) -> Optional[int]:
"""Optional. Amount of CPU provisioned for a Python UDF container instance."""
return self._properties.get("containerCpu")

@container_cpu.setter
def container_cpu(self, value: Optional[int]):
if value is not None and not isinstance(value, int):
raise ValueError("container_cpu must be an integer or None.")
self._properties["containerCpu"] = value

@property
def runtime_connection(self) -> Optional[str]:
"""Optional. Fully qualified name of the connection."""
return self._properties.get("runtimeConnection")

@runtime_connection.setter
def runtime_connection(self, value: Optional[str]):
if value is not None and not isinstance(value, str):
raise ValueError("runtime_connection must be a string or None.")
self._properties["runtimeConnection"] = value

@property
def max_batching_rows(self) -> Optional[int]:
"""Optional. Maximum number of rows in each batch sent to the external runtime."""
return _helpers._int_or_none(self._properties.get("maxBatchingRows"))

@max_batching_rows.setter
def max_batching_rows(self, value: Optional[int]):
if value is not None and not isinstance(value, int):
raise ValueError("max_batching_rows must be an integer or None.")
self._properties["maxBatchingRows"] = _helpers._str_or_none(value)

@property
def runtime_version(self) -> Optional[str]:
"""Optional. Language runtime version."""
return self._properties.get("runtimeVersion")

@runtime_version.setter
def runtime_version(self, value: Optional[str]):
if value is not None and not isinstance(value, str):
raise ValueError("runtime_version must be a string or None.")
self._properties["runtimeVersion"] = value

@classmethod
def from_api_repr(cls, resource: dict) -> "ExternalRuntimeOptions":
"""Factory: construct external runtime options given its API representation.
Args:
resource (Dict[str, object]): Resource, as returned from the API.
Returns:
google.cloud.bigquery.routine.ExternalRuntimeOptions:
Python object, as parsed from ``resource``.
"""
ref = cls()
ref._properties = resource
return ref

def to_api_repr(self) -> dict:
"""Construct the API resource representation of this ExternalRuntimeOptions.
Returns:
Dict[str, object]: External runtime options represented as an API resource.
"""
return self._properties

def __eq__(self, other):
if not isinstance(other, ExternalRuntimeOptions):
return NotImplemented
return self._properties == other._properties

def __ne__(self, other):
return not self == other

def __repr__(self):
all_properties = [
"{}={}".format(property_name, repr(getattr(self, property_name)))
for property_name in sorted(self._PROPERTY_TO_API_FIELD)
]
return "ExternalRuntimeOptions({})".format(", ".join(all_properties))
157 changes: 157 additions & 0 deletions tests/unit/routine/test_external_runtime_options.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# -*- coding: utf-8 -*-
#
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest


@pytest.fixture
def target_class():
from google.cloud.bigquery.routine.routine import ExternalRuntimeOptions

return ExternalRuntimeOptions


@pytest.fixture
def object_under_test(target_class):
return target_class()


def test_ctor(target_class):
container_memory = "1G"
container_cpu = 1
runtime_connection = "projects/my-project/locations/us-central1/connections/my-connection"
max_batching_rows = 100
runtime_version = "python-3.11"

instance = target_class(
container_memory=container_memory,
container_cpu=container_cpu,
runtime_connection=runtime_connection,
max_batching_rows=max_batching_rows,
runtime_version=runtime_version,
)

assert instance.container_memory == container_memory
assert instance.container_cpu == container_cpu
assert instance.runtime_connection == runtime_connection
assert instance.max_batching_rows == max_batching_rows
assert instance.runtime_version == runtime_version


def test_container_memory(object_under_test):
container_memory = "512Mi"
object_under_test.container_memory = container_memory
assert object_under_test.container_memory == container_memory


def test_container_cpu(object_under_test):
container_cpu = 1
object_under_test.container_cpu = container_cpu
assert object_under_test.container_cpu == container_cpu


def test_runtime_connection(object_under_test):
runtime_connection = "projects/my-project/locations/us-central1/connections/my-connection"
object_under_test.runtime_connection = runtime_connection
assert object_under_test.runtime_connection == runtime_connection


def test_max_batching_rows(object_under_test):
max_batching_rows = 100
object_under_test.max_batching_rows = max_batching_rows
assert object_under_test.max_batching_rows == max_batching_rows


def test_runtime_version(object_under_test):
runtime_version = "python-3.11"
object_under_test.runtime_version = runtime_version
assert object_under_test.runtime_version == runtime_version


def test_from_api_repr(target_class):
resource = {
"containerMemory": "1G",
"containerCpu": 1,
"runtimeConnection": "projects/my-project/locations/us-central1/connections/my-connection",
"maxBatchingRows": "100",
"runtimeVersion": "python-3.11",
}
instance = target_class.from_api_repr(resource)

assert instance.container_memory == "1G"
assert instance.container_cpu == 1
assert (
instance.runtime_connection
== "projects/my-project/locations/us-central1/connections/my-connection"
)
assert instance.max_batching_rows == 100
assert instance.runtime_version == "python-3.11"


def test_to_api_repr(target_class):
instance = target_class(
container_memory="1G",
container_cpu=1,
runtime_connection="projects/my-project/locations/us-central1/connections/my-connection",
max_batching_rows=100,
runtime_version="python-3.11",
)
resource = instance.to_api_repr()

assert resource == {
"containerMemory": "1G",
"containerCpu": 1,
"runtimeConnection": "projects/my-project/locations/us-central1/connections/my-connection",
"maxBatchingRows": "100",
"runtimeVersion": "python-3.11",
}


def test_repr(target_class):
instance = target_class(
container_memory="1G",
container_cpu=1,
)
expected_repr = (
"ExternalRuntimeOptions(container_cpu=1, container_memory='1G', "
"max_batching_rows=None, runtime_connection=None, runtime_version=None)"
)
assert repr(instance) == expected_repr


def test_invalid_container_memory(object_under_test):
with pytest.raises(ValueError, match="container_memory must be a string or None."):
object_under_test.container_memory = 123


def test_invalid_container_cpu(object_under_test):
with pytest.raises(ValueError, match="container_cpu must be an integer or None."):
object_under_test.container_cpu = "1"


def test_invalid_runtime_connection(object_under_test):
with pytest.raises(ValueError, match="runtime_connection must be a string or None."):
object_under_test.runtime_connection = 123


def test_invalid_max_batching_rows(object_under_test):
with pytest.raises(ValueError, match="max_batching_rows must be an integer or None."):
object_under_test.max_batching_rows = "100"


def test_invalid_runtime_version(object_under_test):
with pytest.raises(ValueError, match="runtime_version must be a string or None."):
object_under_test.runtime_version = 123
Loading
Loading