Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 81 additions & 2 deletions databricks/sdk/mixins/jobs.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,90 @@
from typing import Optional
from typing import Iterator, Optional

from databricks.sdk.service import jobs
from databricks.sdk.service.jobs import Job
from databricks.sdk.service.jobs import BaseRun, Job, RunType


class JobsExt(jobs.JobsAPI):

def list_runs(self,
*,
active_only: Optional[bool] = None,
completed_only: Optional[bool] = None,
expand_tasks: Optional[bool] = None,
job_id: Optional[int] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
page_token: Optional[str] = None,
run_type: Optional[RunType] = None,
start_time_from: Optional[int] = None,
start_time_to: Optional[int] = None) -> Iterator[BaseRun]:
"""List job runs.
List runs in descending order by start time. If the job has multiple pages of tasks, job_clusters, parameters or repair history,
it will paginate through all pages and aggregate the results.
:param active_only: bool (optional)
If active_only is `true`, only active runs are included in the results; otherwise, lists both active
and completed runs. An active run is a run in the `QUEUED`, `PENDING`, `RUNNING`, or `TERMINATING`.
This field cannot be `true` when completed_only is `true`.
:param completed_only: bool (optional)
If completed_only is `true`, only completed runs are included in the results; otherwise, lists both
active and completed runs. This field cannot be `true` when active_only is `true`.
:param expand_tasks: bool (optional)
Whether to include task and cluster details in the response. Note that in API 2.2, only the first
100 elements will be shown. Use :method:jobs/getrun to paginate through all tasks and clusters.
:param job_id: int (optional)
The job for which to list runs. If omitted, the Jobs service lists runs from all jobs.
:param limit: int (optional)
The number of runs to return. This value must be greater than 0 and less than 25. The default value
is 20. If a request specifies a limit of 0, the service instead uses the maximum limit.
:param offset: int (optional)
The offset of the first run to return, relative to the most recent run. Deprecated since June 2023.
Use `page_token` to iterate through the pages instead.
:param page_token: str (optional)
Use `next_page_token` or `prev_page_token` returned from the previous request to list the next or
previous page of runs respectively.
:param run_type: :class:`RunType` (optional)
The type of runs to return. For a description of run types, see :method:jobs/getRun.
:param start_time_from: int (optional)
Show runs that started _at or after_ this value. The value must be a UTC timestamp in milliseconds.
Can be combined with _start_time_to_ to filter by a time range.
:param start_time_to: int (optional)
Show runs that started _at or before_ this value. The value must be a UTC timestamp in milliseconds.
Can be combined with _start_time_from_ to filter by a time range.
:returns: Iterator over :class:`BaseRun`
"""
# fetch runs with limited elements in top level arrays
runs_list = super().list_runs(active_only=active_only,
completed_only=completed_only,
expand_tasks=expand_tasks,
job_id=job_id,
limit=limit,
offset=offset,
page_token=page_token,
run_type=run_type,
start_time_from=start_time_from,
start_time_to=start_time_to)

if not expand_tasks:
yield from runs_list

# fully fetch all top level arrays for each run in the list
for run in runs_list:
if run.has_more:
run_from_get_call = self.get_run(run.run_id)
run.tasks = run_from_get_call.tasks
run.job_clusters = run_from_get_call.job_clusters
run.job_parameters = run_from_get_call.job_parameters
run.repair_history = run_from_get_call.repair_history
# Remove has_more fields for each run in the list.
# This field in Jobs API 2.2 is useful for pagination. It indicates if there are more than 100 tasks or job_clusters in the run.
# This function hides pagination details from the user. So the field does not play useful role here.
if hasattr(run, 'has_more'):
delattr(run, 'has_more')
yield run

def get_run(self,
run_id: int,
*,
Expand Down
207 changes: 202 additions & 5 deletions tests/test_jobs_mixin.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
import json
import re
from typing import Pattern
from typing import Optional, Pattern

from databricks.sdk import WorkspaceClient


def make_getrun_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
return re.compile(
rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
)
def make_getrun_path_pattern(run_id: int, page_token: Optional[str] = None) -> Pattern[str]:
    """Build a regex matching a jobs/runs/get URL (any API 2.x minor version).

    When `page_token` is given, the URL must carry it before `run_id`;
    otherwise only `run_id` is expected in the query string.
    """
    prefix = re.escape("http://localhost/api/")
    if page_token:
        query = f"/jobs/runs/get?page_token={page_token}&run_id={run_id}"
    else:
        query = f"/jobs/runs/get?run_id={run_id}"
    return re.compile(prefix + r"2.\d" + re.escape(query))


def make_getjob_path_pattern(job_id: int, page_token: str) -> Pattern[str]:
Expand All @@ -17,6 +21,12 @@ def make_getjob_path_pattern(job_id: int, page_token: str) -> Pattern[str]:
)


def make_listruns_path_pattern(page_token: str) -> Pattern[str]:
    """Build a regex matching a jobs/runs/list URL carrying `page_token`.

    Accepts any API 2.x minor version and tolerates an optional
    `expand_tasks=true|false` parameter ahead of the page token.
    """
    prefix = re.escape("http://localhost/api/")
    endpoint = re.escape("/jobs/runs/list")
    query = r"\?(?:expand_tasks=(?:true|false)&)?page_token=" + re.escape(page_token)
    return re.compile(prefix + r"2.\d" + endpoint + query)


