From 2b4a5c1eebddd9138135247cf5eb535b6a9285b1 Mon Sep 17 00:00:00 2001
From: Hector Castejon Diaz
Date: Thu, 17 Oct 2024 09:57:12 +0200
Subject: [PATCH 1/2] Fixes to docs and serving endpoint mixin

---
 .codegen/__init__.py.tmpl                    |  2 +-
 .codegen/_openapi_sha                        |  2 +-
 databricks/sdk/__init__.py                   |  2 +-
 databricks/sdk/service/apps.py               | 54 ++++++++--------
 databricks/sdk/service/catalog.py            | 15 ++++-
 databricks/sdk/service/compute.py            | 30 ++++-----
 databricks/sdk/service/dashboards.py         |  9 ++-
 databricks/sdk/service/jobs.py               | 53 ++++++++++++++-
 databricks/sdk/service/pipelines.py          | 68 +++++++++++++++++---
 databricks/sdk/service/sql.py                | 20 ++++++
 docs/gen-client-docs.py                      | 11 +++-
 docs/workspace/jobs/jobs.rst                 | 13 +++-
 docs/workspace/serving/serving_endpoints.rst |  8 ++-
 13 files changed, 222 insertions(+), 65 deletions(-)

diff --git a/.codegen/__init__.py.tmpl b/.codegen/__init__.py.tmpl
index bc68f5654..d54e9dfff 100644
--- a/.codegen/__init__.py.tmpl
+++ b/.codegen/__init__.py.tmpl
@@ -18,7 +18,7 @@ from typing import Optional
   "google_credentials" "google_service_account" }}

 {{- define "api" -}}
-    {{- $mixins := dict "ClustersAPI" "ClustersExt" "DbfsAPI" "DbfsExt" "WorkspaceAPI" "WorkspaceExt" "ServingEndpointsExt" "ServingEndpointsApi" -}}
+    {{- $mixins := dict "ClustersAPI" "ClustersExt" "DbfsAPI" "DbfsExt" "WorkspaceAPI" "WorkspaceExt" "ServingEndpointsAPI" "ServingEndpointsExt" -}}
     {{- $genApi := concat .PascalName "API" -}}
     {{- getOrDefault $mixins $genApi $genApi -}}
 {{- end -}}
diff --git a/.codegen/_openapi_sha b/.codegen/_openapi_sha
index 7f9f41bb8..2d9cb6d86 100644
--- a/.codegen/_openapi_sha
+++ b/.codegen/_openapi_sha
@@ -1 +1 @@
-bc17b474818138f19b78a7bea0675707dead2b87
\ No newline at end of file
+cf9c61453990df0f9453670f2fe68e1b128647a2
\ No newline at end of file
diff --git a/databricks/sdk/__init__.py b/databricks/sdk/__init__.py
index a4058ec51..159946461 100755
--- a/databricks/sdk/__init__.py
+++ b/databricks/sdk/__init__.py
@@ -638,7 +638,7 @@ def service_principals(self) -> ServicePrincipalsAPI:
         return self._service_principals

     @property
-    def serving_endpoints(self) -> ServingEndpointsAPI:
+    def serving_endpoints(self) -> ServingEndpointsExt:
         """The Serving Endpoints API allows you to create, update, and delete model serving endpoints."""
         return self._serving_endpoints

diff --git a/databricks/sdk/service/apps.py b/databricks/sdk/service/apps.py
index 63bc981ba..5f413f0be 100755
--- a/databricks/sdk/service/apps.py
+++ b/databricks/sdk/service/apps.py
@@ -787,7 +787,7 @@ def wait_get_app_active(self,
                             callback: Optional[Callable[[App], None]] = None) -> App:
         deadline = time.time() + timeout.total_seconds()
         target_states = (ComputeState.ACTIVE, )
-        failure_states = (ComputeState.ERROR, )
+        failure_states = (ComputeState.ERROR, ComputeState.STOPPED, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
@@ -813,29 +813,31 @@ def wait_get_app_active(self,
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')

-    def wait_get_app_stopped(self,
-                             name: str,
-                             timeout=timedelta(minutes=20),
-                             callback: Optional[Callable[[App], None]] = None) -> App:
+    def wait_get_deployment_app_succeeded(
+            self,
+            app_name: str,
+            deployment_id: str,
+            timeout=timedelta(minutes=20),
+            callback: Optional[Callable[[AppDeployment], None]] = None) -> AppDeployment:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (ComputeState.STOPPED, )
-        failure_states = (ComputeState.ERROR, )
+        target_states = (AppDeploymentState.SUCCEEDED, )
+        failure_states = (AppDeploymentState.FAILED, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.get(name=name)
-            status = poll.compute_status.state
+            poll = self.get_deployment(app_name=app_name, deployment_id=deployment_id)
+            status = poll.status.state
             status_message = f'current status: {status}'
-            if poll.compute_status:
-                status_message = poll.compute_status.message
+            if poll.status:
+                status_message = poll.status.message
             if status in target_states:
                 return poll
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach STOPPED, got {status}: {status_message}'
+                msg = f'failed to reach SUCCEEDED, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"name={name}"
+            prefix = f"app_name={app_name}, deployment_id={deployment_id}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
@@ -845,31 +847,29 @@ def wait_get_app_stopped(self,
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')

-    def wait_get_deployment_app_succeeded(
-            self,
-            app_name: str,
-            deployment_id: str,
-            timeout=timedelta(minutes=20),
-            callback: Optional[Callable[[AppDeployment], None]] = None) -> AppDeployment:
+    def wait_get_app_stopped(self,
+                             name: str,
+                             timeout=timedelta(minutes=20),
+                             callback: Optional[Callable[[App], None]] = None) -> App:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (AppDeploymentState.SUCCEEDED, )
-        failure_states = (AppDeploymentState.FAILED, )
+        target_states = (ComputeState.STOPPED, )
+        failure_states = (ComputeState.ERROR, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.get_deployment(app_name=app_name, deployment_id=deployment_id)
-            status = poll.status.state
+            poll = self.get(name=name)
+            status = poll.compute_status.state
             status_message = f'current status: {status}'
-            if poll.status:
-                status_message = poll.status.message
+            if poll.compute_status:
+                status_message = poll.compute_status.message
             if status in target_states:
                 return poll
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach SUCCEEDED, got {status}: {status_message}'
+                msg = f'failed to reach STOPPED, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"app_name={app_name}, deployment_id={deployment_id}"
+            prefix = f"name={name}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
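Note: the deployment waiter's signature is defined by this hunk, so its use is stable across the reorder. A minimal sketch of blocking on an app deployment (app name and deployment id are placeholders):

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()

    # Placeholders for an existing app and one of its deployments.
    app_name = 'my-app'
    deployment_id = '01ef0000aaaa0000'

    # Polls get_deployment() until SUCCEEDED; raises OperationFailed on FAILED
    # and TimeoutError after the (default 20 minute) deadline.
    deployment = w.apps.wait_get_deployment_app_succeeded(app_name=app_name,
                                                          deployment_id=deployment_id)
    print(deployment.status.state)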
diff --git a/databricks/sdk/service/catalog.py b/databricks/sdk/service/catalog.py
index 2ccff4217..b149dbbaa 100755
--- a/databricks/sdk/service/catalog.py
+++ b/databricks/sdk/service/catalog.py
@@ -3865,11 +3865,16 @@ class OnlineTable:
     """Specification of the online table."""

     status: Optional[OnlineTableStatus] = None
-    """Online Table status"""
+    """Online Table data synchronization status"""

     table_serving_url: Optional[str] = None
     """Data serving REST API URL for this table"""

+    unity_catalog_provisioning_state: Optional[ProvisioningInfoState] = None
+    """The provisioning state of the online table entity in Unity Catalog. This is distinct from the
+    state of the data synchronization pipeline (i.e. the table may be in "ACTIVE" but the pipeline
+    may be in "PROVISIONING" as it runs asynchronously)."""
+
     def as_dict(self) -> dict:
         """Serializes the OnlineTable into a dictionary suitable for use as a JSON request body."""
         body = {}
@@ -3877,6 +3882,8 @@ def as_dict(self) -> dict:
         if self.spec: body['spec'] = self.spec.as_dict()
         if self.status: body['status'] = self.status.as_dict()
         if self.table_serving_url is not None: body['table_serving_url'] = self.table_serving_url
+        if self.unity_catalog_provisioning_state is not None:
+            body['unity_catalog_provisioning_state'] = self.unity_catalog_provisioning_state.value
         return body

     @classmethod
@@ -3885,7 +3892,9 @@ def from_dict(cls, d: Dict[str, any]) -> OnlineTable:
         return cls(name=d.get('name', None),
                    spec=_from_dict(d, 'spec', OnlineTableSpec),
                    status=_from_dict(d, 'status', OnlineTableStatus),
-                   table_serving_url=d.get('table_serving_url', None))
+                   table_serving_url=d.get('table_serving_url', None),
+                   unity_catalog_provisioning_state=_enum(d, 'unity_catalog_provisioning_state',
+                                                          ProvisioningInfoState))


 @dataclass
@@ -4244,7 +4253,7 @@ class ProvisioningInfoState(Enum):
     DELETING = 'DELETING'
     FAILED = 'FAILED'
     PROVISIONING = 'PROVISIONING'
-    STATE_UNSPECIFIED = 'STATE_UNSPECIFIED'
+    UPDATING = 'UPDATING'


 @dataclass
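With the new field, callers can distinguish the Unity Catalog entity state from the sync pipeline's state. A sketch of checking both; it assumes the generated OnlineTablesAPI `get(name=...)` accessor, and the table name is illustrative:

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.catalog import ProvisioningInfoState

    w = WorkspaceClient()

    # 'main.default.my_table_online' is a hypothetical online table name.
    table = w.online_tables.get(name='main.default.my_table_online')

    # The UC entity may still be provisioning while the data sync pipeline
    # reports its own, independent status.
    if table.unity_catalog_provisioning_state == ProvisioningInfoState.PROVISIONING:
        print('entity still provisioning in Unity Catalog')
    else:
        print(f'entity state: {table.unity_catalog_provisioning_state}, sync status: {table.status}')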
diff --git a/databricks/sdk/service/compute.py b/databricks/sdk/service/compute.py
index 4a77496de..40def5df5 100755
--- a/databricks/sdk/service/compute.py
+++ b/databricks/sdk/service/compute.py
@@ -7865,20 +7865,19 @@ def wait_command_status_command_execution_cancelled(
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')

-    def wait_command_status_command_execution_finished_or_error(
+    def wait_context_status_command_execution_running(
             self,
             cluster_id: str,
-            command_id: str,
             context_id: str,
             timeout=timedelta(minutes=20),
-            callback: Optional[Callable[[CommandStatusResponse], None]] = None) -> CommandStatusResponse:
+            callback: Optional[Callable[[ContextStatusResponse], None]] = None) -> ContextStatusResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (CommandStatus.FINISHED, CommandStatus.ERROR, )
-        failure_states = (CommandStatus.CANCELLED, CommandStatus.CANCELLING, )
+        target_states = (ContextStatus.RUNNING, )
+        failure_states = (ContextStatus.ERROR, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.command_status(cluster_id=cluster_id, command_id=command_id, context_id=context_id)
+            poll = self.context_status(cluster_id=cluster_id, context_id=context_id)
             status = poll.status
             status_message = f'current status: {status}'
             if status in target_states:
@@ -7886,9 +7885,9 @@ def wait_command_status_command_execution_finished_or_error(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach Finished or Error, got {status}: {status_message}'
+                msg = f'failed to reach Running, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"cluster_id={cluster_id}, command_id={command_id}, context_id={context_id}"
+            prefix = f"cluster_id={cluster_id}, context_id={context_id}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
@@ -7898,19 +7897,20 @@ def wait_command_status_command_execution_finished_or_error(
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')

-    def wait_context_status_command_execution_running(
+    def wait_command_status_command_execution_finished_or_error(
             self,
             cluster_id: str,
+            command_id: str,
             context_id: str,
             timeout=timedelta(minutes=20),
-            callback: Optional[Callable[[ContextStatusResponse], None]] = None) -> ContextStatusResponse:
+            callback: Optional[Callable[[CommandStatusResponse], None]] = None) -> CommandStatusResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (ContextStatus.RUNNING, )
-        failure_states = (ContextStatus.ERROR, )
+        target_states = (CommandStatus.FINISHED, CommandStatus.ERROR, )
+        failure_states = (CommandStatus.CANCELLED, CommandStatus.CANCELLING, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.context_status(cluster_id=cluster_id, context_id=context_id)
+            poll = self.command_status(cluster_id=cluster_id, command_id=command_id, context_id=context_id)
             status = poll.status
             status_message = f'current status: {status}'
             if status in target_states:
@@ -7918,9 +7918,9 @@ def wait_context_status_command_execution_running(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach Running, got {status}: {status_message}'
+                msg = f'failed to reach Finished or Error, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"cluster_id={cluster_id}, context_id={context_id}"
+            prefix = f"cluster_id={cluster_id}, command_id={command_id}, context_id={context_id}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
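These two waiters back the `Wait` wrappers returned by the command execution API. A sketch of the typical flow, assuming a running cluster (the cluster id is a placeholder):

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service.compute import Language

    w = WorkspaceClient()

    # Placeholder for an existing, running cluster.
    cluster_id = '0123-456789-abcdefgh'

    # create() returns a Wait wrapper; .result() polls via
    # wait_context_status_command_execution_running until the context is RUNNING.
    context = w.command_execution.create(cluster_id=cluster_id, language=Language.PYTHON).result()

    # execute() polls via wait_command_status_command_execution_finished_or_error.
    result = w.command_execution.execute(cluster_id=cluster_id,
                                         context_id=context.id,
                                         language=Language.PYTHON,
                                         command='print(1 + 1)').result()
    print(result.results.data)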
diff --git a/databricks/sdk/service/dashboards.py b/databricks/sdk/service/dashboards.py
index 27117d43a..4a4c640e6 100755
--- a/databricks/sdk/service/dashboards.py
+++ b/databricks/sdk/service/dashboards.py
@@ -607,6 +607,7 @@ class MessageErrorType(Enum):
     LOCAL_CONTEXT_EXCEEDED_EXCEPTION = 'LOCAL_CONTEXT_EXCEEDED_EXCEPTION'
     MESSAGE_DELETED_WHILE_EXECUTING_EXCEPTION = 'MESSAGE_DELETED_WHILE_EXECUTING_EXCEPTION'
     MESSAGE_UPDATED_WHILE_EXECUTING_EXCEPTION = 'MESSAGE_UPDATED_WHILE_EXECUTING_EXCEPTION'
+    NO_QUERY_TO_VISUALIZE_EXCEPTION = 'NO_QUERY_TO_VISUALIZE_EXCEPTION'
     NO_TABLES_TO_QUERY_EXCEPTION = 'NO_TABLES_TO_QUERY_EXCEPTION'
     RATE_LIMIT_EXCEEDED_GENERIC_EXCEPTION = 'RATE_LIMIT_EXCEEDED_GENERIC_EXCEPTION'
     RATE_LIMIT_EXCEEDED_SPECIFIED_WAIT_EXCEPTION = 'RATE_LIMIT_EXCEEDED_SPECIFIED_WAIT_EXCEPTION'
@@ -784,6 +785,9 @@ def from_dict(cls, d: Dict[str, any]) -> QueryAttachment:

 @dataclass
 class Result:
+    is_truncated: Optional[bool] = None
+    """If result is truncated"""
+
     row_count: Optional[int] = None
     """Row count of the result"""

@@ -794,6 +798,7 @@ class Result:
     def as_dict(self) -> dict:
         """Serializes the Result into a dictionary suitable for use as a JSON request body."""
         body = {}
+        if self.is_truncated is not None: body['is_truncated'] = self.is_truncated
         if self.row_count is not None: body['row_count'] = self.row_count
         if self.statement_id is not None: body['statement_id'] = self.statement_id
         return body
@@ -801,7 +806,9 @@ def as_dict(self) -> dict:
     @classmethod
     def from_dict(cls, d: Dict[str, any]) -> Result:
         """Deserializes the Result from a dictionary."""
-        return cls(row_count=d.get('row_count', None), statement_id=d.get('statement_id', None))
+        return cls(is_truncated=d.get('is_truncated', None),
+                   row_count=d.get('row_count', None),
+                   statement_id=d.get('statement_id', None))


 @dataclass
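The new `is_truncated` flag round-trips through `as_dict`/`from_dict` like the other fields. A self-contained sketch (the payload values are made up):

    from databricks.sdk.service.dashboards import Result

    # Round-trip the new field through the JSON wire format.
    payload = {'is_truncated': True, 'row_count': 1000, 'statement_id': '01ef0000'}
    result = Result.from_dict(payload)

    if result.is_truncated:
        print(f'showing first {result.row_count} rows only')

    assert result.as_dict() == payload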
diff --git a/databricks/sdk/service/jobs.py b/databricks/sdk/service/jobs.py
index b3c723f37..a4f138d6b 100755
--- a/databricks/sdk/service/jobs.py
+++ b/databricks/sdk/service/jobs.py
@@ -29,6 +29,12 @@ class BaseJob:
     """The creator user name. This field won’t be included in the response if the user has already
     been deleted."""

+    effective_budget_policy_id: Optional[str] = None
+    """The id of the budget policy used by this job for cost attribution purposes. This may be set
+    through (in order of precedence): 1. Budget admins through the account or workspace console 2.
+    Jobs UI in the job details page and Jobs API using `budget_policy_id` 3. Inferred default based
+    on accessible budget policies of the run_as identity on job creation or modification."""
+
     job_id: Optional[int] = None
     """The canonical identifier for this job."""

@@ -41,6 +47,8 @@ def as_dict(self) -> dict:
         body = {}
         if self.created_time is not None: body['created_time'] = self.created_time
         if self.creator_user_name is not None: body['creator_user_name'] = self.creator_user_name
+        if self.effective_budget_policy_id is not None:
+            body['effective_budget_policy_id'] = self.effective_budget_policy_id
         if self.job_id is not None: body['job_id'] = self.job_id
         if self.settings: body['settings'] = self.settings.as_dict()
         return body
@@ -50,6 +58,7 @@ def from_dict(cls, d: Dict[str, any]) -> BaseJob:
         """Deserializes the BaseJob from a dictionary."""
         return cls(created_time=d.get('created_time', None),
                    creator_user_name=d.get('creator_user_name', None),
+                   effective_budget_policy_id=d.get('effective_budget_policy_id', None),
                    job_id=d.get('job_id', None),
                    settings=_from_dict(d, 'settings', JobSettings))

@@ -484,6 +493,11 @@ class CreateJob:
     access_control_list: Optional[List[JobAccessControlRequest]] = None
     """List of permissions to set on the job."""

+    budget_policy_id: Optional[str] = None
+    """The id of the user specified budget policy to use for this job. If not specified, a default
+    budget policy may be applied when creating or modifying the job. See
+    `effective_budget_policy_id` for the budget policy used by this workload."""
+
     continuous: Optional[Continuous] = None
     """An optional continuous property for this job. The continuous property will ensure that there is
     always one run executing. Only one of `schedule` and `continuous` can be used."""
@@ -591,6 +605,7 @@ def as_dict(self) -> dict:
         body = {}
         if self.access_control_list:
             body['access_control_list'] = [v.as_dict() for v in self.access_control_list]
+        if self.budget_policy_id is not None: body['budget_policy_id'] = self.budget_policy_id
         if self.continuous: body['continuous'] = self.continuous.as_dict()
         if self.deployment: body['deployment'] = self.deployment.as_dict()
         if self.description is not None: body['description'] = self.description
@@ -619,6 +634,7 @@ def as_dict(self) -> dict:
     def from_dict(cls, d: Dict[str, any]) -> CreateJob:
         """Deserializes the CreateJob from a dictionary."""
         return cls(access_control_list=_repeated_dict(d, 'access_control_list', JobAccessControlRequest),
+                   budget_policy_id=d.get('budget_policy_id', None),
                    continuous=_from_dict(d, 'continuous', Continuous),
                    deployment=_from_dict(d, 'deployment', JobDeployment),
                    description=d.get('description', None),
@@ -1261,6 +1277,12 @@ class Job:
     """The creator user name. This field won’t be included in the response if the user has already
     been deleted."""

+    effective_budget_policy_id: Optional[str] = None
+    """The id of the budget policy used by this job for cost attribution purposes. This may be set
+    through (in order of precedence): 1. Budget admins through the account or workspace console 2.
+    Jobs UI in the job details page and Jobs API using `budget_policy_id` 3. Inferred default based
+    on accessible budget policies of the run_as identity on job creation or modification."""
+
     job_id: Optional[int] = None
     """The canonical identifier for this job."""

@@ -1282,6 +1304,8 @@ def as_dict(self) -> dict:
         body = {}
         if self.created_time is not None: body['created_time'] = self.created_time
         if self.creator_user_name is not None: body['creator_user_name'] = self.creator_user_name
+        if self.effective_budget_policy_id is not None:
+            body['effective_budget_policy_id'] = self.effective_budget_policy_id
         if self.job_id is not None: body['job_id'] = self.job_id
         if self.run_as_user_name is not None: body['run_as_user_name'] = self.run_as_user_name
         if self.settings: body['settings'] = self.settings.as_dict()
@@ -1292,6 +1316,7 @@ def from_dict(cls, d: Dict[str, any]) -> Job:
         """Deserializes the Job from a dictionary."""
         return cls(created_time=d.get('created_time', None),
                    creator_user_name=d.get('creator_user_name', None),
+                   effective_budget_policy_id=d.get('effective_budget_policy_id', None),
                    job_id=d.get('job_id', None),
                    run_as_user_name=d.get('run_as_user_name', None),
                    settings=_from_dict(d, 'settings', JobSettings))
@@ -1755,6 +1780,11 @@ def from_dict(cls, d: Dict[str, any]) -> JobRunAs:

 @dataclass
 class JobSettings:
+    budget_policy_id: Optional[str] = None
+    """The id of the user specified budget policy to use for this job. If not specified, a default
+    budget policy may be applied when creating or modifying the job. See
+    `effective_budget_policy_id` for the budget policy used by this workload."""
+
     continuous: Optional[Continuous] = None
     """An optional continuous property for this job. The continuous property will ensure that there is
     always one run executing. Only one of `schedule` and `continuous` can be used."""
@@ -1860,6 +1890,7 @@ class JobSettings:
     def as_dict(self) -> dict:
         """Serializes the JobSettings into a dictionary suitable for use as a JSON request body."""
         body = {}
+        if self.budget_policy_id is not None: body['budget_policy_id'] = self.budget_policy_id
         if self.continuous: body['continuous'] = self.continuous.as_dict()
         if self.deployment: body['deployment'] = self.deployment.as_dict()
         if self.description is not None: body['description'] = self.description
@@ -1887,7 +1918,8 @@ def as_dict(self) -> dict:
     @classmethod
     def from_dict(cls, d: Dict[str, any]) -> JobSettings:
         """Deserializes the JobSettings from a dictionary."""
-        return cls(continuous=_from_dict(d, 'continuous', Continuous),
+        return cls(budget_policy_id=d.get('budget_policy_id', None),
+                   continuous=_from_dict(d, 'continuous', Continuous),
                    deployment=_from_dict(d, 'deployment', JobDeployment),
                    description=d.get('description', None),
                    edit_mode=_enum(d, 'edit_mode', JobEditMode),
If not specified, the + run will be not be attributed to any budget policy.""" + email_notifications: Optional[JobEmailNotifications] = None """An optional set of email addresses notified when the run begins or completes.""" @@ -4567,6 +4603,7 @@ def as_dict(self) -> dict: body = {} if self.access_control_list: body['access_control_list'] = [v.as_dict() for v in self.access_control_list] + if self.budget_policy_id is not None: body['budget_policy_id'] = self.budget_policy_id if self.email_notifications: body['email_notifications'] = self.email_notifications.as_dict() if self.environments: body['environments'] = [v.as_dict() for v in self.environments] if self.git_source: body['git_source'] = self.git_source.as_dict() @@ -4585,6 +4622,7 @@ def as_dict(self) -> dict: def from_dict(cls, d: Dict[str, any]) -> SubmitRun: """Deserializes the SubmitRun from a dictionary.""" return cls(access_control_list=_repeated_dict(d, 'access_control_list', JobAccessControlRequest), + budget_policy_id=d.get('budget_policy_id', None), email_notifications=_from_dict(d, 'email_notifications', JobEmailNotifications), environments=_repeated_dict(d, 'environments', JobEnvironment), git_source=_from_dict(d, 'git_source', GitSource), @@ -5619,6 +5657,7 @@ def cancel_run_and_wait(self, run_id: int, timeout=timedelta(minutes=20)) -> Run def create(self, *, access_control_list: Optional[List[JobAccessControlRequest]] = None, + budget_policy_id: Optional[str] = None, continuous: Optional[Continuous] = None, deployment: Optional[JobDeployment] = None, description: Optional[str] = None, @@ -5647,6 +5686,10 @@ def create(self, :param access_control_list: List[:class:`JobAccessControlRequest`] (optional) List of permissions to set on the job. + :param budget_policy_id: str (optional) + The id of the user specified budget policy to use for this job. If not specified, a default budget + policy may be applied when creating or modifying the job. See `effective_budget_policy_id` for the + budget policy used by this workload. :param continuous: :class:`Continuous` (optional) An optional continuous property for this job. The continuous property will ensure that there is always one run executing. Only one of `schedule` and `continuous` can be used. @@ -5731,6 +5774,7 @@ def create(self, body = {} if access_control_list is not None: body['access_control_list'] = [v.as_dict() for v in access_control_list] + if budget_policy_id is not None: body['budget_policy_id'] = budget_policy_id if continuous is not None: body['continuous'] = continuous.as_dict() if deployment is not None: body['deployment'] = deployment.as_dict() if description is not None: body['description'] = description @@ -6398,6 +6442,7 @@ def set_permissions( def submit(self, *, access_control_list: Optional[List[JobAccessControlRequest]] = None, + budget_policy_id: Optional[str] = None, email_notifications: Optional[JobEmailNotifications] = None, environments: Optional[List[JobEnvironment]] = None, git_source: Optional[GitSource] = None, @@ -6418,6 +6463,9 @@ def submit(self, :param access_control_list: List[:class:`JobAccessControlRequest`] (optional) List of permissions to set on the job. + :param budget_policy_id: str (optional) + The user specified id of the budget policy to use for this one-time run. If not specified, the run + will be not be attributed to any budget policy. :param email_notifications: :class:`JobEmailNotifications` (optional) An optional set of email addresses notified when the run begins or completes. 
diff --git a/databricks/sdk/service/pipelines.py b/databricks/sdk/service/pipelines.py
index f102bdc9d..f99201fde 100755
--- a/databricks/sdk/service/pipelines.py
+++ b/databricks/sdk/service/pipelines.py
@@ -587,6 +587,9 @@ def from_dict(cls, d: Dict[str, any]) -> GetUpdateResponse:

 @dataclass
 class IngestionConfig:
+    report: Optional[ReportSpec] = None
+    """Select tables from a specific source report."""
+
     schema: Optional[SchemaSpec] = None
     """Select tables from a specific source schema."""

@@ -596,6 +599,7 @@ class IngestionConfig:
     def as_dict(self) -> dict:
         """Serializes the IngestionConfig into a dictionary suitable for use as a JSON request body."""
         body = {}
+        if self.report: body['report'] = self.report.as_dict()
         if self.schema: body['schema'] = self.schema.as_dict()
         if self.table: body['table'] = self.table.as_dict()
         return body
@@ -603,7 +607,9 @@ def as_dict(self) -> dict:
     @classmethod
     def from_dict(cls, d: Dict[str, any]) -> IngestionConfig:
         """Deserializes the IngestionConfig from a dictionary."""
-        return cls(schema=_from_dict(d, 'schema', SchemaSpec), table=_from_dict(d, 'table', TableSpec))
+        return cls(report=_from_dict(d, 'report', ReportSpec),
+                   schema=_from_dict(d, 'schema', SchemaSpec),
+                   table=_from_dict(d, 'table', TableSpec))


 @dataclass
@@ -1624,6 +1630,44 @@ def from_dict(cls, d: Dict[str, any]) -> PipelineTrigger:
         return cls(cron=_from_dict(d, 'cron', CronTrigger), manual=_from_dict(d, 'manual', ManualTrigger))


+@dataclass
+class ReportSpec:
+    destination_catalog: Optional[str] = None
+    """Required. Destination catalog to store table."""
+
+    destination_schema: Optional[str] = None
+    """Required. Destination schema to store table."""
+
+    destination_table: Optional[str] = None
+    """Required. Destination table name. The pipeline fails if a table with that name already exists."""
+
+    source_url: Optional[str] = None
+    """Required. Report URL in the source system."""
+
+    table_configuration: Optional[TableSpecificConfig] = None
+    """Configuration settings to control the ingestion of tables. These settings override the
+    table_configuration defined in the IngestionPipelineDefinition object."""
+
+    def as_dict(self) -> dict:
+        """Serializes the ReportSpec into a dictionary suitable for use as a JSON request body."""
+        body = {}
+        if self.destination_catalog is not None: body['destination_catalog'] = self.destination_catalog
+        if self.destination_schema is not None: body['destination_schema'] = self.destination_schema
+        if self.destination_table is not None: body['destination_table'] = self.destination_table
+        if self.source_url is not None: body['source_url'] = self.source_url
+        if self.table_configuration: body['table_configuration'] = self.table_configuration.as_dict()
+        return body
+
+    @classmethod
+    def from_dict(cls, d: Dict[str, any]) -> ReportSpec:
+        """Deserializes the ReportSpec from a dictionary."""
+        return cls(destination_catalog=d.get('destination_catalog', None),
+                   destination_schema=d.get('destination_schema', None),
+                   destination_table=d.get('destination_table', None),
+                   source_url=d.get('source_url', None),
+                   table_configuration=_from_dict(d, 'table_configuration', TableSpecificConfig))
+
+
 @dataclass
 class SchemaSpec:
     destination_catalog: Optional[str] = None
@@ -1841,7 +1885,7 @@ class TableSpec:
     """Required. Destination schema to store table."""

     destination_table: Optional[str] = None
-    """Optional. Destination table name. The pipeline fails If a table with that name already exists.
+    """Optional. Destination table name. The pipeline fails if a table with that name already exists.
     If not set, the source table name is used."""

     source_catalog: Optional[str] = None
@@ -1893,6 +1937,10 @@ class TableSpecificConfig:
     scd_type: Optional[TableSpecificConfigScdType] = None
     """The SCD type to use to ingest the table."""

+    sequence_by: Optional[List[str]] = None
+    """The column names specifying the logical order of events in the source data. Delta Live Tables
+    uses this sequencing to handle change events that arrive out of order."""
+
     def as_dict(self) -> dict:
         """Serializes the TableSpecificConfig into a dictionary suitable for use as a JSON request body."""
         body = {}
@@ -1900,6 +1948,7 @@ def as_dict(self) -> dict:
         if self.salesforce_include_formula_fields is not None:
             body['salesforce_include_formula_fields'] = self.salesforce_include_formula_fields
         if self.scd_type is not None: body['scd_type'] = self.scd_type.value
+        if self.sequence_by: body['sequence_by'] = [v for v in self.sequence_by]
         return body

     @classmethod
@@ -1907,7 +1956,8 @@ def from_dict(cls, d: Dict[str, any]) -> TableSpecificConfig:
         """Deserializes the TableSpecificConfig from a dictionary."""
         return cls(primary_keys=d.get('primary_keys', None),
                    salesforce_include_formula_fields=d.get('salesforce_include_formula_fields', None),
-                   scd_type=_enum(d, 'scd_type', TableSpecificConfigScdType),
+                   scd_type=_enum(d, 'scd_type', TableSpecificConfigScdType),
+                   sequence_by=d.get('sequence_by', None))


 class TableSpecificConfigScdType(Enum):
@@ -2072,13 +2122,13 @@ class PipelinesAPI:
     def __init__(self, api_client):
         self._api = api_client

-    def wait_get_pipeline_idle(
+    def wait_get_pipeline_running(
             self,
             pipeline_id: str,
             timeout=timedelta(minutes=20),
             callback: Optional[Callable[[GetPipelineResponse], None]] = None) -> GetPipelineResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (PipelineState.IDLE, )
+        target_states = (PipelineState.RUNNING, )
         failure_states = (PipelineState.FAILED, )
         status_message = 'polling...'
         attempt = 1
@@ -2091,7 +2141,7 @@ def wait_get_pipeline_idle(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach IDLE, got {status}: {status_message}'
+                msg = f'failed to reach RUNNING, got {status}: {status_message}'
                 raise OperationFailed(msg)
             prefix = f"pipeline_id={pipeline_id}"
             sleep = attempt
@@ -2103,13 +2153,13 @@ def wait_get_pipeline_idle(
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')

-    def wait_get_pipeline_running(
+    def wait_get_pipeline_idle(
             self,
             pipeline_id: str,
             timeout=timedelta(minutes=20),
             callback: Optional[Callable[[GetPipelineResponse], None]] = None) -> GetPipelineResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (PipelineState.RUNNING, )
+        target_states = (PipelineState.IDLE, )
         failure_states = (PipelineState.FAILED, )
         status_message = 'polling...'
         attempt = 1
@@ -2122,7 +2172,7 @@ def wait_get_pipeline_running(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach RUNNING, got {status}: {status_message}'
+                msg = f'failed to reach IDLE, got {status}: {status_message}'
                 raise OperationFailed(msg)
             prefix = f"pipeline_id={pipeline_id}"
             sleep = attempt
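A self-contained sketch of building an ingestion config with the new `ReportSpec` and `sequence_by` fields. All names are illustrative, and the SCD enum member name is an assumption (the members of `TableSpecificConfigScdType` fall outside this hunk's context):

    from databricks.sdk.service.pipelines import (IngestionConfig, ReportSpec,
                                                  TableSpecificConfig,
                                                  TableSpecificConfigScdType)

    # source_url must point at a report in the source system wired up through
    # the pipeline's ingestion definition; all values below are placeholders.
    config = IngestionConfig(report=ReportSpec(
        source_url='https://example.com/reports/sales',
        destination_catalog='main',
        destination_schema='ingest',
        destination_table='sales_report',
        table_configuration=TableSpecificConfig(
            primary_keys=['order_id'],
            scd_type=TableSpecificConfigScdType.SCD_TYPE_2,  # assumed member name
            # sequence_by orders out-of-order change events during ingestion.
            sequence_by=['modified_at'])))

    print(config.as_dict())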
diff --git a/databricks/sdk/service/sql.py b/databricks/sdk/service/sql.py
index 4f0e49c77..7a224feeb 100755
--- a/databricks/sdk/service/sql.py
+++ b/databricks/sdk/service/sql.py
@@ -72,6 +72,9 @@ class Alert:
     lifecycle_state: Optional[LifecycleState] = None
     """The workspace state of the alert. Used for tracking trashed status."""

+    notify_on_ok: Optional[bool] = None
+    """Whether to notify alert subscribers when the alert returns to normal."""
+
     owner_user_name: Optional[str] = None
     """The owner's username. This field is set to "Unavailable" if the user has been deleted."""

@@ -105,6 +108,7 @@ def as_dict(self) -> dict:
         if self.display_name is not None: body['display_name'] = self.display_name
         if self.id is not None: body['id'] = self.id
         if self.lifecycle_state is not None: body['lifecycle_state'] = self.lifecycle_state.value
+        if self.notify_on_ok is not None: body['notify_on_ok'] = self.notify_on_ok
         if self.owner_user_name is not None: body['owner_user_name'] = self.owner_user_name
         if self.parent_path is not None: body['parent_path'] = self.parent_path
         if self.query_id is not None: body['query_id'] = self.query_id
@@ -124,6 +128,7 @@ def from_dict(cls, d: Dict[str, any]) -> Alert:
                    display_name=d.get('display_name', None),
                    id=d.get('id', None),
                    lifecycle_state=_enum(d, 'lifecycle_state', LifecycleState),
+                   notify_on_ok=d.get('notify_on_ok', None),
                    owner_user_name=d.get('owner_user_name', None),
                    parent_path=d.get('parent_path', None),
                    query_id=d.get('query_id', None),
@@ -652,6 +657,9 @@ class CreateAlertRequestAlert:
     display_name: Optional[str] = None
     """The display name of the alert."""

+    notify_on_ok: Optional[bool] = None
+    """Whether to notify alert subscribers when the alert returns to normal."""
+
     parent_path: Optional[str] = None
     """The workspace path of the folder containing the alert."""

@@ -669,6 +677,7 @@ def as_dict(self) -> dict:
         if self.custom_body is not None: body['custom_body'] = self.custom_body
         if self.custom_subject is not None: body['custom_subject'] = self.custom_subject
         if self.display_name is not None: body['display_name'] = self.display_name
+        if self.notify_on_ok is not None: body['notify_on_ok'] = self.notify_on_ok
         if self.parent_path is not None: body['parent_path'] = self.parent_path
         if self.query_id is not None: body['query_id'] = self.query_id
         if self.seconds_to_retrigger is not None: body['seconds_to_retrigger'] = self.seconds_to_retrigger
@@ -681,6 +690,7 @@ def from_dict(cls, d: Dict[str, any]) -> CreateAlertRequestAlert:
                    custom_body=d.get('custom_body', None),
                    custom_subject=d.get('custom_subject', None),
                    display_name=d.get('display_name', None),
+                   notify_on_ok=d.get('notify_on_ok', None),
                    parent_path=d.get('parent_path', None),
                    query_id=d.get('query_id', None),
                    seconds_to_retrigger=d.get('seconds_to_retrigger', None))
@@ -2696,6 +2706,9 @@ class ListAlertsResponseAlert:
     lifecycle_state: Optional[LifecycleState] = None
     """The workspace state of the alert. Used for tracking trashed status."""

+    notify_on_ok: Optional[bool] = None
+    """Whether to notify alert subscribers when the alert returns to normal."""
+
     owner_user_name: Optional[str] = None
     """The owner's username. This field is set to "Unavailable" if the user has been deleted."""

@@ -2726,6 +2739,7 @@ def as_dict(self) -> dict:
         if self.display_name is not None: body['display_name'] = self.display_name
         if self.id is not None: body['id'] = self.id
         if self.lifecycle_state is not None: body['lifecycle_state'] = self.lifecycle_state.value
+        if self.notify_on_ok is not None: body['notify_on_ok'] = self.notify_on_ok
         if self.owner_user_name is not None: body['owner_user_name'] = self.owner_user_name
         if self.query_id is not None: body['query_id'] = self.query_id
         if self.seconds_to_retrigger is not None: body['seconds_to_retrigger'] = self.seconds_to_retrigger
@@ -2744,6 +2758,7 @@ def from_dict(cls, d: Dict[str, any]) -> ListAlertsResponseAlert:
                    display_name=d.get('display_name', None),
                    id=d.get('id', None),
                    lifecycle_state=_enum(d, 'lifecycle_state', LifecycleState),
+                   notify_on_ok=d.get('notify_on_ok', None),
                    owner_user_name=d.get('owner_user_name', None),
                    query_id=d.get('query_id', None),
                    seconds_to_retrigger=d.get('seconds_to_retrigger', None),
@@ -4561,6 +4576,9 @@ class UpdateAlertRequestAlert:
     display_name: Optional[str] = None
     """The display name of the alert."""

+    notify_on_ok: Optional[bool] = None
+    """Whether to notify alert subscribers when the alert returns to normal."""
+
     owner_user_name: Optional[str] = None
     """The owner's username. This field is set to "Unavailable" if the user has been deleted."""

@@ -4578,6 +4596,7 @@ def as_dict(self) -> dict:
         if self.custom_body is not None: body['custom_body'] = self.custom_body
         if self.custom_subject is not None: body['custom_subject'] = self.custom_subject
         if self.display_name is not None: body['display_name'] = self.display_name
+        if self.notify_on_ok is not None: body['notify_on_ok'] = self.notify_on_ok
         if self.owner_user_name is not None: body['owner_user_name'] = self.owner_user_name
         if self.query_id is not None: body['query_id'] = self.query_id
         if self.seconds_to_retrigger is not None: body['seconds_to_retrigger'] = self.seconds_to_retrigger
@@ -4590,6 +4609,7 @@ def from_dict(cls, d: Dict[str, any]) -> UpdateAlertRequestAlert:
                    custom_body=d.get('custom_body', None),
                    custom_subject=d.get('custom_subject', None),
                    display_name=d.get('display_name', None),
+                   notify_on_ok=d.get('notify_on_ok', None),
                    owner_user_name=d.get('owner_user_name', None),
                    query_id=d.get('query_id', None),
                    seconds_to_retrigger=d.get('seconds_to_retrigger', None))
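A sketch of creating an alert with the new `notify_on_ok` flag. It assumes the current Alerts API condition types (`AlertCondition`, `AlertConditionOperand`, `AlertOperandColumn`, `AlertConditionThreshold`, `AlertOperandValue`, `AlertOperator`), which sit outside these hunks, and the query id is a placeholder:

    from databricks.sdk import WorkspaceClient
    from databricks.sdk.service import sql

    w = WorkspaceClient()

    # query_id is a placeholder for an existing SQL query in the workspace.
    alert = w.alerts.create(alert=sql.CreateAlertRequestAlert(
        display_name='daily-row-count',
        query_id='12345678-1234-1234-1234-123456789012',
        notify_on_ok=True,  # also notify subscribers when the alert recovers
        condition=sql.AlertCondition(
            op=sql.AlertOperator.GREATER_THAN,
            operand=sql.AlertConditionOperand(
                column=sql.AlertOperandColumn(name='row_count')),
            threshold=sql.AlertConditionThreshold(
                value=sql.AlertOperandValue(double_value=0)))))
    print(alert.id)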
diff --git a/docs/gen-client-docs.py b/docs/gen-client-docs.py
index 4b52d817d..5c32beffe 100644
--- a/docs/gen-client-docs.py
+++ b/docs/gen-client-docs.py
@@ -259,7 +259,7 @@ def _openapi_spec(self) -> str:
                 return f.read()
         with open(f'{__dir__}/../.codegen/_openapi_sha') as f:
             sha = f.read().strip()
-        return subprocess.check_output(['deco', 'openapi', 'get', sha]).decode('utf-8')
+        return subprocess.check_output(['genkit', 'get', sha]).decode('utf-8')

     def _load_mapping(self) -> dict[str, Tag]:
         mapping = {}
@@ -342,8 +342,15 @@ def service_docs(self, client_inst, client_prefix: str) -> list[ServiceDoc]:
                 continue
             if service_name in ignore_client_fields:
                 continue
-            class_doc = service_inst.__doc__
+            class_name = service_inst.__class__.__name__
+
+            # Use original class docstring for mixin classes
+            if class_name.endswith('Ext'):
+                class_doc = service_inst.__class__.__base__.__doc__
+            else:
+                class_doc = service_inst.__doc__
+
             print(f'Processing service {client_prefix}.{service_name}')
             all += self.service_docs(service_inst, client_prefix + "." + service_name)

diff --git a/docs/workspace/jobs/jobs.rst b/docs/workspace/jobs/jobs.rst
index 3c6e0f2e4..b097c94c8 100644
--- a/docs/workspace/jobs/jobs.rst
+++ b/docs/workspace/jobs/jobs.rst
@@ -120,7 +120,7 @@

     .. py:method:: cancel_run_and_wait(run_id: int, timeout: datetime.timedelta = 0:20:00) -> Run

-    .. py:method:: create( [, access_control_list: Optional[List[JobAccessControlRequest]], continuous: Optional[Continuous], deployment: Optional[JobDeployment], description: Optional[str], edit_mode: Optional[JobEditMode], email_notifications: Optional[JobEmailNotifications], environments: Optional[List[JobEnvironment]], format: Optional[Format], git_source: Optional[GitSource], health: Optional[JobsHealthRules], job_clusters: Optional[List[JobCluster]], max_concurrent_runs: Optional[int], name: Optional[str], notification_settings: Optional[JobNotificationSettings], parameters: Optional[List[JobParameterDefinition]], queue: Optional[QueueSettings], run_as: Optional[JobRunAs], schedule: Optional[CronSchedule], tags: Optional[Dict[str, str]], tasks: Optional[List[Task]], timeout_seconds: Optional[int], trigger: Optional[TriggerSettings], webhook_notifications: Optional[WebhookNotifications]]) -> CreateResponse
+    .. py:method:: create( [, access_control_list: Optional[List[JobAccessControlRequest]], budget_policy_id: Optional[str], continuous: Optional[Continuous], deployment: Optional[JobDeployment], description: Optional[str], edit_mode: Optional[JobEditMode], email_notifications: Optional[JobEmailNotifications], environments: Optional[List[JobEnvironment]], format: Optional[Format], git_source: Optional[GitSource], health: Optional[JobsHealthRules], job_clusters: Optional[List[JobCluster]], max_concurrent_runs: Optional[int], name: Optional[str], notification_settings: Optional[JobNotificationSettings], parameters: Optional[List[JobParameterDefinition]], queue: Optional[QueueSettings], run_as: Optional[JobRunAs], schedule: Optional[CronSchedule], tags: Optional[Dict[str, str]], tasks: Optional[List[Task]], timeout_seconds: Optional[int], trigger: Optional[TriggerSettings], webhook_notifications: Optional[WebhookNotifications]]) -> CreateResponse

         Usage:

@@ -158,6 +158,10 @@

         :param access_control_list: List[:class:`JobAccessControlRequest`] (optional)
           List of permissions to set on the job.
+        :param budget_policy_id: str (optional)
+          The id of the user specified budget policy to use for this job. If not specified, a default budget
+          policy may be applied when creating or modifying the job. See `effective_budget_policy_id` for the
+          budget policy used by this workload.
         :param continuous: :class:`Continuous` (optional)
           An optional continuous property for this job. The continuous property will ensure that there is
           always one run executing. Only one of `schedule` and `continuous` can be used.
@@ -931,7 +935,7 @@

         :returns: :class:`JobPermissions`

-    .. py:method:: submit( [, access_control_list: Optional[List[JobAccessControlRequest]], email_notifications: Optional[JobEmailNotifications], environments: Optional[List[JobEnvironment]], git_source: Optional[GitSource], health: Optional[JobsHealthRules], idempotency_token: Optional[str], notification_settings: Optional[JobNotificationSettings], queue: Optional[QueueSettings], run_as: Optional[JobRunAs], run_name: Optional[str], tasks: Optional[List[SubmitTask]], timeout_seconds: Optional[int], webhook_notifications: Optional[WebhookNotifications]]) -> Wait[Run]
+    .. py:method:: submit( [, access_control_list: Optional[List[JobAccessControlRequest]], budget_policy_id: Optional[str], email_notifications: Optional[JobEmailNotifications], environments: Optional[List[JobEnvironment]], git_source: Optional[GitSource], health: Optional[JobsHealthRules], idempotency_token: Optional[str], notification_settings: Optional[JobNotificationSettings], queue: Optional[QueueSettings], run_as: Optional[JobRunAs], run_name: Optional[str], tasks: Optional[List[SubmitTask]], timeout_seconds: Optional[int], webhook_notifications: Optional[WebhookNotifications]]) -> Wait[Run]

         Usage:

@@ -969,6 +973,9 @@

         :param access_control_list: List[:class:`JobAccessControlRequest`] (optional)
           List of permissions to set on the job.
+        :param budget_policy_id: str (optional)
+          The user specified id of the budget policy to use for this one-time run. If not specified, the run
+          will not be attributed to any budget policy.
         :param email_notifications: :class:`JobEmailNotifications` (optional)
           An optional set of email addresses notified when the run begins or completes.
         :param environments: List[:class:`JobEnvironment`] (optional)
@@ -1018,7 +1025,7 @@

         See :method:wait_get_run_job_terminated_or_skipped for more details.

-    .. py:method:: submit_and_wait( [, access_control_list: Optional[List[JobAccessControlRequest]], email_notifications: Optional[JobEmailNotifications], environments: Optional[List[JobEnvironment]], git_source: Optional[GitSource], health: Optional[JobsHealthRules], idempotency_token: Optional[str], notification_settings: Optional[JobNotificationSettings], queue: Optional[QueueSettings], run_as: Optional[JobRunAs], run_name: Optional[str], tasks: Optional[List[SubmitTask]], timeout_seconds: Optional[int], webhook_notifications: Optional[WebhookNotifications], timeout: datetime.timedelta = 0:20:00]) -> Run
+    .. py:method:: submit_and_wait( [, access_control_list: Optional[List[JobAccessControlRequest]], budget_policy_id: Optional[str], email_notifications: Optional[JobEmailNotifications], environments: Optional[List[JobEnvironment]], git_source: Optional[GitSource], health: Optional[JobsHealthRules], idempotency_token: Optional[str], notification_settings: Optional[JobNotificationSettings], queue: Optional[QueueSettings], run_as: Optional[JobRunAs], run_name: Optional[str], tasks: Optional[List[SubmitTask]], timeout_seconds: Optional[int], webhook_notifications: Optional[WebhookNotifications], timeout: datetime.timedelta = 0:20:00]) -> Run

     .. py:method:: update(job_id: int [, fields_to_remove: Optional[List[str]], new_settings: Optional[JobSettings]])

diff --git a/docs/workspace/serving/serving_endpoints.rst b/docs/workspace/serving/serving_endpoints.rst
index 8e21197a1..cbcbca964 100644
--- a/docs/workspace/serving/serving_endpoints.rst
+++ b/docs/workspace/serving/serving_endpoints.rst
@@ -2,7 +2,7 @@
 ==========================================
 .. currentmodule:: databricks.sdk.service.serving

-.. py:class:: ServingEndpointsAPI
+.. py:class:: ServingEndpointsExt

     The Serving Endpoints API allows you to create, update, and delete model serving endpoints.

@@ -92,6 +92,12 @@

         :returns: :class:`ServingEndpointDetailed`

+    .. py:method:: get_langchain_chat_open_ai_client(model)
+
+
+    .. py:method:: get_open_ai_client()
+
+
     .. py:method:: get_open_api(name: str)

         Get the schema for a serving endpoint.
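The mixin documented here exposes OpenAI-style clients on `w.serving_endpoints`. A minimal sketch, assuming the `openai` extra is installed and using an illustrative endpoint name:

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()

    # Returns an OpenAI-compatible client preconfigured against the
    # workspace's serving endpoints.
    client = w.serving_endpoints.get_open_ai_client()

    # 'databricks-meta-llama-3-1-70b-instruct' is an illustrative endpoint name.
    response = client.chat.completions.create(
        model='databricks-meta-llama-3-1-70b-instruct',
        messages=[{'role': 'user', 'content': 'What is Databricks?'}])
    print(response.choices[0].message.content)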
From 6cd119f671d16dacc35d98bca0dd33f75b788d31 Mon Sep 17 00:00:00 2001
From: Omer Lachish
Date: Thu, 17 Oct 2024 11:14:49 +0200
Subject: [PATCH 2/2] ran genkit generate-sdk

---
 databricks/sdk/service/apps.py      | 52 ++++++++++++++---------------
 databricks/sdk/service/compute.py   | 30 ++++++++---------
 databricks/sdk/service/pipelines.py | 12 +++----
 docs/dbdataclasses/catalog.rst      |  4 +--
 docs/dbdataclasses/dashboards.rst   |  3 ++
 docs/dbdataclasses/pipelines.rst    |  4 +++
 6 files changed, 56 insertions(+), 49 deletions(-)

diff --git a/databricks/sdk/service/apps.py b/databricks/sdk/service/apps.py
index 5f413f0be..52796d0e8 100755
--- a/databricks/sdk/service/apps.py
+++ b/databricks/sdk/service/apps.py
@@ -813,31 +813,29 @@ def wait_get_app_active(self,
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')

-    def wait_get_deployment_app_succeeded(
-            self,
-            app_name: str,
-            deployment_id: str,
-            timeout=timedelta(minutes=20),
-            callback: Optional[Callable[[AppDeployment], None]] = None) -> AppDeployment:
+    def wait_get_app_stopped(self,
+                             name: str,
+                             timeout=timedelta(minutes=20),
+                             callback: Optional[Callable[[App], None]] = None) -> App:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (AppDeploymentState.SUCCEEDED, )
-        failure_states = (AppDeploymentState.FAILED, )
+        target_states = (ComputeState.STOPPED, )
+        failure_states = (ComputeState.ERROR, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.get_deployment(app_name=app_name, deployment_id=deployment_id)
-            status = poll.status.state
+            poll = self.get(name=name)
+            status = poll.compute_status.state
             status_message = f'current status: {status}'
-            if poll.status:
-                status_message = poll.status.message
+            if poll.compute_status:
+                status_message = poll.compute_status.message
             if status in target_states:
                 return poll
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach SUCCEEDED, got {status}: {status_message}'
+                msg = f'failed to reach STOPPED, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"app_name={app_name}, deployment_id={deployment_id}"
+            prefix = f"name={name}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
@@ -847,29 +845,31 @@ def wait_get_deployment_app_succeeded(
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')

-    def wait_get_app_stopped(self,
-                             name: str,
-                             timeout=timedelta(minutes=20),
-                             callback: Optional[Callable[[App], None]] = None) -> App:
+    def wait_get_deployment_app_succeeded(
+            self,
+            app_name: str,
+            deployment_id: str,
+            timeout=timedelta(minutes=20),
+            callback: Optional[Callable[[AppDeployment], None]] = None) -> AppDeployment:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (ComputeState.STOPPED, )
-        failure_states = (ComputeState.ERROR, )
+        target_states = (AppDeploymentState.SUCCEEDED, )
+        failure_states = (AppDeploymentState.FAILED, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.get(name=name)
-            status = poll.compute_status.state
+            poll = self.get_deployment(app_name=app_name, deployment_id=deployment_id)
+            status = poll.status.state
             status_message = f'current status: {status}'
-            if poll.compute_status:
-                status_message = poll.compute_status.message
+            if poll.status:
+                status_message = poll.status.message
             if status in target_states:
                 return poll
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach STOPPED, got {status}: {status_message}'
+                msg = f'failed to reach SUCCEEDED, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"name={name}"
+            prefix = f"app_name={app_name}, deployment_id={deployment_id}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
diff --git a/databricks/sdk/service/compute.py b/databricks/sdk/service/compute.py
index 40def5df5..4a77496de 100755
--- a/databricks/sdk/service/compute.py
+++ b/databricks/sdk/service/compute.py
@@ -7865,19 +7865,20 @@ def wait_command_status_command_execution_cancelled(
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')

-    def wait_context_status_command_execution_running(
+    def wait_command_status_command_execution_finished_or_error(
             self,
             cluster_id: str,
+            command_id: str,
             context_id: str,
             timeout=timedelta(minutes=20),
-            callback: Optional[Callable[[ContextStatusResponse], None]] = None) -> ContextStatusResponse:
+            callback: Optional[Callable[[CommandStatusResponse], None]] = None) -> CommandStatusResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (ContextStatus.RUNNING, )
-        failure_states = (ContextStatus.ERROR, )
+        target_states = (CommandStatus.FINISHED, CommandStatus.ERROR, )
+        failure_states = (CommandStatus.CANCELLED, CommandStatus.CANCELLING, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.context_status(cluster_id=cluster_id, context_id=context_id)
+            poll = self.command_status(cluster_id=cluster_id, command_id=command_id, context_id=context_id)
             status = poll.status
             status_message = f'current status: {status}'
             if status in target_states:
@@ -7885,9 +7886,9 @@ def wait_context_status_command_execution_running(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach Running, got {status}: {status_message}'
+                msg = f'failed to reach Finished or Error, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"cluster_id={cluster_id}, context_id={context_id}"
+            prefix = f"cluster_id={cluster_id}, command_id={command_id}, context_id={context_id}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
@@ -7897,20 +7898,19 @@ def wait_context_status_command_execution_running(
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')

-    def wait_command_status_command_execution_finished_or_error(
+    def wait_context_status_command_execution_running(
             self,
             cluster_id: str,
-            command_id: str,
             context_id: str,
             timeout=timedelta(minutes=20),
-            callback: Optional[Callable[[CommandStatusResponse], None]] = None) -> CommandStatusResponse:
+            callback: Optional[Callable[[ContextStatusResponse], None]] = None) -> ContextStatusResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (CommandStatus.FINISHED, CommandStatus.ERROR, )
-        failure_states = (CommandStatus.CANCELLED, CommandStatus.CANCELLING, )
+        target_states = (ContextStatus.RUNNING, )
+        failure_states = (ContextStatus.ERROR, )
         status_message = 'polling...'
         attempt = 1
         while time.time() < deadline:
-            poll = self.command_status(cluster_id=cluster_id, command_id=command_id, context_id=context_id)
+            poll = self.context_status(cluster_id=cluster_id, context_id=context_id)
             status = poll.status
             status_message = f'current status: {status}'
             if status in target_states:
@@ -7918,9 +7918,9 @@ def wait_command_status_command_execution_finished_or_error(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach Finished or Error, got {status}: {status_message}'
+                msg = f'failed to reach Running, got {status}: {status_message}'
                 raise OperationFailed(msg)
-            prefix = f"cluster_id={cluster_id}, command_id={command_id}, context_id={context_id}"
+            prefix = f"cluster_id={cluster_id}, context_id={context_id}"
             sleep = attempt
             if sleep > 10:
                 # sleep 10s max per attempt
diff --git a/databricks/sdk/service/pipelines.py b/databricks/sdk/service/pipelines.py
index f99201fde..9c12f8788 100755
--- a/databricks/sdk/service/pipelines.py
+++ b/databricks/sdk/service/pipelines.py
@@ -2122,13 +2122,13 @@ class PipelinesAPI:
     def __init__(self, api_client):
         self._api = api_client

-    def wait_get_pipeline_running(
+    def wait_get_pipeline_idle(
             self,
             pipeline_id: str,
             timeout=timedelta(minutes=20),
             callback: Optional[Callable[[GetPipelineResponse], None]] = None) -> GetPipelineResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (PipelineState.RUNNING, )
+        target_states = (PipelineState.IDLE, )
         failure_states = (PipelineState.FAILED, )
         status_message = 'polling...'
         attempt = 1
@@ -2141,7 +2141,7 @@ def wait_get_pipeline_running(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach RUNNING, got {status}: {status_message}'
+                msg = f'failed to reach IDLE, got {status}: {status_message}'
                 raise OperationFailed(msg)
             prefix = f"pipeline_id={pipeline_id}"
             sleep = attempt
@@ -2153,13 +2153,13 @@ def wait_get_pipeline_running(
             attempt += 1
         raise TimeoutError(f'timed out after {timeout}: {status_message}')

-    def wait_get_pipeline_idle(
+    def wait_get_pipeline_running(
             self,
             pipeline_id: str,
             timeout=timedelta(minutes=20),
             callback: Optional[Callable[[GetPipelineResponse], None]] = None) -> GetPipelineResponse:
         deadline = time.time() + timeout.total_seconds()
-        target_states = (PipelineState.IDLE, )
+        target_states = (PipelineState.RUNNING, )
         failure_states = (PipelineState.FAILED, )
         status_message = 'polling...'
         attempt = 1
@@ -2172,7 +2172,7 @@ def wait_get_pipeline_idle(
             if callback:
                 callback(poll)
             if status in failure_states:
-                msg = f'failed to reach IDLE, got {status}: {status_message}'
+                msg = f'failed to reach RUNNING, got {status}: {status_message}'
                 raise OperationFailed(msg)
             prefix = f"pipeline_id={pipeline_id}"
             sleep = attempt
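Both pipeline waiters keep their public names through this reorder. A short sketch of kicking off a refresh and blocking until the pipeline is RUNNING (the pipeline id is a placeholder):

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()

    # Placeholder for an existing DLT pipeline.
    pipeline_id = 'abcd1234-5678-90ab-cdef-1234567890ab'

    # start_update kicks off a refresh; the waiter polls get() until the
    # pipeline reaches RUNNING (or raises OperationFailed on FAILED).
    w.pipelines.start_update(pipeline_id=pipeline_id)
    pipeline = w.pipelines.wait_get_pipeline_running(pipeline_id=pipeline_id)
    print(pipeline.state)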
diff --git a/docs/dbdataclasses/catalog.rst b/docs/dbdataclasses/catalog.rst
index b0f4f838e..cb6399348 100644
--- a/docs/dbdataclasses/catalog.rst
+++ b/docs/dbdataclasses/catalog.rst
@@ -1194,8 +1194,8 @@ These dataclasses are used in the SDK to represent API requests and responses fo
    .. py:attribute:: PROVISIONING
       :value: "PROVISIONING"

-   .. py:attribute:: STATE_UNSPECIFIED
-      :value: "STATE_UNSPECIFIED"
+   .. py:attribute:: UPDATING
+      :value: "UPDATING"

 .. autoclass:: ProvisioningStatus
    :members:

diff --git a/docs/dbdataclasses/dashboards.rst b/docs/dbdataclasses/dashboards.rst
index 192095548..91de6ccb2 100644
--- a/docs/dbdataclasses/dashboards.rst
+++ b/docs/dbdataclasses/dashboards.rst
@@ -166,6 +166,9 @@ These dataclasses are used in the SDK to represent API requests and responses fo
    .. py:attribute:: MESSAGE_UPDATED_WHILE_EXECUTING_EXCEPTION
       :value: "MESSAGE_UPDATED_WHILE_EXECUTING_EXCEPTION"

+   .. py:attribute:: NO_QUERY_TO_VISUALIZE_EXCEPTION
+      :value: "NO_QUERY_TO_VISUALIZE_EXCEPTION"
+
    .. py:attribute:: NO_TABLES_TO_QUERY_EXCEPTION
       :value: "NO_TABLES_TO_QUERY_EXCEPTION"

diff --git a/docs/dbdataclasses/pipelines.rst b/docs/dbdataclasses/pipelines.rst
index 9d3d9c8a7..9f419f160 100644
--- a/docs/dbdataclasses/pipelines.rst
+++ b/docs/dbdataclasses/pipelines.rst
@@ -265,6 +265,10 @@ These dataclasses are used in the SDK to represent API requests and responses fo
    :members:
    :undoc-members:

+.. autoclass:: ReportSpec
+   :members:
+   :undoc-members:
+
 .. autoclass:: SchemaSpec
    :members:
    :undoc-members: