From cd5106f0fd3ffc8a8759c3887f9770b4b979fc14 Mon Sep 17 00:00:00 2001 From: Giorgi Kikolashvili Date: Tue, 21 Jan 2025 17:11:55 +0100 Subject: [PATCH 1/4] GetRun paginates more arrays --- databricks/sdk/mixins/jobs.py | 15 ++++++---- tests/test_jobs_mixin.py | 54 +++++++++++++++++++++++++++++------ 2 files changed, 56 insertions(+), 13 deletions(-) diff --git a/databricks/sdk/mixins/jobs.py b/databricks/sdk/mixins/jobs.py index 01fb013be..6fe70f335 100644 --- a/databricks/sdk/mixins/jobs.py +++ b/databricks/sdk/mixins/jobs.py @@ -11,9 +11,10 @@ def get_run(self, include_history: Optional[bool] = None, include_resolved_values: Optional[bool] = None, page_token: Optional[str] = None) -> jobs.Run: - """ - This method fetches the details of a run identified by `run_id`. If the run has multiple pages of tasks or iterations, - it will paginate through all pages and aggregate the results. + """Get a single job run. + + Retrieve the metadata of a run. If a run has multiple pages of tasks, it will paginate through all pages of tasks, iterations, job_clusters, job_parameters, and repair history. + :param run_id: int The canonical identifier of the run for which to retrieve the metadata. This field is required. :param include_history: bool (optional) @@ -21,8 +22,9 @@ def get_run(self, :param include_resolved_values: bool (optional) Whether to include resolved parameter values in the response. :param page_token: str (optional) - To list the next page or the previous page of job tasks, set this field to the value of the - `next_page_token` or `prev_page_token` returned in the GetJob response. + To list the next page of job tasks, set this field to the value of the `next_page_token` returned in + the GetJob response. + :returns: :class:`Run` """ run = super().get_run(run_id, @@ -43,6 +45,9 @@ def get_run(self, run.iterations.extend(next_run.iterations) else: run.tasks.extend(next_run.tasks) + run.job_clusters.extend(next_run.job_clusters) + run.job_parameters.extend(next_run.job_parameters) + run.repair_history.extend(next_run.repair_history) run.next_page_token = next_run.next_page_token run.prev_page_token = None diff --git a/tests/test_jobs_mixin.py b/tests/test_jobs_mixin.py index 9b5f27138..d1d325ac4 100644 --- a/tests/test_jobs_mixin.py +++ b/tests/test_jobs_mixin.py @@ -22,14 +22,32 @@ def test_get_run_with_no_pagination(config, requests_mock): def test_get_run_pagination_with_tasks(config, requests_mock): + from databricks.sdk.service import compute, jobs + cluster_spec = compute.ClusterSpec(spark_version="11.3.x-scala2.12", custom_tags={"ResourceClass": "SingleNode"}, + num_workers=0, node_type_id="Standard_DS3_v2", ) + cluster1 = jobs.JobCluster(job_cluster_key="cluster1", + new_cluster=cluster_spec) + cluster2 = jobs.JobCluster(job_cluster_key="cluster2", + new_cluster=cluster_spec) + cluster3 = jobs.JobCluster(job_cluster_key="cluster3", + new_cluster=cluster_spec) + cluster4 = jobs.JobCluster(job_cluster_key="cluster4", + new_cluster=cluster_spec) run1 = { "tasks": [{ "run_id": 0 }, { "run_id": 1 }], + "job_clusters": [ + cluster1.as_dict(), + cluster2.as_dict(), + ], + "job_parameters": [{ + "name": "param1", + "value": "value1" + }], "next_page_token": "tokenToSecondPage", - "prev_page_token": "tokenToPreviousPage" } run2 = { "tasks": [{ @@ -37,10 +55,21 @@ def test_get_run_pagination_with_tasks(config, requests_mock): }, { "run_id": 3 }], + "job_clusters": [ + cluster3.as_dict(), + cluster4.as_dict(), + ], + "job_parameters": [{ + "name": "param2", + "value": "value2" + }], "next_page_token": "tokenToThirdPage", - "prev_page_token": "initialToken" } - run3 = {"tasks": [{"run_id": 4}], "next_page_token": None, "prev_page_token": "tokenToSecondPage"} + run3 = { + "tasks": [{ + "run_id": 4 + }] + } requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1)) requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2)) requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3)) @@ -60,6 +89,19 @@ def test_get_run_pagination_with_tasks(config, requests_mock): }, { 'run_id': 4 }], + "job_clusters": [ + cluster1.as_dict(), + cluster2.as_dict(), + cluster3.as_dict(), + cluster4.as_dict() + ], + "job_parameters": [{ + "name": "param1", + "value": "value1" + }, { + "name": "param2", + "value": "value2" + }], } @@ -74,7 +116,6 @@ def test_get_run_pagination_with_iterations(config, requests_mock): "run_id": 1 }], "next_page_token": "tokenToSecondPage", - "prev_page_token": "tokenToPreviousPage" } run2 = { "tasks": [{ @@ -86,7 +127,6 @@ def test_get_run_pagination_with_iterations(config, requests_mock): "run_id": 3 }], "next_page_token": "tokenToThirdPage", - "prev_page_token": "initialToken" } run3 = { "tasks": [{ @@ -95,8 +135,6 @@ def test_get_run_pagination_with_iterations(config, requests_mock): "iterations": [{ "run_id": 4 }], - "next_page_token": None, - "prev_page_token": "tokenToSecondPage" } requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1)) requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2)) @@ -120,4 +158,4 @@ def test_get_run_pagination_with_iterations(config, requests_mock): }, { 'run_id': 4 }], - } \ No newline at end of file + } From 8f9d512e5bcbfb27a9fb22dc3e674d18ea35675f Mon Sep 17 00:00:00 2001 From: Giorgi Kikolashvili Date: Tue, 21 Jan 2025 17:19:45 +0100 Subject: [PATCH 2/4] Remove unnecessary previous page token --- databricks/sdk/mixins/jobs.py | 1 - 1 file changed, 1 deletion(-) diff --git a/databricks/sdk/mixins/jobs.py b/databricks/sdk/mixins/jobs.py index 6fe70f335..c1b413710 100644 --- a/databricks/sdk/mixins/jobs.py +++ b/databricks/sdk/mixins/jobs.py @@ -50,5 +50,4 @@ def get_run(self, run.repair_history.extend(next_run.repair_history) run.next_page_token = next_run.next_page_token - run.prev_page_token = None return run \ No newline at end of file From 81545c091a1a37180119306081ead149e0331e79 Mon Sep 17 00:00:00 2001 From: Giorgi Kikolashvili Date: Tue, 21 Jan 2025 17:22:14 +0100 Subject: [PATCH 3/4] Apply fmt --- tests/test_jobs_mixin.py | 54 +++++++++++++--------------------------- 1 file changed, 17 insertions(+), 37 deletions(-) diff --git a/tests/test_jobs_mixin.py b/tests/test_jobs_mixin.py index d1d325ac4..90f1c0b89 100644 --- a/tests/test_jobs_mixin.py +++ b/tests/test_jobs_mixin.py @@ -23,26 +23,22 @@ def test_get_run_with_no_pagination(config, requests_mock): def test_get_run_pagination_with_tasks(config, requests_mock): from databricks.sdk.service import compute, jobs - cluster_spec = compute.ClusterSpec(spark_version="11.3.x-scala2.12", custom_tags={"ResourceClass": "SingleNode"}, - num_workers=0, node_type_id="Standard_DS3_v2", ) - cluster1 = jobs.JobCluster(job_cluster_key="cluster1", - new_cluster=cluster_spec) - cluster2 = jobs.JobCluster(job_cluster_key="cluster2", - new_cluster=cluster_spec) - cluster3 = jobs.JobCluster(job_cluster_key="cluster3", - new_cluster=cluster_spec) - cluster4 = jobs.JobCluster(job_cluster_key="cluster4", - new_cluster=cluster_spec) + cluster_spec = compute.ClusterSpec(spark_version="11.3.x-scala2.12", + custom_tags={"ResourceClass": "SingleNode"}, + num_workers=0, + node_type_id="Standard_DS3_v2", + ) + cluster1 = jobs.JobCluster(job_cluster_key="cluster1", new_cluster=cluster_spec) + cluster2 = jobs.JobCluster(job_cluster_key="cluster2", new_cluster=cluster_spec) + cluster3 = jobs.JobCluster(job_cluster_key="cluster3", new_cluster=cluster_spec) + cluster4 = jobs.JobCluster(job_cluster_key="cluster4", new_cluster=cluster_spec) run1 = { "tasks": [{ "run_id": 0 }, { "run_id": 1 }], - "job_clusters": [ - cluster1.as_dict(), - cluster2.as_dict(), - ], + "job_clusters": [cluster1.as_dict(), cluster2.as_dict(), ], "job_parameters": [{ "name": "param1", "value": "value1" @@ -55,21 +51,14 @@ def test_get_run_pagination_with_tasks(config, requests_mock): }, { "run_id": 3 }], - "job_clusters": [ - cluster3.as_dict(), - cluster4.as_dict(), - ], + "job_clusters": [cluster3.as_dict(), cluster4.as_dict(), ], "job_parameters": [{ "name": "param2", "value": "value2" }], "next_page_token": "tokenToThirdPage", } - run3 = { - "tasks": [{ - "run_id": 4 - }] - } + run3 = {"tasks": [{"run_id": 4}]} requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1)) requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2)) requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3)) @@ -89,12 +78,10 @@ def test_get_run_pagination_with_tasks(config, requests_mock): }, { 'run_id': 4 }], - "job_clusters": [ - cluster1.as_dict(), - cluster2.as_dict(), - cluster3.as_dict(), - cluster4.as_dict() - ], + "job_clusters": [cluster1.as_dict(), + cluster2.as_dict(), + cluster3.as_dict(), + cluster4.as_dict()], "job_parameters": [{ "name": "param1", "value": "value1" @@ -128,14 +115,7 @@ def test_get_run_pagination_with_iterations(config, requests_mock): }], "next_page_token": "tokenToThirdPage", } - run3 = { - "tasks": [{ - "run_id": 1337 - }], - "iterations": [{ - "run_id": 4 - }], - } + run3 = {"tasks": [{"run_id": 1337}], "iterations": [{"run_id": 4}], } requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1)) requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2)) requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3)) From ed32c31cd44c8d242076f10f3bfdcde7ed9e0192 Mon Sep 17 00:00:00 2001 From: Giorgi Kikolashvili Date: Tue, 4 Feb 2025 10:53:05 +0100 Subject: [PATCH 4/4] Add comments --- databricks/sdk/mixins/jobs.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/databricks/sdk/mixins/jobs.py b/databricks/sdk/mixins/jobs.py index c1b413710..c38304966 100644 --- a/databricks/sdk/mixins/jobs.py +++ b/databricks/sdk/mixins/jobs.py @@ -36,6 +36,7 @@ def get_run(self, # When querying a ForEach task run, a page token is returned when there are more than 100 iterations. Only a single task is returned, corresponding to the ForEach task itself. Therefore, the client only reads the iterations from the next page and not the tasks. is_paginating_iterations = run.iterations is not None and len(run.iterations) > 0 + # runs/get response includes next_page_token as long as there are more pages to fetch. while run.next_page_token is not None: next_run = super().get_run(run_id, include_history=include_history, @@ -45,6 +46,7 @@ def get_run(self, run.iterations.extend(next_run.iterations) else: run.tasks.extend(next_run.tasks) + # Each new page of runs/get response includes the next page of the job_clusters, job_parameters, and repair history. run.job_clusters.extend(next_run.job_clusters) run.job_parameters.extend(next_run.job_parameters) run.repair_history.extend(next_run.repair_history)