@@ -14,11 +14,87 @@
# limitations under the License.
#

from typing import Any, Callable, Optional, Type, TypeVar


T = TypeVar("T")


def _drop_self_key(kwargs):
"Drops 'self' key from a given kwargs dict."
"""Drops 'self' key from a given kwargs dict."""

if not isinstance(kwargs, dict):
raise TypeError("kwargs must be a dict.")
kwargs.pop("self", None) # Essentially a no-op if 'self' key does not exist
return kwargs


def _make_request(
request_class: Type[T],
user_request: Optional[T],
identifier_value: Optional[Any],
identifier_name: str,
parser: Callable[..., Any],
identifier_required: bool = True,
) -> T:
"""A factory for creating *Request objects.
This function simplifies the creation of request objects by extracting identifier
values from strings (e.g., 'project_id.dataset_id') and using them to instantiate
the appropriate request object. It allows users to continue using identifier
strings with BigQueryClient methods, even though the underlying *ServiceClient
methods do not directly support this convenience.
For example, this helper is used in methods like:
- BigQueryClient.get_dataset()
- BigQueryClient.delete_dataset()
Args:
request_class: The class of the request object to create (e.g.,
GetDatasetRequest, ListModelsRequest).
user_request: A user-provided *Request object. If not None, this
object is returned directly.
identifier_value: The value to be parsed to create the request object
(e.g., a dataset_id for GetDatasetRequest).
identifier_name: The name of the identifier field (e.g., 'dataset_id',
'job_id').
parser: A callable that takes the identifier_value and returns a dict
of fields for the request object. For example, a parser could
separate a 'project_id.dataset_id' string into its components.
identifier_required: Whether the identifier is required. Defaults to True.
Returns:
A *Request object.
Raises:
ValueError: If both user_request and identifier_value are provided, or
if identifier_required is True and both are None.
Example:
request = _make_request(
request_class=dataset.GetDatasetRequest,
user_request=request,
identifier_value=dataset_id,
identifier_name="dataset_id",
parser=self._parse_dataset_id_to_dict,
identifier_required=True,
)
"""
if user_request is not None and identifier_value is not None:
raise ValueError(
f"Provide either a request object or '{identifier_name}', not both."
)
Comment on lines +83 to +86
Contributor:

IIRC, there are cases where we merge these two in the existing hand-written client. For example, load jobs can take a string as a destination but merge in the job config object to the final request:

def load_table_from_file(
self,
file_obj: IO[bytes],
destination: Union[Table, TableReference, TableListItem, str],
rewind: bool = False,
size: Optional[int] = None,
num_retries: int = _DEFAULT_NUM_RETRIES,
job_id: Optional[str] = None,
job_id_prefix: Optional[str] = None,
location: Optional[str] = None,
project: Optional[str] = None,
job_config: Optional[LoadJobConfig] = None,
timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
) -> job.LoadJob:

I actually haven't thought too much about how the non-query jobs fit into this design, though. I suppose the user needs to specify more than just an identifier for all of the job types, so this method wouldn't apply?

Note: In addition to query jobs, load jobs using jobs.insert REST API will need a bit of handwritten magic to support load from local data via "resumable media uploads" (https://cloud.google.com/bigquery/docs/reference/api-uploads). I imagine we're planning on providing a separate hand-written helper for this, similar to queries? Actually do we know if the GAPICs even support the resumable upload API? AFAIK, it's only used in BigQuery, Cloud Storage, and Google Drive APIs. CC @parthea

Collaborator Author (@chalmerlowe), Sep 3, 2025:

How to handle the Query experience is being designed by someone else and is not fully fleshed out for the Python libraries.

I would like to defer this as out of scope for the alpha release.


if user_request is not None:
return user_request

if identifier_required and identifier_value is None:
raise ValueError(
f"Either a request object or '{identifier_name}' must be provided."
)

if identifier_value is None:
request_fields = parser()
else:
request_fields = parser(identifier_value)
return request_class(**request_fields)
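
A quick illustration of the helper's two code paths, using stand-in names (FakeGetDatasetRequest and _split_dataset_path are hypothetical, not part of this diff):

def _split_dataset_path(path: str) -> dict:
    # Stand-in parser: "project.dataset" -> {"project_id": ..., "dataset_id": ...}
    project_id, dataset_id = path.rsplit(".", 1)
    return {"project_id": project_id, "dataset_id": dataset_id}


class FakeGetDatasetRequest:
    def __init__(self, project_id=None, dataset_id=None):
        self.project_id = project_id
        self.dataset_id = dataset_id


# Identifier path: the string is parsed into fields for the request class.
req = _make_request(
    request_class=FakeGetDatasetRequest,
    user_request=None,
    identifier_value="my-project.my_dataset",
    identifier_name="dataset_id",
    parser=_split_dataset_path,
)
assert (req.project_id, req.dataset_id) == ("my-project", "my_dataset")

# Request path: a prebuilt request object is returned unchanged.
prebuilt = FakeGetDatasetRequest(project_id="p", dataset_id="d")
assert _make_request(FakeGetDatasetRequest, prebuilt, None, "dataset_id", _split_dataset_path) is prebuilt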
175 changes: 119 additions & 56 deletions google/cloud/bigquery_v2/services/centralized_service/client.py
@@ -16,6 +16,7 @@

import os
from typing import (
Dict,
Optional,
Sequence,
Tuple,
@@ -34,6 +35,7 @@
# Import types modules (to access *Requests classes)
from google.cloud.bigquery_v2.types import (
dataset,
dataset_reference,
job,
model,
)
@@ -43,139 +45,200 @@
from google.api_core import retry as retries
from google.auth import credentials as auth_credentials

# Create a type alias
# Create type aliases
try:
OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
except AttributeError: # pragma: NO COVER
OptionalRetry = Union[retries.Retry, object, None] # type: ignore

# TODO: This line is here to simplify prototyping, etc.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
DatasetIdentifier = Union[str, dataset_reference.DatasetReference]

# TODO: This variable is here to simplify prototyping, etc.
PROJECT_ID = os.environ.get("GOOGLE_CLOUD_PROJECT")
Collaborator Author:

That upgrade is handled in this separate PR.

DEFAULT_RETRY: OptionalRetry = gapic_v1.method.DEFAULT
DEFAULT_TIMEOUT: Union[float, object] = gapic_v1.method.DEFAULT
DEFAULT_METADATA: Sequence[Tuple[str, Union[str, bytes]]] = ()


# Create Centralized Client
class BigQueryClient:
"""A centralized client for BigQuery API."""

def __init__(
self,
*,
credentials: Optional[auth_credentials.Credentials] = None,
Contributor:

In the past, project is a pretty common input to the client constructor. This is especially important in environments like https://colab.research.google.com that don't have a project associated with the environment.

Likewise, client_info is another important one. That's how applications can amend the user-agent, which is very important for BQ to track usage attributable to open source "connectors" as well as partner company usage. I suppose we could guide such users to use the raw clients then, but if so maybe client_options falls in the same bucket of "advanced" features and should be excluded?

Speaking of commonly used arguments: Fun fact, the Kaggle team was (is?) using _http to provide a custom proxy to BigQuery for their special Kaggle free tier that included BQML, unlike the BQ Sandbox. Might be worth reaching out to them to come up with an alternative if they're still doing that. See: https://www.kaggle.com/discussions/general/50159 Note that 5 TB is 5x more than the BQ Sandbox 1 TB. That post was from 8 years ago, though, so I don't know if that still applies.
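
For reference, a hedged sketch of how a connector amends the user-agent today with the hand-written client via client_info (the connector name and project are illustrative; whether BigQueryClient will expose the same parameter is exactly what is being discussed here):

from google.api_core.client_info import ClientInfo
from google.cloud import bigquery

# The extra token in the user-agent lets BigQuery attribute traffic to the
# connector or partner tool built on top of the client library.
info = ClientInfo(user_agent="my-connector/1.2.3")
client = bigquery.Client(project="my-project", client_info=info)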

Collaborator Author (@chalmerlowe), Sep 3, 2025:

Thank you for these insights into what customers commonly pass into the client constructor.
I will see what I can do to include project in the client constructor.
I may reserve client_info until the beta or release candidates due to schedule.

Collaborator Author (@chalmerlowe), Sep 3, 2025:

For those who are reading, this may provide some good context:

Right now, if you ran the GAPIC generated code and wanted to get_dataset(), this is what that would look like:

# Create a client
from google.cloud.bigquery_v2 import bigquery_v2
client = bigquery_v2.DatasetServiceClient() # default DatasetServiceClient

# Initialize request argument(s)
request = bigquery_v2.GetDatasetRequest(
    project_id="project_id_value",
    dataset_id="dataset_id_value",
)

# Make the request
#     NOTE: due to the protobuf definition, there is no way to
#     provide a "project_id_value.dataset_id_value" string to this method.
response = client.get_dataset(request=request)

As part of this alpha, we are trying to enable one basic transmogrification: allow a user to keep supplying a "project_id_value.dataset_id_value" string to the method (if this proves useful and universally generatable, other convenience transformers will follow).

This is done by injecting several helper functions that can invisibly accept the string and create a *Request object for the user.

# Create a client
from google.cloud.bigquery_v2 import bigquery_v2
bqclient = bigquery_v2.BigQueryClient() # my new and improved hotness

# Initialize the string value 
dataset_id = "project_id_value.dataset_id_value"

# Make the request
#     Inside the centralized_client version of get_dataset, it accepts
#     a "project_id_value.dataset_id_value" string and the helper functions
#     break it apart and create a bigquery_v2.GetDatasetRequest that is passed
#     to the dataset_service_client's version of get_dataset.
#     This happens invisibly to the user.

response = bqclient.get_dataset(dataset_id)

Collaborator Author:

Including project is handled in this separate PR.

For the moment, I have not made any attempts to also process client_info.

client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
):
self._clients = {}
"""
Initializes the BigQueryClient.
Args:
credentials:
The credentials to use for authentication. If not provided, the
client will attempt to use the default credentials.
client_options:
A dictionary of client options to pass to the underlying
service clients.
Comment on lines +79 to +81
Contributor:

The type above also allows for a single client_options.

Collaborator Author (@chalmerlowe), Sep 3, 2025:

For the alpha, I may leave out the client_options. It will depend on schedule. If we keep it, I will ensure that this nuance is captured in the code.

"""

self._clients: Dict[str, object] = {}
self._credentials = credentials
Contributor:

We'll want to call credentials, default_project_id = google.auth.default() (https://googleapis.dev/python/google-auth/latest/reference/google.auth.html#google.auth.default) if the credentials aren't set so that we don't repeat the auth flow for each sub-client construction.
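
A minimal sketch of that pattern (assumption: we keep the current attribute names; google.auth.default() returns a (credentials, project) tuple):

import google.auth


class _SketchClient:
    """Illustration only: resolve credentials and project once at construction."""

    def __init__(self, *, credentials=None, project=None):
        if credentials is None or project is None:
            # Runs the default auth flow a single time; every lazily created
            # *ServiceClient then reuses these credentials.
            default_credentials, default_project = google.auth.default()
            credentials = credentials or default_credentials
            project = project or default_project
        self._credentials = credentials
        self.project = project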

Collaborator Author:

That upgrade is handled in this separate PR.

self._client_options = client_options
self.project = PROJECT_ID
Contributor:

Might want to upgrade this to a @property so that it can be included in public documentation (and also be suggested to be read-only).
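
A possible read-only shape for that, sketched for illustration (assumes the constructor stores the resolved value on self._project, which the current diff does not yet do):

@property
def project(self) -> Optional[str]:
    """Default project ID used when an identifier omits the project."""
    return self._project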

Collaborator Author:
That upgrade is handled in this separate PR.


# --- HELPER METHODS ---
def _parse_dataset_path(self, dataset_path: str) -> Tuple[Optional[str], str]:
"""
Helper to parse project_id and/or dataset_id from a string identifier.
Args:
dataset_path: A string in the format 'project_id.dataset_id' or
'dataset_id'.
Returns:
A tuple of (project_id, dataset_id).
"""
if "." in dataset_path:
# Use rsplit to handle legacy paths like `google.com:my-project.my_dataset`.
Contributor:

For my education, is google.com:my-project.my_dataset supposed to yield ['google.com:my-project', 'my_dataset'] or ['my-project', 'my_dataset']?
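
For what it's worth, a quick worked check of the rsplit(".", 1) behavior the code comment above relies on (values are illustrative):

path = "google.com:my-project.my_dataset"
# Splitting on the last "." keeps the domain-scoped project intact.
project_id, dataset_id = path.rsplit(".", 1)
assert project_id == "google.com:my-project"
assert dataset_id == "my_dataset"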

project_id, dataset_id = dataset_path.rsplit(".", 1)
return project_id, dataset_id
return self.project, dataset_path

def _parse_dataset_id_to_dict(self, dataset_id: DatasetIdentifier) -> dict:
"""
Helper to create a dictionary from a project_id and dataset_id to pass
internally between helper functions.
Args:
dataset_id: A string or DatasetReference.
Returns:
A dict of {"project_id": project_id, "dataset_id": dataset_id_str }.
"""
if isinstance(dataset_id, str):
project_id, dataset_id_str = self._parse_dataset_path(dataset_id)
return {"project_id": project_id, "dataset_id": dataset_id_str}
Contributor:

Curious to see snake_case here. IIRC the REST API mostly uses camel-case, but per https://protobuf.dev/programming-guides/json/ both should be acceptable.

Collaborator Author:

This is not information that is sent directly to the API.
It's a dictionary shared between helper functions.
As noted above, we send a *Request object per the DatasetServiceClient.get_dataset() method signature. The helper functions accept a string and invisibly create a *Request object for the user.

I will include a note to this effect in the docstring of the helper funcs.

Collaborator Author:

Done.

elif isinstance(dataset_id, dataset_reference.DatasetReference):
Collaborator Author:

This is not intended to be sent to the API. It is internal use only.

return {
"project_id": dataset_id.project_id,
"dataset_id": dataset_id.dataset_id,
}
else:
raise TypeError(f"Invalid type for dataset_id: {type(dataset_id)}")

def _parse_project_id_to_dict(self, project_id: Optional[str] = None) -> dict:
"""Helper to create a request dictionary from a project_id."""
final_project_id = project_id or self.project
return {"project_id": final_project_id}

# --- *SERVICECLIENT ATTRIBUTES ---
@property
def dataset_service_client(self):
if "dataset" not in self._clients:
from google.cloud.bigquery_v2.services import dataset_service

self._clients["dataset"] = dataset_service.DatasetServiceClient(
credentials=self._credentials, client_options=self._client_options
)
return self._clients["dataset"]

@dataset_service_client.setter
def dataset_service_client(self, value):
# Check for the methods the centralized client exposes (to allow duck-typing)
required_methods = [
"get_dataset",
"insert_dataset",
"patch_dataset",
"update_dataset",
"delete_dataset",
"list_datasets",
"undelete_dataset",
]
for method in required_methods:
if not hasattr(value, method) or not callable(getattr(value, method)):
raise AttributeError(
f"Object assigned to dataset_service_client is missing a callable '{method}' method."
)
# Local import mirrors the lazy import in the property getter above.
from google.cloud.bigquery_v2.services import dataset_service

if not isinstance(value, dataset_service.DatasetServiceClient):
raise TypeError(
"Expected an instance of dataset_service.DatasetServiceClient."
)
self._clients["dataset"] = value

@property
def job_service_client(self):
if "job" not in self._clients:
from google.cloud.bigquery_v2.services import job_service

self._clients["job"] = job_service.JobServiceClient(
credentials=self._credentials, client_options=self._client_options
)
return self._clients["job"]

@job_service_client.setter
def job_service_client(self, value):
required_methods = [
"get_job",
"insert_job",
"cancel_job",
"delete_job",
"list_jobs",
]
for method in required_methods:
if not hasattr(value, method) or not callable(getattr(value, method)):
raise AttributeError(
f"Object assigned to job_service_client is missing a callable '{method}' method."
)
# Local import mirrors the lazy import in the property getter above.
from google.cloud.bigquery_v2.services import job_service

if not isinstance(value, job_service.JobServiceClient):
raise TypeError("Expected an instance of job_service.JobServiceClient.")
self._clients["job"] = value

@property
def model_service_client(self):
if "model" not in self._clients:
from google.cloud.bigquery_v2.services import model_service

self._clients["model"] = model_service.ModelServiceClient(
credentials=self._credentials, client_options=self._client_options
)
return self._clients["model"]

@model_service_client.setter
def model_service_client(self, value):
required_methods = [
"get_model",
"delete_model",
"patch_model",
"list_models",
]
for method in required_methods:
if not hasattr(value, method) or not callable(getattr(value, method)):
raise AttributeError(
f"Object assigned to model_service_client is missing a callable '{method}' method."
)
# Local import mirrors the lazy import in the property getter above.
from google.cloud.bigquery_v2.services import model_service

if not isinstance(value, model_service.ModelServiceClient):
raise TypeError("Expected an instance of model_service.ModelServiceClient.")
self._clients["model"] = value

# --- *SERVICECLIENT METHODS ---
def get_dataset(
self,
request: Optional[Union[dataset.GetDatasetRequest, dict]] = None,
dataset_id: Optional[DatasetIdentifier] = None,
*,
request: Optional["dataset.GetDatasetRequest"] = None,
retry: OptionalRetry = DEFAULT_RETRY,
timeout: Union[float, object] = DEFAULT_TIMEOUT,
metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA,
):
) -> "dataset.Dataset":
"""
TODO: Docstring is purposefully blank. microgenerator will add automatically.
"""
kwargs = _helpers._drop_self_key(locals())
return self.dataset_service_client.get_dataset(**kwargs)
final_request = _helpers._make_request(
request_class=dataset.GetDatasetRequest,
user_request=request,
identifier_value=dataset_id,
identifier_name="dataset_id",
parser=self._parse_dataset_id_to_dict,
identifier_required=True,
)

return self.dataset_service_client.get_dataset(
request=final_request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

def list_datasets(
self,
request: Optional[Union[dataset.ListDatasetsRequest, dict]] = None,
project_id: Optional[str] = None,
Contributor:

Why is project_id placed before the * (keyword-only marker) while request comes after it? Is there any special consideration about order here?

*,
request: Optional["dataset.ListDatasetsRequest"] = None,
retry: OptionalRetry = DEFAULT_RETRY,
timeout: Union[float, object] = DEFAULT_TIMEOUT,
metadata: Sequence[Tuple[str, Union[str, bytes]]] = DEFAULT_METADATA,
):
"""
TODO: Docstring is purposefully blank. microgenerator will add automatically.
"""
kwargs = _helpers._drop_self_key(locals())
return self.dataset_service_client.list_datasets(**kwargs)
final_request = _helpers._make_request(
request_class=dataset.ListDatasetsRequest,
user_request=request,
identifier_value=project_id,
identifier_name="project_id",
parser=self._parse_project_id_to_dict,
identifier_required=False,
)

return self.dataset_service_client.list_datasets(
request=final_request,
retry=retry,
timeout=timeout,
metadata=metadata,
)

# ============================================================================
# TODO: HERE THERE BE DRAGONS. Once the above changes have been approved the
# methods below this comment will be updated to look and function similar
# to the above.
# NOT YET READY FOR REVIEW.
# ============================================================================

def list_jobs(
self,