Skip to content

Commit c77a546

Browse files
committed
Add pagination to GetJob call
1 parent 267d369 commit c77a546

File tree

2 files changed

+158
-10
lines changed

2 files changed

+158
-10
lines changed

databricks/sdk/mixins/jobs.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,30 @@ def get_run(self,
4646
run.next_page_token = next_run.next_page_token
4747

4848
run.prev_page_token = None
49-
return run
49+
return run
50+
51+
def get(self, job_id: int, *, page_token: Optional[str] = None) -> Job:
52+
"""Get a single job.
53+
54+
Retrieves the details for a single job. If the job has multiple pages of tasks, job_clusters, parameters or environments,
55+
it will paginate through all pages and aggregate the results.
56+
57+
:param job_id: int
58+
The canonical identifier of the job to retrieve information about. This field is required.
59+
:param page_token: str (optional)
60+
Use `next_page_token` returned from the previous GetJob to request the next page of the job's
61+
sub-resources.
62+
63+
:returns: :class:`Job`
64+
"""
65+
job = super().get(job_id, page_token=page_token)
66+
67+
while job.next_page_token is not None:
68+
next_job = super().get(job_id, page_token=job.next_page_token)
69+
job.settings.tasks.extend(next_job.settings.tasks)
70+
job.settings.job_clusters.extend(next_job.settings.job_clusters)
71+
job.settings.parameters.extend(next_job.settings.parameters)
72+
job.settings.environments.extend(next_job.settings.environments)
73+
job.next_page_token = next_job.next_page_token
74+
75+
return job

tests/test_jobs_mixin.py

Lines changed: 131 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,21 @@
55
from databricks.sdk import WorkspaceClient
66

77

8-
def make_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
8+
def make_getrun_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
99
return re.compile(
1010
rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
1111
)
1212

1313

14+
def make_getjob_path_pattern(job_id: int, page_token: str) -> Pattern[str]:
15+
return re.compile(
16+
rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/get?job_id={job_id}&page_token={page_token}")}'
17+
)
18+
19+
1420
def test_get_run_with_no_pagination(config, requests_mock):
1521
run1 = {"tasks": [{"run_id": 0}, {"run_id": 1}], }
16-
requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
22+
requests_mock.get(make_getrun_path_pattern(1337, "initialToken"), text=json.dumps(run1))
1723
w = WorkspaceClient(config=config)
1824

1925
run = w.jobs.get_run(1337, page_token="initialToken")
@@ -41,9 +47,9 @@ def test_get_run_pagination_with_tasks(config, requests_mock):
4147
"prev_page_token": "initialToken"
4248
}
4349
run3 = {"tasks": [{"run_id": 4}], "next_page_token": None, "prev_page_token": "tokenToSecondPage"}
44-
requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
45-
requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
46-
requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
50+
requests_mock.get(make_getrun_path_pattern(1337, "initialToken"), text=json.dumps(run1))
51+
requests_mock.get(make_getrun_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
52+
requests_mock.get(make_getrun_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
4753
w = WorkspaceClient(config=config)
4854

4955
run = w.jobs.get_run(1337, page_token="initialToken")
@@ -98,9 +104,9 @@ def test_get_run_pagination_with_iterations(config, requests_mock):
98104
"next_page_token": None,
99105
"prev_page_token": "tokenToSecondPage"
100106
}
101-
requests_mock.get(make_path_pattern(1337, "initialToken"), text=json.dumps(run1))
102-
requests_mock.get(make_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
103-
requests_mock.get(make_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
107+
requests_mock.get(make_getrun_path_pattern(1337, "initialToken"), text=json.dumps(run1))
108+
requests_mock.get(make_getrun_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(run2))
109+
requests_mock.get(make_getrun_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(run3))
104110
w = WorkspaceClient(config=config)
105111

106112
run = w.jobs.get_run(1337, page_token="initialToken")
@@ -120,4 +126,120 @@ def test_get_run_pagination_with_iterations(config, requests_mock):
120126
}, {
121127
'run_id': 4
122128
}],
123-
}
129+
}
130+
131+
132+
def test_get_job_with_no_pagination(config, requests_mock):
133+
job1 = {"settings": {"tasks": [{"task_key": "taskKey1"}, {"task_key": "taskKey2"}], }}
134+
requests_mock.get(make_getjob_path_pattern(1337, "initialToken"), text=json.dumps(job1))
135+
w = WorkspaceClient(config=config)
136+
137+
job = w.jobs.get(1337, page_token="initialToken")
138+
139+
assert job.as_dict() == {"settings": {"tasks": [{"task_key": "taskKey1"}, {"task_key": "taskKey2"}], }}
140+
141+
142+
def test_get_job_pagination_with_tasks(config, requests_mock):
143+
from databricks.sdk.service import compute, jobs
144+
cluster_spec = compute.ClusterSpec(spark_version="11.3.x-scala2.12",
145+
custom_tags={"ResourceClass": "SingleNode"},
146+
num_workers=0,
147+
node_type_id="Standard_DS3_v2",
148+
)
149+
cluster1 = jobs.JobCluster(job_cluster_key="cluster1", new_cluster=cluster_spec)
150+
cluster2 = jobs.JobCluster(job_cluster_key="cluster2", new_cluster=cluster_spec)
151+
cluster3 = jobs.JobCluster(job_cluster_key="cluster3", new_cluster=cluster_spec)
152+
cluster4 = jobs.JobCluster(job_cluster_key="cluster4", new_cluster=cluster_spec)
153+
job1 = {
154+
"settings": {
155+
"tasks": [{
156+
"task_key": "taskKey1"
157+
}, {
158+
"task_key": "taskKey2"
159+
}],
160+
"job_clusters": [cluster1.as_dict(), cluster2.as_dict()],
161+
"parameters": [{
162+
"name": "param1",
163+
"default": "default1"
164+
}],
165+
"environments": [{
166+
"environment_key": "key1"
167+
}, {
168+
"environment_key": "key2"
169+
}]
170+
},
171+
"next_page_token": "tokenToSecondPage"
172+
}
173+
job2 = {
174+
"settings": {
175+
"tasks": [{
176+
"task_key": "taskKey3"
177+
}, {
178+
"task_key": "taskKey4"
179+
}],
180+
"job_clusters": [cluster3.as_dict(), cluster4.as_dict()],
181+
"parameters": [{
182+
"name": "param2",
183+
"default": "default2"
184+
}],
185+
"environments": [{
186+
"environment_key": "key3"
187+
}]
188+
},
189+
"next_page_token": "tokenToThirdPage"
190+
}
191+
job3 = {
192+
"settings": {
193+
"tasks": [{
194+
"task_key": "taskKey5"
195+
}],
196+
"parameters": [{
197+
"name": "param3",
198+
"default": "default3"
199+
}]
200+
},
201+
}
202+
203+
requests_mock.get(make_getjob_path_pattern(1337, "initialToken"), text=json.dumps(job1))
204+
requests_mock.get(make_getjob_path_pattern(1337, "tokenToSecondPage"), text=json.dumps(job2))
205+
requests_mock.get(make_getjob_path_pattern(1337, "tokenToThirdPage"), text=json.dumps(job3))
206+
w = WorkspaceClient(config=config)
207+
208+
job = w.jobs.get(1337, page_token="initialToken")
209+
210+
assert job.as_dict() == {
211+
"settings": {
212+
"tasks": [{
213+
"task_key": "taskKey1"
214+
}, {
215+
"task_key": "taskKey2"
216+
}, {
217+
"task_key": "taskKey3"
218+
}, {
219+
"task_key": "taskKey4"
220+
}, {
221+
"task_key": "taskKey5"
222+
}],
223+
"job_clusters": [cluster1.as_dict(),
224+
cluster2.as_dict(),
225+
cluster3.as_dict(),
226+
cluster4.as_dict()],
227+
"parameters": [{
228+
"name": "param1",
229+
"default": "default1"
230+
}, {
231+
"name": "param2",
232+
"default": "default2"
233+
}, {
234+
"name": "param3",
235+
"default": "default3"
236+
}],
237+
"environments": [{
238+
"environment_key": "key1"
239+
}, {
240+
"environment_key": "key2"
241+
}, {
242+
"environment_key": "key3"
243+
}]
244+
}
245+
}

0 commit comments

Comments
 (0)