def test_get_run_with_no_pagination(config, requests_mock):
run1 = {"tasks": [{"run_id": 0}, {"run_id": 1}], }
requests_mock.get(make_getrun_path_pattern(1337, "initialToken"), text=json.dumps(run1))
Expand Down Expand Up @@ -261,3 +271,190 @@ def test_get_job_pagination_with_tasks(config, requests_mock):
}]
}
}


def test_list_runs_without_task_expansion(config, requests_mock):
    """With expand_tasks=False, list_runs paginates jobs/runs/list and never calls runs/get."""
    first_page = {
        "runs": [
            {"run_id": 100, "run_name": "run100"},
            {
                "run_id": 200,
                "run_name": "run200",
                "job_parameters": [
                    {"name": "param1", "default": "default1"},
                    {"name": "param2", "default": "default2"},
                ],
            },
            {"run_id": 300, "run_name": "run300"},
        ],
        "next_page_token": "tokenToSecondPage",
    }
    second_page = {
        "runs": [
            {
                "run_id": 400,
                "run_name": "run400",
                "repair_history": [{"id": "repair400_1"}, {"id": "repair400_2"}],
            },
        ],
    }

    requests_mock.get(make_listruns_path_pattern("initialToken"), text=json.dumps(first_page))
    requests_mock.get(make_listruns_path_pattern("tokenToSecondPage"), text=json.dumps(second_page))
    client = WorkspaceClient(config=config)

    observed = [run.as_dict() for run in client.jobs.list_runs(expand_tasks=False, page_token="initialToken")]

    assert observed == [
        {"run_id": 100, "run_name": "run100"},
        {
            "run_id": 200,
            "run_name": "run200",
            "job_parameters": [
                {"name": "param1", "default": "default1"},
                {"name": "param2", "default": "default2"},
            ],
        },
        {"run_id": 300, "run_name": "run300"},
        {
            "run_id": 400,
            "run_name": "run400",
            "repair_history": [{"id": "repair400_1"}, {"id": "repair400_2"}],
        },
    ]

    # only two requests should be made which are jobs/list requests
    assert requests_mock.call_count == 2


def test_list_runs(config, requests_mock):
    """With expand_tasks=True, runs flagged has_more are completed via paginated runs/get calls."""
    list_page1 = {
        "runs": [
            {
                "run_id": 100,
                "tasks": [{"task_key": "taskkey101"}, {"task_key": "taskkey102"}],
                "has_more": True,
            },
            {"run_id": 200, "tasks": [{"task_key": "taskkey201"}]},
            {"run_id": 300, "tasks": [{"task_key": "taskkey301"}]},
        ],
        "next_page_token": "tokenToSecondPage",
    }
    list_page2 = {
        "runs": [
            {
                "run_id": 400,
                "tasks": [{"task_key": "taskkey401"}, {"task_key": "taskkey402"}],
                "has_more": True,
            },
        ],
    }

    get_100_page1 = {
        "run_id": 100,
        "tasks": [{"task_key": "taskkey101"}, {"task_key": "taskkey102"}],
        "next_page_token": "tokenToSecondPage_100",
    }
    get_100_page2 = {"run_id": 100, "tasks": [{"task_key": "taskkey103"}]}
    get_400_page1 = {
        "run_id": 400,
        "tasks": [{"task_key": "taskkey401"}, {"task_key": "taskkey403"}],
        "next_page_token": "tokenToSecondPage_400",
    }
    get_400_page2 = {"run_id": 400, "tasks": [{"task_key": "taskkey402"}, {"task_key": "taskkey404"}]}

    requests_mock.get(make_listruns_path_pattern("initialToken"), text=json.dumps(list_page1))
    requests_mock.get(make_listruns_path_pattern("tokenToSecondPage"), text=json.dumps(list_page2))

    requests_mock.get(make_getrun_path_pattern(100), text=json.dumps(get_100_page1))
    requests_mock.get(make_getrun_path_pattern(100, "tokenToSecondPage_100"),
                      text=json.dumps(get_100_page2))

    requests_mock.get(make_getrun_path_pattern(400), text=json.dumps(get_400_page1))
    requests_mock.get(make_getrun_path_pattern(400, "tokenToSecondPage_400"),
                      text=json.dumps(get_400_page2))
    client = WorkspaceClient(config=config)

    observed = [run.as_dict() for run in client.jobs.list_runs(expand_tasks=True, page_token="initialToken")]

    assert observed == [
        {
            "run_id": 100,
            "tasks": [
                {"task_key": "taskkey101"},
                {"task_key": "taskkey102"},
                {"task_key": "taskkey103"},
            ],
        },
        {"run_id": 200, "tasks": [{"task_key": "taskkey201"}]},
        {"run_id": 300, "tasks": [{"task_key": "taskkey301"}]},
        {
            "run_id": 400,
            "tasks": [
                {"task_key": "taskkey401"},
                {"task_key": "taskkey403"},
                {"task_key": "taskkey402"},
                {"task_key": "taskkey404"},
            ],
        },
    ]

    # check that job_id 200 and 300 was never used in runs/get call
    history = requests_mock.request_history
    assert all('300' not in request.qs.get("run_id", ['']) for request in history)
    assert all('200' not in request.qs.get("run_id", ['']) for request in history)
Loading