Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sqlmesh/core/config/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1062,6 +1062,7 @@ class BigQueryConnectionConfig(ConnectionConfig):
job_retry_deadline_seconds: t.Optional[int] = None
priority: t.Optional[BigQueryPriority] = None
maximum_bytes_billed: t.Optional[int] = None
reservation_id: t.Optional[str] = None

concurrent_tasks: int = 1
register_comments: bool = True
Expand Down Expand Up @@ -1171,6 +1172,7 @@ def _extra_engine_config(self) -> t.Dict[str, t.Any]:
"job_retry_deadline_seconds",
"priority",
"maximum_bytes_billed",
"reservation_id",
}
}

Expand Down
6 changes: 6 additions & 0 deletions sqlmesh/core/engine_adapter/bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -1106,7 +1106,13 @@ def _execute(
else []
)

# Create job config with reservation support
job_config = QueryJobConfig(**self._job_params, connection_properties=connection_properties)

# Set reservation directly on the job_config object if specified
reservation_id = self._extra_config.get("reservation_id")
if reservation_id:
job_config.reservation = reservation_id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about three things here:

  1. Should we be checking if reservation_id is not None instead of whether it's simply falsy?
  2. How come we're setting a field reservation with a property called reservation_id? Are you sure this is the right attribute in the QueryJobConfig instance? Seems like it contains a property/getter for accessing the reservation_id.
  3. Is this the recommended way for setting the reservation ID in the config? What about the QueryJobConfig's constructor?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we be checking if reservation_id is not None instead of whether it's simply falsey?

Yes, I made this update to follow the same pattern used for the maximum_bytes_billed property.

How come we're setting a field reservation with a property called reservation_id? Are you sure this is the right attribute in the QueryJobConfig instance? Seems like it [contains a property/getter](https://github.com/googleapis/python-bigquery/blob/46764a59ca7a21ed14ad2c91eb7f98c302736c22/google/cloud/bigquery/job/base.py#L540-L550) for accessing the reservation_id.

Agreed, the better name is reservation instead of reservation_id. I updated this also.

Is this the recommended way for setting the reservation ID in the config? What about the QueryJobConfig's constructor?

I updated the _job_params dictionary; it gets passed to QueryJobConfig(**self._job_params) just like priority and maximum_bytes_billed.

Copy link
Contributor

@georgesittas georgesittas Jan 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, @scottypate, I dug a bit deeper in bigquery's code myself. I'm not sure if we can rely on _job_params to populate reservation, because the constructor sets attributes, whereas the getter/setter properties look into _properties.

So I wonder if we should just revert to what you did. Have you tested this new code you added end-to-end? Does it work?

self._query_job = self._db_call(
self.client.query,
query=sql,
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/test_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -1951,7 +1951,7 @@ def test_init_dbt_template(runner: CliRunner, tmp_path: Path):
def test_init_project_engine_configs(tmp_path):
engine_type_to_config = {
"redshift": "# concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # user: \n # password: \n # database: \n # host: \n # port: \n # source_address: \n # unix_sock: \n # ssl: \n # sslmode: \n # timeout: \n # tcp_keepalive: \n # application_name: \n # preferred_role: \n # principal_arn: \n # credentials_provider: \n # region: \n # cluster_identifier: \n # iam: \n # is_serverless: \n # serverless_acct_id: \n # serverless_work_group: \n # enable_merge: ",
"bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # method: oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # impersonated_service_account: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: ",
"bigquery": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # method: oauth\n # project: \n # execution_project: \n # quota_project: \n # location: \n # keyfile: \n # keyfile_json: \n # token: \n # refresh_token: \n # client_id: \n # client_secret: \n # token_uri: \n # scopes: \n # impersonated_service_account: \n # job_creation_timeout_seconds: \n # job_execution_timeout_seconds: \n # job_retries: 1\n # job_retry_deadline_seconds: \n # priority: \n # maximum_bytes_billed: \n # reservation_id: ",
"snowflake": "account: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # user: \n # password: \n # warehouse: \n # database: \n # role: \n # authenticator: \n # token: \n # host: \n # port: \n # application: Tobiko_SQLMesh\n # private_key: \n # private_key_path: \n # private_key_passphrase: \n # session_parameters: ",
"databricks": "# concurrent_tasks: 1\n # register_comments: True\n # pre_ping: False\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # server_hostname: \n # http_path: \n # access_token: \n # auth_type: \n # oauth_client_id: \n # oauth_client_secret: \n # catalog: \n # http_headers: \n # session_configuration: \n # databricks_connect_server_hostname: \n # databricks_connect_access_token: \n # databricks_connect_cluster_id: \n # databricks_connect_use_serverless: False\n # force_databricks_connect: False\n # disable_databricks_connect: False\n # disable_spark_session: False",
"postgres": "host: \n user: \n password: \n port: \n database: \n # concurrent_tasks: 4\n # register_comments: True\n # pre_ping: True\n # pretty_sql: False\n # schema_differ_overrides: \n # catalog_type_overrides: \n # keepalives_idle: \n # connect_timeout: 10\n # role: \n # sslmode: \n # application_name: ",
Expand Down
21 changes: 21 additions & 0 deletions tests/core/test_connection_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1131,6 +1131,27 @@ def test_bigquery(make_config):
assert config.get_catalog() == "project"
assert config.is_recommended_for_state_sync is False

# Test reservation_id
config_with_reservation = make_config(
type="bigquery",
project="project",
reservation_id="projects/my-project/locations/us-central1/reservations/my-reservation",
check_import=False,
)
assert isinstance(config_with_reservation, BigQueryConnectionConfig)
assert (
config_with_reservation.reservation_id
== "projects/my-project/locations/us-central1/reservations/my-reservation"
)

# Test that reservation_id is included in _extra_engine_config
extra_config = config_with_reservation._extra_engine_config
assert "reservation_id" in extra_config
assert (
extra_config["reservation_id"]
== "projects/my-project/locations/us-central1/reservations/my-reservation"
)

with pytest.raises(ConfigError, match="you must also specify the `project` field"):
make_config(type="bigquery", execution_project="execution_project", check_import=False)

Expand Down
Loading