Skip to content

Commit 8af2190

Browse files
committed
Add logic and tests for ListRuns
1 parent 998a117 commit 8af2190

File tree

2 files changed

+283
-7
lines changed

2 files changed

+283
-7
lines changed

databricks/sdk/mixins/jobs.py

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,90 @@
1-
from typing import Optional
1+
from typing import Iterator, Optional
22

33
from databricks.sdk.service import jobs
4-
from databricks.sdk.service.jobs import Job
4+
from databricks.sdk.service.jobs import BaseRun, Job, RunType
55

66

77
class JobsExt(jobs.JobsAPI):
88

9+
def list_runs(self,
              *,
              active_only: Optional[bool] = None,
              completed_only: Optional[bool] = None,
              expand_tasks: Optional[bool] = None,
              job_id: Optional[int] = None,
              limit: Optional[int] = None,
              offset: Optional[int] = None,
              page_token: Optional[str] = None,
              run_type: Optional[RunType] = None,
              start_time_from: Optional[int] = None,
              start_time_to: Optional[int] = None) -> Iterator[BaseRun]:
    """List job runs.

    List runs in descending order by start time. If the job has multiple pages of tasks, job_clusters, parameters or repair history,
    it will paginate through all pages and aggregate the results.

    :param active_only: bool (optional)
      If active_only is `true`, only active runs are included in the results; otherwise, lists both active
      and completed runs. An active run is a run in the `QUEUED`, `PENDING`, `RUNNING`, or `TERMINATING`.
      This field cannot be `true` when completed_only is `true`.
    :param completed_only: bool (optional)
      If completed_only is `true`, only completed runs are included in the results; otherwise, lists both
      active and completed runs. This field cannot be `true` when active_only is `true`.
    :param expand_tasks: bool (optional)
      Whether to include task and cluster details in the response. Note that in API 2.2, only the first
      100 elements will be shown. Use :method:jobs/getrun to paginate through all tasks and clusters.
    :param job_id: int (optional)
      The job for which to list runs. If omitted, the Jobs service lists runs from all jobs.
    :param limit: int (optional)
      The number of runs to return. This value must be greater than 0 and less than 25. The default value
      is 20. If a request specifies a limit of 0, the service instead uses the maximum limit.
    :param offset: int (optional)
      The offset of the first run to return, relative to the most recent run. Deprecated since June 2023.
      Use `page_token` to iterate through the pages instead.
    :param page_token: str (optional)
      Use `next_page_token` or `prev_page_token` returned from the previous request to list the next or
      previous page of runs respectively.
    :param run_type: :class:`RunType` (optional)
      The type of runs to return. For a description of run types, see :method:jobs/getRun.
    :param start_time_from: int (optional)
      Show runs that started _at or after_ this value. The value must be a UTC timestamp in milliseconds.
      Can be combined with _start_time_to_ to filter by a time range.
    :param start_time_to: int (optional)
      Show runs that started _at or before_ this value. The value must be a UTC timestamp in milliseconds.
      Can be combined with _start_time_from_ to filter by a time range.

    :returns: Iterator over :class:`BaseRun`
    """
    # fetch runs with limited elements in top level arrays
    runs_list = super().list_runs(active_only=active_only,
                                  completed_only=completed_only,
                                  expand_tasks=expand_tasks,
                                  job_id=job_id,
                                  limit=limit,
                                  offset=offset,
                                  page_token=page_token,
                                  run_type=run_type,
                                  start_time_from=start_time_from,
                                  start_time_to=start_time_to)

    if not expand_tasks:
        yield from runs_list
        # Return explicitly rather than falling through. The original code relied on
        # runs_list being a one-shot iterator already exhausted by the yield-from above;
        # if it were ever a re-iterable sequence, the loop below would yield every run
        # a second time (with expansion applied).
        return

    # fully fetch all top level arrays for each run in the list
    for run in runs_list:
        if run.has_more:
            run_from_get_call = self.get_run(run.run_id)
            run.tasks = run_from_get_call.tasks
            run.job_clusters = run_from_get_call.job_clusters
            run.job_parameters = run_from_get_call.job_parameters
            run.repair_history = run_from_get_call.repair_history
        # Remove has_more fields for each run in the list.
        # This field in Jobs API 2.2 is useful for pagination. It indicates if there are more than 100 tasks or job_clusters in the run.
        # This function hides pagination details from the user. So the field does not play useful role here.
        if hasattr(run, 'has_more'):
            delattr(run, 'has_more')
        yield run
