Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions databricks/sdk/mixins/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,20 @@ def get_run(self,
include_history: Optional[bool] = None,
include_resolved_values: Optional[bool] = None,
page_token: Optional[str] = None) -> jobs.Run:
"""
This method fetches the details of a run identified by `run_id`. If the run has multiple pages of tasks or iterations,
it will paginate through all pages and aggregate the results.
"""Get a single job run.

Retrieve the metadata of a run. If the run spans multiple pages, this method fetches every page and merges their tasks (or iterations, for a ForEach task run), job_clusters, job_parameters, and repair history into the returned run.

:param run_id: int
The canonical identifier of the run for which to retrieve the metadata. This field is required.
:param include_history: bool (optional)
Whether to include the repair history in the response.
:param include_resolved_values: bool (optional)
Whether to include resolved parameter values in the response.
:param page_token: str (optional)
To list the next page or the previous page of job tasks, set this field to the value of the
`next_page_token` or `prev_page_token` returned in the GetJob response.
To list the next page of job tasks, set this field to the value of the `next_page_token` returned in
the GetJob response.

:returns: :class:`Run`
"""
run = super().get_run(run_id,
Expand All @@ -34,6 +36,7 @@ def get_run(self,
# When querying a ForEach task run, a page token is returned when there are more than 100 iterations. Only a single task is returned, corresponding to the ForEach task itself. Therefore, the client only reads the iterations from the next page and not the tasks.
is_paginating_iterations = run.iterations is not None and len(run.iterations) > 0

# The runs/get response includes a next_page_token as long as there are more pages to fetch.
while run.next_page_token is not None:
next_run = super().get_run(run_id,
include_history=include_history,
Expand All @@ -43,7 +46,10 @@ def get_run(self,
run.iterations.extend(next_run.iterations)
else:
run.tasks.extend(next_run.tasks)
# Each new page of the runs/get response includes the next page of job_clusters, job_parameters, and repair history.
run.job_clusters.extend(next_run.job_clusters)
run.job_parameters.extend(next_run.job_parameters)
run.repair_history.extend(next_run.repair_history)
run.next_page_token = next_run.next_page_token

run.prev_page_token = None
return run
50 changes: 34 additions & 16 deletions tests/test_jobs_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,43 @@ def test_get_run_with_no_pagination(config, requests_mock):


def test_get_run_pagination_with_tasks(config, requests_mock):
from databricks.sdk.service import compute, jobs
cluster_spec = compute.ClusterSpec(spark_version="11.3.x-scala2.12",
custom_tags={"ResourceClass": "SingleNode"},
num_workers=0,
node_type_id="Standard_DS3_v2",
)
cluster1 = jobs.JobCluster(job_cluster_key="cluster1", new_cluster=cluster_spec)
cluster2 = jobs.JobCluster(job_cluster_key="cluster2", new_cluster=cluster_spec)
cluster3 = jobs.JobCluster(job_cluster_key="cluster3", new_cluster=cluster_spec)
cluster4 = jobs.JobCluster(job_cluster_key="cluster4", new_cluster=cluster_spec)
run1 = {
"tasks": [{
"run_id": 0
}, {
"run_id": 1
}],
"job_clusters": [cluster1.as_dict(), cluster2.as_dict(), ],
"job_parameters": [{
"name": "param1",
"value": "value1"
}],
"next_page_token": "tokenToSecondPage",
"prev_page_token": "tokenToPreviousPage"
}
run2 = {
"tasks": [{
"run_id": 2
}, {
"run_id": 3
}],
"job_clusters": [cluster3.as_dict(), cluster4.as_dict(), ],
"job_parameters": [{
"name": "param2",
"value": "value2"
}],
"next_page_token": "tokenToThirdPage",
"prev_page_token": "initialToken"
}
run3 = {"tasks": [{"run_id": 4}], "next_page_token": None, "prev_page_token": "tokenToSecondPage"}
run3 = {"tasks": [{"run_id": 4}]}
requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
Expand All @@ -60,6 +78,17 @@ def test_get_run_pagination_with_tasks(config, requests_mock):
}, {
'run_id': 4
}],
"job_clusters": [cluster1.as_dict(),
cluster2.as_dict(),
cluster3.as_dict(),
cluster4.as_dict()],
"job_parameters": [{
"name": "param1",
"value": "value1"
}, {
"name": "param2",
"value": "value2"
}],
}


Expand All @@ -74,7 +103,6 @@ def test_get_run_pagination_with_iterations(config, requests_mock):
"run_id": 1
}],
"next_page_token": "tokenToSecondPage",
"prev_page_token": "tokenToPreviousPage"
}
run2 = {
"tasks": [{
Expand All @@ -86,18 +114,8 @@ def test_get_run_pagination_with_iterations(config, requests_mock):
"run_id": 3
}],
"next_page_token": "tokenToThirdPage",
"prev_page_token": "initialToken"
}
run3 = {
"tasks": [{
"run_id": 1337
}],
"iterations": [{
"run_id": 4
}],
"next_page_token": None,
"prev_page_token": "tokenToSecondPage"
}
run3 = {"tasks": [{"run_id": 1337}], "iterations": [{"run_id": 4}], }
requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
Expand All @@ -120,4 +138,4 @@ def test_get_run_pagination_with_iterations(config, requests_mock):
}, {
'run_id': 4
}],
}
}
Loading