Skip to content

Commit 005dd77

Browse files
committed
get_run paginates tasks and iterations
1 parent 2143e35 commit 005dd77

File tree

2 files changed

+172
-0
lines changed

2 files changed

+172
-0
lines changed

databricks/sdk/mixins/jobs.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
from typing import Optional
2+
3+
from databricks.sdk.service import jobs
4+
5+
6+
class JobsExt(jobs.JobsAPI):
7+
8+
def get_run(self,
9+
run_id: int,
10+
*,
11+
include_history: Optional[bool] = None,
12+
include_resolved_values: Optional[bool] = None,
13+
page_token: Optional[str] = None) -> jobs.Run:
14+
"""
15+
This method fetches the details of a run identified by `run_id`. If the run has multiple pages of tasks or iterations,
16+
it will paginate through all pages and aggregate the results.
17+
:param run_id: int
18+
The canonical identifier of the run for which to retrieve the metadata. This field is required.
19+
:param include_history: bool (optional)
20+
Whether to include the repair history in the response.
21+
:param include_resolved_values: bool (optional)
22+
Whether to include resolved parameter values in the response.
23+
:param page_token: str (optional)
24+
To list the next page or the previous page of job tasks, set this field to the value of the
25+
`next_page_token` or `prev_page_token` returned in the GetJob response.
26+
:returns: :class:`Run`
27+
"""
28+
run = super().get_run(run_id,
29+
include_history=include_history,
30+
include_resolved_values=include_resolved_values,
31+
page_token=page_token)
32+
33+
# When querying a Job run, a page token is returned when there are more than 100 tasks. No iterations are defined for a Job run. Therefore, the next page in the response only includes the next page of tasks.
34+
# When querying a ForEach task run, a page token is returned when there are more than 100 iterations. Only a single task is returned, corresponding to the ForEach task itself. Therefore, the client only reads the iterations from the next page and not the tasks.
35+
is_paginating_iterations = run.iterations is not None and len(run.iterations) > 0
36+
37+
while run.next_page_token is not None:
38+
next_run = super().get_run(run_id,
39+
include_history=include_history,
40+
include_resolved_values=include_resolved_values,
41+
page_token=run.next_page_token)
42+
if is_paginating_iterations:
43+
run.iterations.extend(next_run.iterations)
44+
else:
45+
run.tasks.extend(next_run.tasks)
46+
run.next_page_token = next_run.next_page_token
47+
48+
run.prev_page_token = None
49+
return run

tests/test_jobs_mixin.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
import json
2+
import re
3+
from typing import Pattern
4+
5+
from databricks.sdk import WorkspaceClient
6+
7+
8+
def make_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
9+
return re.compile(
10+
rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
11+
)
12+
13+
14+
def test_get_run_with_no_pagination(config, requests_mock):
15+
run1 = {"tasks": [{"run_id": 0}, {"run_id": 1}], }
16+
requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
17+
w = WorkspaceClient(config=config)
18+
19+
run = w.jobs.get_run(1337, page_token="initialToken")
20+
21+
assert run.as_dict() == {"tasks": [{'run_id': 0}, {'run_id': 1}], }
22+
23+
24+
def test_get_run_pagination_with_tasks(config, requests_mock):
25+
run1 = {
26+
"tasks": [{
27+
"run_id": 0
28+
}, {
29+
"run_id": 1
30+
}],
31+
"next_page_token": "tokenToSecondPage",
32+
"prev_page_token": "tokenToPreviousPage"
33+
}
34+
run2 = {
35+
"tasks": [{
36+
"run_id": 2
37+
}, {
38+
"run_id": 3
39+
}],
40+
"next_page_token": "tokenToThirdPage",
41+
"prev_page_token": "initialToken"
42+
}
43+
run3 = {"tasks": [{"run_id": 4}], "next_page_token": None, "prev_page_token": "tokenToSecondPage"}
44+
requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
45+
requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
46+
requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
47+
w = WorkspaceClient(config=config)
48+
49+
run = w.jobs.get_run(1337, page_token="initialToken")
50+
51+
assert run.as_dict() == {
52+
"tasks": [{
53+
'run_id': 0
54+
}, {
55+
'run_id': 1
56+
}, {
57+
'run_id': 2
58+
}, {
59+
'run_id': 3
60+
}, {
61+
'run_id': 4
62+
}],
63+
}
64+
65+
66+
def test_get_run_pagination_with_iterations(config, requests_mock):
67+
run1 = {
68+
"tasks": [{
69+
"run_id": 1337
70+
}],
71+
"iterations": [{
72+
"run_id": 0
73+
}, {
74+
"run_id": 1
75+
}],
76+
"next_page_token": "tokenToSecondPage",
77+
"prev_page_token": "tokenToPreviousPage"
78+
}
79+
run2 = {
80+
"tasks": [{
81+
"run_id": 1337
82+
}],
83+
"iterations": [{
84+
"run_id": 2
85+
}, {
86+
"run_id": 3
87+
}],
88+
"next_page_token": "tokenToThirdPage",
89+
"prev_page_token": "initialToken"
90+
}
91+
run3 = {
92+
"tasks": [{
93+
"run_id": 1337
94+
}],
95+
"iterations": [{
96+
"run_id": 4
97+
}],
98+
"next_page_token": None,
99+
"prev_page_token": "tokenToSecondPage"
100+
}
101+
requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
102+
requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
103+
requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
104+
w = WorkspaceClient(config=config)
105+
106+
run = w.jobs.get_run(1337, page_token="initialToken")
107+
108+
assert run.as_dict() == {
109+
"tasks": [{
110+
'run_id': 1337
111+
}],
112+
"iterations": [{
113+
'run_id': 0
114+
}, {
115+
'run_id': 1
116+
}, {
117+
'run_id': 2
118+
}, {
119+
'run_id': 3
120+
}, {
121+
'run_id': 4
122+
}],
123+
}

0 commit comments

Comments
 (0)