87+
988
def get_run(self,
1089
run_id: int,
1190
*,

tests/test_jobs_mixin.py

Lines changed: 202 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,18 @@
11
import json
22
import re
3-
from typing import Pattern
3+
from typing import Optional, Pattern
44

55
from databricks.sdk import WorkspaceClient
66

77

8-
def make_getrun_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
9-
return re.compile(
10-
rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
11-
)
8+
def make_getrun_path_pattern(run_id: int, page_token: Optional[str] = None) -> Pattern[str]:
    """Build a regex matching the mocked jobs/runs/get URL for *run_id*.

    When *page_token* is given, the URL must carry it as the first query
    parameter; otherwise only `run_id` is expected. The API version segment
    is matched loosely as `2.<digit>`.
    """
    host = re.escape("http://localhost/api/")
    if page_token:
        query = f"/jobs/runs/get?page_token={page_token}&run_id={run_id}"
    else:
        query = f"/jobs/runs/get?run_id={run_id}"
    return re.compile(host + r"2.\d" + re.escape(query))
1216

1317

1418
def make_getjob_path_pattern(job_id: int, page_token: str) -> Pattern[str]:
@@ -17,6 +21,12 @@ def make_getjob_path_pattern(job_id: int, page_token: str) -> Pattern[str]:
1721
)
1822

1923

24+
def make_listruns_path_pattern(page_token: str) -> Pattern[str]:
    """Build a regex matching the mocked jobs/runs/list URL carrying *page_token*.

    An optional leading `expand_tasks=true|false&` query parameter is tolerated
    before `page_token`; the API version segment is matched loosely as `2.<digit>`.
    """
    host = re.escape("http://localhost/api/")
    path = re.escape(f"/jobs/runs/list")
    query = rf'\?(?:expand_tasks=(?:true|false)&)?page_token={re.escape(page_token)}'
    return re.compile(host + r"2.\d" + path + query)
28+
29+
2030
def test_get_run_with_no_pagination(config, requests_mock):
2131
run1 = {"tasks": [{"run_id": 0}, {"run_id": 1}], }
2232
requests_mock.get(make_getrun_path_pattern(1337, "initialToken"), text=json.dumps(run1))
@@ -261,3 +271,190 @@ def test_get_job_pagination_with_tasks(config, requests_mock):
261271
}]
262272
}
263273
}
274+
275+
276+
def test_list_runs_without_task_expansion(config, requests_mock):
    """With expand_tasks=False, list_runs paginates the run list itself but
    must never issue per-run jobs/runs/get calls, even for runs that carry
    nested arrays such as job_parameters or repair_history."""
    # First page of the run list; next_page_token chains to the second page.
    listruns_page1 = {
        "runs": [{
            "run_id": 100,
            "run_name": "run100",
        }, {
            "run_id":
            200,
            "run_name":
            "run200",
            "job_parameters": [{
                "name": "param1",
                "default": "default1"
            }, {
                "name": "param2",
                "default": "default2"
            }]
        }, {
            "run_id": 300,
            "run_name": "run300",
        }],
        "next_page_token":
        "tokenToSecondPage"
    }
    # Final page: no next_page_token, so pagination stops after this response.
    listruns_page2 = {
        "runs": [{
            "run_id": 400,
            "run_name": "run400",
            "repair_history": [{
                "id": "repair400_1",
            }, {
                "id": "repair400_2",
            }]
        }]
    }

    requests_mock.get(make_listruns_path_pattern("initialToken"), text=json.dumps(listruns_page1))
    requests_mock.get(make_listruns_path_pattern("tokenToSecondPage"), text=json.dumps(listruns_page2))
    w = WorkspaceClient(config=config)

    # Drain the iterator; expand_tasks=False takes the early-exit path in the mixin.
    runs_list = list(w.jobs.list_runs(expand_tasks=False, page_token="initialToken"))
    runs_dict = [run.as_dict() for run in runs_list]

    # Runs from both pages come back unmodified, in list order.
    assert runs_dict == [{
        "run_id": 100,
        "run_name": "run100",
    }, {
        "run_id":
        200,
        "run_name":
        "run200",
        "job_parameters": [{
            "name": "param1",
            "default": "default1"
        }, {
            "name": "param2",
            "default": "default2"
        }]
    }, {
        "run_id": 300,
        "run_name": "run300",
    }, {
        "run_id": 400,
        "run_name": "run400",
        "repair_history": [{
            "id": "repair400_1",
        }, {
            "id": "repair400_2",
        }]
    }]

    # only two requests should be made which are jobs/list requests
    assert requests_mock.call_count == 2
