
Commit 614386e

[Internal] Update Jobs GetJob API to support paginated responses (#869)
## What changes are proposed in this pull request?

Introduces logic in the extension for the Jobs GetJob call that paginates tasks and other arrays in the response. This change is necessary for SDK and API 2.2 compatibility: API 2.2 serves paginated responses as long as the next_page_token field is present in the response. The pagination logic is not exposed to the customer.

## How is this tested?

I enabled API 2.2 calls by modifying the URL string to /api/2.2/jobs/runs/get in databricks/sdk/service/jobs.py, then ran the unit tests from tests/test_jobs_mixin.py.
1 parent 1a1719a commit 614386e
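
From the caller's side the pagination is transparent. A minimal usage sketch (not part of the commit; it assumes WorkspaceClient auth is already configured via the environment or .databrickscfg, and uses an arbitrary job id):

    from databricks.sdk import WorkspaceClient

    w = WorkspaceClient()  # assumes credentials are configured outside this snippet

    # A single call: if the backend keeps returning next_page_token, the mixin fetches
    # the remaining pages and merges tasks, job_clusters, parameters, and environments
    # before returning the aggregated Job.
    job = w.jobs.get(job_id=1337)
    print(len(job.settings.tasks))  # tasks across all pages, not just the first page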

File tree

2 files changed: +160 -9 lines changed


databricks/sdk/mixins/jobs.py

Lines changed: 30 additions & 1 deletion
@@ -1,6 +1,7 @@
 from typing import Optional
 
 from databricks.sdk.service import jobs
+from databricks.sdk.service.jobs import Job
 
 
 class JobsExt(jobs.JobsAPI):
@@ -52,4 +53,32 @@ def get_run(self,
             run.repair_history.extend(next_run.repair_history)
             run.next_page_token = next_run.next_page_token
 
-        return run
+        return run
+
+    def get(self, job_id: int, *, page_token: Optional[str] = None) -> Job:
+        """Get a single job.
+
+        Retrieves the details for a single job. If the job has multiple pages of tasks, job_clusters, parameters or environments,
+        it will paginate through all pages and aggregate the results.
+
+        :param job_id: int
+          The canonical identifier of the job to retrieve information about. This field is required.
+        :param page_token: str (optional)
+          Use `next_page_token` returned from the previous GetJob to request the next page of the job's
+          sub-resources.
+
+        :returns: :class:`Job`
+        """
+        job = super().get(job_id, page_token=page_token)
+
+        # jobs/get response includes next_page_token as long as there are more pages to fetch.
+        while job.next_page_token is not None:
+            next_job = super().get(job_id, page_token=job.next_page_token)
+            # Each new page of jobs/get response includes the next page of the tasks, job_clusters, job_parameters, and environments.
+            job.settings.tasks.extend(next_job.settings.tasks)
+            job.settings.job_clusters.extend(next_job.settings.job_clusters)
+            job.settings.parameters.extend(next_job.settings.parameters)
+            job.settings.environments.extend(next_job.settings.environments)
+            job.next_page_token = next_job.next_page_token
+
+        return job
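
For intuition, the loop above folds a sequence of partial responses into a single Job. A hypothetical two-page exchange might look like this (the payloads are invented for illustration and are not real API output):

    # Hypothetical page 1: the first slice of tasks plus a token pointing at the next page.
    page1 = {"settings": {"tasks": [{"task_key": "a"}, {"task_key": "b"}]},
             "next_page_token": "page2"}

    # Hypothetical page 2: the remaining tasks and no token, which ends the while loop.
    page2 = {"settings": {"tasks": [{"task_key": "c"}]}}

    # After JobsExt.get() the aggregated job would contain tasks a, b, and c,
    # and job.next_page_token would be None.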

tests/test_jobs_mixin.py

Lines changed: 130 additions & 8 deletions
@@ -5,15 +5,21 @@
 from databricks.sdk import WorkspaceClient
 
 
-def make_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
+def make_getrun_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
     return re.compile(
         rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
     )
 
 
+def make_getjob_path_pattern(job_id: int, page_token: str) -> Pattern[str]:
+    return re.compile(
+        rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/get?job_id={job_id}&page_token={page_token}")}'
+    )
+
+
 def test_get_run_with_no_pagination(config, requests_mock):
     run1 = {"tasks": [{"run_id": 0}, {"run_id": 1}], }
-    requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
+    requests_mock.get(make_getrun_path_pattern(1337, "initialToken"), text=json.dumps(run1))
     w = WorkspaceClient(config=config)
 
     run = w.jobs.get_run(1337, page_token="initialToken")
@@ -59,9 +65,9 @@ def test_get_run_pagination_with_tasks(config, requests_mock):
         "next_page_token": "tokenToThirdPage",
     }
     run3 = {"tasks": [{"run_id": 4}]}
-    requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
-    requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
-    requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
+    requests_mock.get(make_getrun_path_pattern(1337, "initialToken"), text=json.dumps(run1))
+    requests_mock.get(make_getrun_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
+    requests_mock.get(make_getrun_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
     w = WorkspaceClient(config=config)
 
     run = w.jobs.get_run(1337, page_token="initialToken")
@@ -116,9 +122,9 @@ def test_get_run_pagination_with_iterations(config, requests_mock):
         "next_page_token": "tokenToThirdPage",
     }
     run3 = {"tasks": [{"run_id": 1337}], "iterations": [{"run_id": 4}], }
-    requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
-    requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
-    requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
+    requests_mock.get(make_getrun_path_pattern(1337, "initialToken"), text=json.dumps(run1))
+    requests_mock.get(make_getrun_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
+    requests_mock.get(make_getrun_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
     w = WorkspaceClient(config=config)
 
     run = w.jobs.get_run(1337, page_token="initialToken")
@@ -139,3 +145,119 @@ def test_get_run_pagination_with_iterations(config, requests_mock):
             'run_id': 4
         }],
     }
+
+
+def test_get_job_with_no_pagination(config, requests_mock):
+    job1 = {"settings": {"tasks": [{"task_key": "taskKey1"}, {"task_key": "taskKey2"}], }}
+    requests_mock.get(make_getjob_path_pattern(1337, "initialToken"), text=json.dumps(job1))
+    w = WorkspaceClient(config=config)
+
+    job = w.jobs.get(1337, page_token="initialToken")
+
+    assert job.as_dict() == {"settings": {"tasks": [{"task_key": "taskKey1"}, {"task_key": "taskKey2"}], }}
+
+
+def test_get_job_pagination_with_tasks(config, requests_mock):
+    from databricks.sdk.service import compute, jobs
+    cluster_spec = compute.ClusterSpec(spark_version="11.3.x-scala2.12",
+                                       custom_tags={"ResourceClass": "SingleNode"},
+                                       num_workers=0,
+                                       node_type_id="Standard_DS3_v2",
+                                       )
+    cluster1 = jobs.JobCluster(job_cluster_key="cluster1", new_cluster=cluster_spec)
+    cluster2 = jobs.JobCluster(job_cluster_key="cluster2", new_cluster=cluster_spec)
+    cluster3 = jobs.JobCluster(job_cluster_key="cluster3", new_cluster=cluster_spec)
+    cluster4 = jobs.JobCluster(job_cluster_key="cluster4", new_cluster=cluster_spec)
+    job1 = {
+        "settings": {
+            "tasks": [{
+                "task_key": "taskKey1"
+            }, {
+                "task_key": "taskKey2"
+            }],
+            "job_clusters": [cluster1.as_dict(), cluster2.as_dict()],
+            "parameters": [{
+                "name": "param1",
+                "default": "default1"
+            }],
+            "environments": [{
+                "environment_key": "key1"
+            }, {
+                "environment_key": "key2"
+            }]
+        },
+        "next_page_token": "tokenToSecondPage"
+    }
+    job2 = {
+        "settings": {
+            "tasks": [{
+                "task_key": "taskKey3"
+            }, {
+                "task_key": "taskKey4"
+            }],
+            "job_clusters": [cluster3.as_dict(), cluster4.as_dict()],
+            "parameters": [{
+                "name": "param2",
+                "default": "default2"
+            }],
+            "environments": [{
+                "environment_key": "key3"
+            }]
+        },
+        "next_page_token": "tokenToThirdPage"
+    }
+    job3 = {
+        "settings": {
+            "tasks": [{
+                "task_key": "taskKey5"
+            }],
+            "parameters": [{
+                "name": "param3",
+                "default": "default3"
+            }]
+        },
+    }
+
+    requests_mock.get(make_getjob_path_pattern(1337, "initialToken"), text=json.dumps(job1))
+    requests_mock.get(make_getjob_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(job2))
+    requests_mock.get(make_getjob_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(job3))
+    w = WorkspaceClient(config=config)
+
+    job = w.jobs.get(1337, page_token="initialToken")
+
+    assert job.as_dict() == {
+        "settings": {
+            "tasks": [{
+                "task_key": "taskKey1"
+            }, {
+                "task_key": "taskKey2"
+            }, {
+                "task_key": "taskKey3"
+            }, {
+                "task_key": "taskKey4"
+            }, {
+                "task_key": "taskKey5"
+            }],
+            "job_clusters": [cluster1.as_dict(),
+                             cluster2.as_dict(),
+                             cluster3.as_dict(),
+                             cluster4.as_dict()],
+            "parameters": [{
+                "name": "param1",
+                "default": "default1"
+            }, {
+                "name": "param2",
+                "default": "default2"
+            }, {
+                "name": "param3",
+                "default": "default3"
+            }],
+            "environments": [{
+                "environment_key": "key1"
+            }, {
+                "environment_key": "key2"
+            }, {
+                "environment_key": "key3"
+            }]
+        }
+    }
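
The new cases can be run alongside the existing ones with pytest; for example (assuming pytest and the repository's test dependencies, such as requests-mock, are installed):

    import pytest

    # Select only the tests covering the new paginated jobs/get behavior.
    pytest.main(["tests/test_jobs_mixin.py", "-k", "get_job", "-v"])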
