diff --git a/google/cloud/bigquery_v2/__init__.py b/google/cloud/bigquery_v2/__init__.py
index 442609adc..30b6a63d6 100644
--- a/google/cloud/bigquery_v2/__init__.py
+++ b/google/cloud/bigquery_v2/__init__.py
@@ -24,7 +24,7 @@
 from .services.routine_service import RoutineServiceClient
 from .services.row_access_policy_service import RowAccessPolicyServiceClient
 from .services.table_service import TableServiceClient
-from .services.centralized_services import BigQueryClient
+from .services.centralized_service import BigQueryClient
 
 from .types.biglake_config import BigLakeConfiguration
 from .types.clustering import Clustering
diff --git a/google/cloud/bigquery_v2/services/centralized_services/__init__.py b/google/cloud/bigquery_v2/services/centralized_service/__init__.py
similarity index 95%
rename from google/cloud/bigquery_v2/services/centralized_services/__init__.py
rename to google/cloud/bigquery_v2/services/centralized_service/__init__.py
index e82098176..03ddf6619 100644
--- a/google/cloud/bigquery_v2/services/centralized_services/__init__.py
+++ b/google/cloud/bigquery_v2/services/centralized_service/__init__.py
@@ -15,4 +15,4 @@
 #
 from .client import BigQueryClient
 
-__all__ = ("BigQueryClient",)
+__all__ = ["BigQueryClient"]
diff --git a/google/cloud/bigquery_v2/services/centralized_service/_helpers.py b/google/cloud/bigquery_v2/services/centralized_service/_helpers.py
new file mode 100644
index 000000000..53b585b18
--- /dev/null
+++ b/google/cloud/bigquery_v2/services/centralized_service/_helpers.py
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+def _drop_self_key(kwargs):
+    """Drop the 'self' key from a given kwargs dict."""
+
+    if not isinstance(kwargs, dict):
+        raise TypeError("kwargs must be a dict.")
+    kwargs.pop("self", None)  # Essentially a no-op if 'self' key does not exist
+    return kwargs
diff --git a/google/cloud/bigquery_v2/services/centralized_service/client.py b/google/cloud/bigquery_v2/services/centralized_service/client.py
new file mode 100644
index 000000000..3b53fb53b
--- /dev/null
+++ b/google/cloud/bigquery_v2/services/centralized_service/client.py
@@ -0,0 +1,242 @@
+# -*- coding: utf-8 -*-
+# Copyright 2025 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import os
+from typing import (
+    Optional,
+    Sequence,
+    Tuple,
+    Union,
+)
+
+from google.cloud.bigquery_v2.services.centralized_service import _helpers
+
+# Import types modules (to access *Requests classes)
+from google.cloud.bigquery_v2.types import (
+    dataset,
+    job,
+    model,
+)
+
+from google.api_core import client_options as client_options_lib
+from google.api_core import gapic_v1
+from google.api_core import retry as retries
+from google.auth import credentials as auth_credentials
+
+# Create a type alias
+try:
+    OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
+except AttributeError:  # pragma: NO COVER
+    OptionalRetry = Union[retries.Retry, object, None]  # type: ignore
+
+# TODO: Convenience for prototyping; revisit before release.
+PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
+
+DEFAULT_RETRY: OptionalRetry = gapic_v1.method.DEFAULT
+DEFAULT_TIMEOUT: Union[float, object] = gapic_v1.method.DEFAULT
+DEFAULT_METADATA: Sequence[Tuple[str, Union[str, bytes]]] = ()
+
+
+# Centralized client that fans out to the per-service GAPIC clients.
+class BigQueryClient:
+    def __init__(
+        self,
+        *,
+        credentials: Optional[auth_credentials.Credentials] = None,
+        client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
+    ):
+        self._clients = {}
+        self._credentials = credentials
+        self._client_options = client_options
+
+    @property
+    def dataset_service_client(self):
+        if "dataset" not in self._clients:
+            from google.cloud.bigquery_v2.services import dataset_service
+
+            self._clients["dataset"] = dataset_service.DatasetServiceClient(
+                credentials=self._credentials, client_options=self._client_options
+            )
+        return self._clients["dataset"]
+
+    @dataset_service_client.setter
+    def dataset_service_client(self, value):
+        # Check for the methods the centralized client exposes (to allow duck-typing)
+        required_methods = [
+            "get_dataset",
+            "insert_dataset",
+            "patch_dataset",
+            "update_dataset",
+            "delete_dataset",
+            "list_datasets",
+            "undelete_dataset",
+        ]
+        for method in required_methods:
+            if not hasattr(value, method) or not callable(getattr(value, method)):
+                raise AttributeError(
+                    f"Object assigned to dataset_service_client is missing a callable '{method}' method."
+                )
+        self._clients["dataset"] = value
+
+    @property
+    def job_service_client(self):
+        if "job" not in self._clients:
+            from google.cloud.bigquery_v2.services import job_service
+
+            self._clients["job"] = job_service.JobServiceClient(
+                credentials=self._credentials, client_options=self._client_options
+            )
+        return self._clients["job"]
+
+    @job_service_client.setter
+    def job_service_client(self, value):
+        required_methods = [
+            "get_job",
+            "insert_job",
+            "cancel_job",
+            "delete_job",
+            "list_jobs",
+        ]
+        for method in required_methods:
+            if not hasattr(value, method) or not callable(getattr(value, method)):
+                raise AttributeError(
+                    f"Object assigned to job_service_client is missing a callable '{method}' method."
+ ) + self._clients["job"] = value + + @property + def model_service_client(self): + if "model" not in self._clients: + from google.cloud.bigquery_v2.services import model_service + + self._clients["model"] = model_service.ModelServiceClient( + credentials=self._credentials, client_options=self._client_options + ) + return self._clients["model"] + + @model_service_client.setter + def model_service_client(self, value): + required_methods = [ + "get_model", + "delete_model", + "patch_model", + "list_models", + ] + for method in required_methods: + if not hasattr(value, method) or not callable(getattr(value, method)): + raise AttributeError( + f"Object assigned to model_service_client is missing a callable '{method}' method." + ) + self._clients["model"] = value + + def get_dataset( + self, + request: Optional[Union[dataset.GetDatasetRequest, dict]] = None, + *, + retry: OptionalRetry = DEFAULT_RETRY, + timeout: Union[float, object] = DEFAULT_TIMEOUT, + metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, + ): + """ + TODO: Docstring is purposefully blank. microgenerator will add automatically. + """ + kwargs = _helpers._drop_self_key(locals()) + return self.dataset_service_client.get_dataset(**kwargs) + + def list_datasets( + self, + request: Optional[Union[dataset.ListDatasetsRequest, dict]] = None, + *, + retry: OptionalRetry = DEFAULT_RETRY, + timeout: Union[float, object] = DEFAULT_TIMEOUT, + metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, + ): + """ + TODO: Docstring is purposefully blank. microgenerator will add automatically. + """ + kwargs = _helpers._drop_self_key(locals()) + return self.dataset_service_client.list_datasets(**kwargs) + + def list_jobs( + self, + request: Optional[Union[job.ListJobsRequest, dict]] = None, + *, + retry: OptionalRetry = DEFAULT_RETRY, + timeout: Union[float, object] = DEFAULT_TIMEOUT, + metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, + ): + """ + TODO: Docstring is purposefully blank. microgenerator will add automatically. + """ + kwargs = _helpers._drop_self_key(locals()) + return self.job_service_client.list_jobs(**kwargs) + + def get_model( + self, + request: Optional[Union[model.GetModelRequest, dict]] = None, + *, + retry: OptionalRetry = DEFAULT_RETRY, + timeout: Union[float, object] = DEFAULT_TIMEOUT, + metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, + ): + """ + TODO: Docstring is purposefully blank. microgenerator will add automatically. + """ + kwargs = _helpers._drop_self_key(locals()) + return self.model_service_client.get_model(**kwargs) + + def delete_model( + self, + request: Optional[Union[model.DeleteModelRequest, dict]] = None, + *, + retry: OptionalRetry = DEFAULT_RETRY, + timeout: Union[float, object] = DEFAULT_TIMEOUT, + metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, + ): + """ + TODO: Docstring is purposefully blank. microgenerator will add automatically. + """ + kwargs = _helpers._drop_self_key(locals()) + # The underlying GAPIC client returns None on success. + return self.model_service_client.delete_model(**kwargs) + + def patch_model( + self, + request: Optional[Union[model.PatchModelRequest, dict]] = None, + *, + retry: OptionalRetry = DEFAULT_RETRY, + timeout: Union[float, object] = DEFAULT_TIMEOUT, + metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, + ): + """ + TODO: Docstring is purposefully blank. microgenerator will add automatically. 
+ """ + kwargs = _helpers._drop_self_key(locals()) + return self.model_service_client.patch_model(**kwargs) + + def list_models( + self, + request: Optional[Union[model.ListModelsRequest, dict]] = None, + *, + retry: OptionalRetry = DEFAULT_RETRY, + timeout: Union[float, object] = DEFAULT_TIMEOUT, + metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, + ): + """ + TODO: Docstring is purposefully blank. microgenerator will add automatically. + """ + kwargs = _helpers._drop_self_key(locals()) + return self.model_service_client.list_models(**kwargs) diff --git a/google/cloud/bigquery_v2/services/centralized_services/client.py b/google/cloud/bigquery_v2/services/centralized_services/client.py deleted file mode 100644 index 63c403dcb..000000000 --- a/google/cloud/bigquery_v2/services/centralized_services/client.py +++ /dev/null @@ -1,160 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2025 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import os -from typing import ( - Optional, - Sequence, - Tuple, - Union, -) - -# Import service clients -from google.cloud.bigquery_v2.services.dataset_service import DatasetServiceClient -from google.cloud.bigquery_v2.services.job_service import JobServiceClient -from google.cloud.bigquery_v2.services.model_service import ModelServiceClient - -# Import Request classes -from google.cloud.bigquery_v2.types import ( - # DatasetService Request classes - GetDatasetRequest, - # JobService Request classes - ListJobsRequest, - # ModelService Request classes - DeleteModelRequest, - GetModelRequest, - PatchModelRequest, - ListModelsRequest, -) - -from google.api_core import gapic_v1 -from google.api_core import retry as retries - -# Create a type alias -try: - OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None] -except AttributeError: # pragma: NO COVER - OptionalRetry = Union[retries.Retry, object, None] # type: ignore - -# TODO: revise this universally. -PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT") - -DEFAULT_RETRY: OptionalRetry = gapic_v1.method.DEFAULT -DEFAULT_TIMEOUT: Union[float, object] = gapic_v1.method.DEFAULT -DEFAULT_METADATA: Sequence[Tuple[str, Union[str, bytes]]] = () - - -def _drop_self_key(kwargs): - "Drops 'self' key from a given kwargs dict." - - if not isinstance(kwargs, dict): - raise TypeError("kwargs must be a dict.") - kwargs.pop("self", None) # Essentially a no-op if 'self' key does not exist - return kwargs - - -# Create Centralized Client -class BigQueryClient: - def __init__(self): - # Dataset service related init attributes - self.dataset_service_client = DatasetServiceClient() - self.job_service_client = JobServiceClient() - self.model_service_client = ModelServiceClient() - - def get_dataset( - self, - request: Optional[Union[GetDatasetRequest, dict]] = None, - *, - retry: OptionalRetry = DEFAULT_RETRY, - timeout: Union[float, object] = DEFAULT_TIMEOUT, - metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, - ): - """ - TODO: Add docstring. 
- """ - kwargs = _drop_self_key(locals()) - return self.dataset_service_client.get_dataset(**kwargs) - - def list_jobs( - self, - request: Optional[Union[ListJobsRequest, dict]] = None, - *, - retry: OptionalRetry = DEFAULT_RETRY, - timeout: Union[float, object] = DEFAULT_TIMEOUT, - metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, - ): - """ - TODO: Add docstring. - """ - kwargs = _drop_self_key(locals()) - return self.job_service_client.list_jobs(**kwargs) - - def get_model( - self, - request: Optional[Union[GetModelRequest, dict]] = None, - *, - retry: OptionalRetry = DEFAULT_RETRY, - timeout: Union[float, object] = DEFAULT_TIMEOUT, - metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, - ): - """ - TODO: Add docstring. - """ - kwargs = _drop_self_key(locals()) - return self.model_service_client.get_model(**kwargs) - - def delete_model( - self, - request: Optional[Union[DeleteModelRequest, dict]] = None, - *, - retry: OptionalRetry = DEFAULT_RETRY, - timeout: Union[float, object] = DEFAULT_TIMEOUT, - metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, - ): - """ - TODO: Add docstring. - """ - kwargs = _drop_self_key(locals()) - # The underlying GAPIC client returns None on success. - return self.model_service_client.delete_model(**kwargs) - - def patch_model( - self, - request: Optional[Union[PatchModelRequest, dict]] = None, - *, - retry: OptionalRetry = DEFAULT_RETRY, - timeout: Union[float, object] = DEFAULT_TIMEOUT, - metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, - ): - """ - TODO: Add docstring. - """ - kwargs = _drop_self_key(locals()) - return self.model_service_client.patch_model(**kwargs) - - def list_models( - self, - request: Optional[Union[ListModelsRequest, dict]] = None, - *, - retry: OptionalRetry = DEFAULT_RETRY, - timeout: Union[float, object] = DEFAULT_TIMEOUT, - metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, - ): - """ - TODO: Add docstring. - """ - kwargs = _drop_self_key(locals()) - return self.model_service_client.list_models(**kwargs) diff --git a/google/cloud/bigquery_v2/services/job_service/pagers.py b/google/cloud/bigquery_v2/services/job_service/pagers.py index 83f723917..b976c1d75 100644 --- a/google/cloud/bigquery_v2/services/job_service/pagers.py +++ b/google/cloud/bigquery_v2/services/job_service/pagers.py @@ -67,7 +67,6 @@ def __init__( retry: OptionalRetry = gapic_v1.method.DEFAULT, timeout: Union[float, object] = gapic_v1.method.DEFAULT, metadata: Sequence[Tuple[str, Union[str, bytes]]] = (), - ): """Instantiate the pager. 
diff --git a/tests/unit/gapic/bigquery_v2/test_centralized_services.py b/tests/unit/gapic/bigquery_v2/test_centralized_services.py deleted file mode 100644 index e3883da5f..000000000 --- a/tests/unit/gapic/bigquery_v2/test_centralized_services.py +++ /dev/null @@ -1,259 +0,0 @@ -import pytest -from typing import ( - Optional, - Sequence, - Tuple, - Union, -) -from unittest import mock - -from google.api_core import gapic_v1 -from google.api_core import retry as retries -from google.cloud.bigquery_v2 import BigQueryClient -from google.cloud.bigquery_v2.types import Dataset, Job, Model -from google.cloud.bigquery_v2 import DatasetServiceClient -from google.cloud.bigquery_v2 import JobServiceClient -from google.cloud.bigquery_v2 import ModelServiceClient - -from google.cloud.bigquery_v2 import GetDatasetRequest - -from google.cloud.bigquery_v2 import ListJobsRequest - -from google.cloud.bigquery_v2 import DeleteModelRequest -from google.cloud.bigquery_v2 import GetModelRequest -from google.cloud.bigquery_v2 import PatchModelRequest -from google.cloud.bigquery_v2 import ListModelsRequest - - -try: - OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None] -except AttributeError: # pragma: NO COVER - OptionalRetry = Union[retries.Retry, object, None] # type: ignore - - -# --- CONSTANTS --- -PROJECT_ID = "test-project" -DATASET_ID = "test_dataset" -JOB_ID = "test_job" -MODEL_ID = "test_model" -DEFAULT_ETAG = "test_etag" - -DEFAULT_RETRY: OptionalRetry = gapic_v1.method.DEFAULT -DEFAULT_TIMEOUT: Union[float, object] = gapic_v1.method.DEFAULT -DEFAULT_METADATA: Sequence[Tuple[str, Union[str, bytes]]] = () - -# --- HELPERS --- - - -def assert_client_called_once_with( - mock_method: mock.Mock, - request: Union[ - GetDatasetRequest, - GetModelRequest, - DeleteModelRequest, - PatchModelRequest, - ListJobsRequest, - ListModelsRequest, - ], # TODO this needs to be simplified. - retry: OptionalRetry = DEFAULT_RETRY, - timeout: Union[float, object] = DEFAULT_TIMEOUT, - metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA, -): - """Helper to assert a client method was called with default args.""" - mock_method.assert_called_once_with( - request=request, - retry=retry, - timeout=timeout, - metadata=metadata, - ) - - -# --- FIXTURES --- - - -@pytest.fixture -def mock_dataset_service_client(): - """Mocks the DatasetServiceClient.""" - with mock.patch( - "google.cloud.bigquery_v2.services.centralized_services.client.DatasetServiceClient", - autospec=True, - ) as mock_client: - yield mock_client - - -@pytest.fixture -def mock_job_service_client(): - """Mocks the JobServiceClient.""" - with mock.patch( - "google.cloud.bigquery_v2.services.centralized_services.client.JobServiceClient", - autospec=True, - ) as mock_client: - yield mock_client - - -@pytest.fixture -def mock_model_service_client(): - """Mocks the ModelServiceClient.""" - with mock.patch( - "google.cloud.bigquery_v2.services.centralized_services.client.ModelServiceClient", - autospec=True, - ) as mock_client: - yield mock_client - - -# TODO: figure out a solution for this... is there an easier way to feed in clients? -# TODO: is there an easier way to make mock_x_service_clients? 
-@pytest.fixture -def bq_client( - mock_dataset_service_client, mock_job_service_client, mock_model_service_client -): - """Provides a BigQueryClient with mocked underlying services.""" - client = BigQueryClient() - client.dataset_service_client = mock_dataset_service_client - client.job_service_client = mock_job_service_client - client.model_service_client = mock_model_service_client - ... - return client - - -# --- TEST CLASSES --- - - -class TestCentralizedClientDatasetService: - def test_get_dataset(self, bq_client, mock_dataset_service_client): - # Arrange - expected_dataset = Dataset( - kind="bigquery#dataset", id=f"{PROJECT_ID}:{DATASET_ID}" - ) - mock_dataset_service_client.get_dataset.return_value = expected_dataset - get_dataset_request = GetDatasetRequest( - project_id=PROJECT_ID, dataset_id=DATASET_ID - ) - - # Act - dataset = bq_client.get_dataset(request=get_dataset_request) - - # Assert - assert dataset == expected_dataset - assert_client_called_once_with( - mock_dataset_service_client.get_dataset, get_dataset_request - ) - - -class TestCentralizedClientJobService: - def test_list_jobs(self, bq_client, mock_job_service_client): - # Arrange - expected_jobs = [Job(kind="bigquery#job", id=f"{PROJECT_ID}:{JOB_ID}")] - mock_job_service_client.list_jobs.return_value = expected_jobs - list_jobs_request = ListJobsRequest(project_id=PROJECT_ID) - - # Act - jobs = bq_client.list_jobs(request=list_jobs_request) - - # Assert - assert jobs == expected_jobs - assert_client_called_once_with( - mock_job_service_client.list_jobs, list_jobs_request - ) - - -class TestCentralizedClientModelService: - def test_get_model(self, bq_client, mock_model_service_client): - # Arrange - expected_model = Model( - etag=DEFAULT_ETAG, - model_reference={ - "project_id": PROJECT_ID, - "dataset_id": DATASET_ID, - "model_id": MODEL_ID, - }, - ) - mock_model_service_client.get_model.return_value = expected_model - get_model_request = GetModelRequest( - project_id=PROJECT_ID, dataset_id=DATASET_ID, model_id=MODEL_ID - ) - - # Act - model = bq_client.get_model(request=get_model_request) - - # Assert - assert model == expected_model - assert_client_called_once_with( - mock_model_service_client.get_model, get_model_request - ) - - def test_delete_model(self, bq_client, mock_model_service_client): - # Arrange - # The underlying service call returns nothing on success. - mock_model_service_client.delete_model.return_value = None - delete_model_request = DeleteModelRequest( - project_id=PROJECT_ID, dataset_id=DATASET_ID, model_id=MODEL_ID - ) - - # Act - # The wrapper method should also return nothing. - result = bq_client.delete_model(request=delete_model_request) - - # Assert - # 1. Assert the return value is None. This fails if the method doesn't exist. - assert result is None - # 2. Assert the underlying service was called correctly. 
- assert_client_called_once_with( - mock_model_service_client.delete_model, - delete_model_request, - ) - - def test_patch_model(self, bq_client, mock_model_service_client): - # Arrange - expected_model = Model( - etag="new_etag", - model_reference={ - "project_id": PROJECT_ID, - "dataset_id": DATASET_ID, - "model_id": MODEL_ID, - }, - description="A newly patched description.", - ) - mock_model_service_client.patch_model.return_value = expected_model - - model_patch = Model(description="A newly patched description.") - patch_model_request = PatchModelRequest( - project_id=PROJECT_ID, - dataset_id=DATASET_ID, - model_id=MODEL_ID, - model=model_patch, - ) - - # Act - patched_model = bq_client.patch_model(request=patch_model_request) - - # Assert - assert patched_model == expected_model - assert_client_called_once_with( - mock_model_service_client.patch_model, patch_model_request - ) - - def test_list_models(self, bq_client, mock_model_service_client): - # Arrange - expected_models = [ - Model( - etag=DEFAULT_ETAG, - model_reference={ - "project_id": PROJECT_ID, - "dataset_id": DATASET_ID, - "model_id": MODEL_ID, - }, - ) - ] - mock_model_service_client.list_models.return_value = expected_models - list_models_request = ListModelsRequest( - project_id=PROJECT_ID, dataset_id=DATASET_ID - ) - # Act - models = bq_client.list_models(request=list_models_request) - - # Assert - assert models == expected_models - assert_client_called_once_with( - mock_model_service_client.list_models, list_models_request - )
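# A hedged usage sketch for the centralized client introduced above.
# Construction is lazy: no per-service GAPIC client (and no credentials) is
# created until a service property is first read, and the duck-typed setters
# accept any object exposing the required methods, e.g. a mock, much as the
# deleted tests did.

from unittest import mock

from google.cloud.bigquery_v2 import BigQueryClient

client = BigQueryClient()

# Inject a stand-in that satisfies the setter's required-method check.
fake_models = mock.Mock(
    spec=["get_model", "delete_model", "patch_model", "list_models"]
)
fake_models.list_models.return_value = []
client.model_service_client = fake_models

# The wrapper forwards request/retry/timeout/metadata to the injected object.
assert client.list_models(request=None) == []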