350+
351+
def test_list_runs(config, requests_mock):
    """With expand_tasks=True, list_runs must call jobs/runs/get (and follow its
    pagination) only for runs flagged has_more, splice the fully-fetched tasks
    into those runs, and strip the has_more field from every yielded run."""
    # Page 1: run 100 has has_more=True (needs a get call); 200 and 300 do not.
    listruns_page1 = {
        "runs": [{
            "run_id": 100,
            "tasks": [{
                "task_key": "taskkey101"
            }, {
                "task_key": "taskkey102"
            }],
            "has_more": True
        }, {
            "run_id": 200,
            "tasks": [{
                "task_key": "taskkey201"
            }]
        }, {
            "run_id": 300,
            "tasks": [{
                "task_key": "taskkey301"
            }]
        }],
        "next_page_token":
        "tokenToSecondPage"
    }
    # Page 2: run 400 also has has_more=True.
    listruns_page2 = {
        "runs": [{
            "run_id": 400,
            "tasks": [{
                "task_key": "taskkey401"
            }, {
                "task_key": "taskkey402"
            }],
            "has_more": True
        }]
    }

    # jobs/runs/get responses for run 100, themselves split over two pages.
    getrun_100_page1 = {
        "run_id": 100,
        "tasks": [{
            "task_key": "taskkey101"
        }, {
            "task_key": "taskkey102"
        }],
        "next_page_token": "tokenToSecondPage_100"
    }
    getrun_100_page2 = {"run_id": 100, "tasks": [{"task_key": "taskkey103"}]}
    # Run 400's get responses deliberately reorder/replace tasks relative to the
    # list response, to prove the get result wins over the list snapshot.
    getrun_400_page1 = {
        "run_id": 400,
        "tasks": [{
            "task_key": "taskkey401"
        }, {
            "task_key": "taskkey403"
        }],
        "next_page_token": "tokenToSecondPage_400"
    }
    getrun_400_page2 = {"run_id": 400, "tasks": [{"task_key": "taskkey402"}, {"task_key": "taskkey404"}]}

    requests_mock.get(make_listruns_path_pattern("initialToken"), text=json.dumps(listruns_page1))
    requests_mock.get(make_listruns_path_pattern("tokenToSecondPage"), text=json.dumps(listruns_page2))

    requests_mock.get(make_getrun_path_pattern(100), text=json.dumps(getrun_100_page1))
    requests_mock.get(make_getrun_path_pattern(100, "tokenToSecondPage_100"),
                      text=json.dumps(getrun_100_page2))

    requests_mock.get(make_getrun_path_pattern(400), text=json.dumps(getrun_400_page1))
    requests_mock.get(make_getrun_path_pattern(400, "tokenToSecondPage_400"),
                      text=json.dumps(getrun_400_page2))
    w = WorkspaceClient(config=config)

    runs_list = list(w.jobs.list_runs(expand_tasks=True, page_token="initialToken"))
    runs_dict = [run.as_dict() for run in runs_list]

    # Expanded runs (100, 400) carry the aggregated task lists from the get
    # calls; untouched runs (200, 300) keep their list-response tasks. No run
    # retains a has_more field.
    assert runs_dict == [{
        "run_id":
        100,
        "tasks": [{
            "task_key": "taskkey101",
        }, {
            "task_key": "taskkey102",
        }, {
            "task_key": "taskkey103",
        }],
    }, {
        "run_id": 200,
        "tasks": [{
            "task_key": "taskkey201",
        }],
    }, {
        "run_id": 300,
        "tasks": [{
            "task_key": "taskkey301",
        }],
    }, {
        "run_id":
        400,
        "tasks": [{
            "task_key": "taskkey401",
        }, {
            "task_key": "taskkey403",
        }, {
            "task_key": "taskkey402",
        }, {
            "task_key": "taskkey404",
        }],
    }]

    # check that job_id 200 and 300 was never used in runs/get call
    history = requests_mock.request_history
    assert all('300' not in request.qs.get("run_id", ['']) for request in history)
    assert all('200' not in request.qs.get("run_id", ['']) for request in history)

0 commit comments

Comments
 (0)