Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### Documentation

### Internal Changes
* Update Jobs ListRuns API to support paginated responses ([#410](https://github.com/databricks/databricks-sdk-java/pull/410)).
* Introduce automated tagging ([#409](https://github.com/databricks/databricks-sdk-java/pull/409)).
* Update Jobs GetJob API to support paginated responses ([#403](https://github.com/databricks/databricks-sdk-java/pull/403)).
* Update Jobs GetRun API to support paginated responses ([#402](https://github.com/databricks/databricks-sdk-java/pull/402)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.databricks.sdk.core.ApiClient;
import com.databricks.sdk.service.jobs.*;
import java.util.Collection;
import java.util.Iterator;

public class JobsExt extends JobsAPI {

Expand All @@ -14,6 +15,55 @@ public JobsExt(JobsService mock) {
super(mock);
}

/**
 * List job runs.
 *
 * <p>Retrieves a list of runs. When the request asks for expanded tasks and a run reports
 * {@code has_more}, the run is fetched again through {@code getRun} and its tasks, job_clusters,
 * job parameters and repair_history are replaced with the values from that call, so callers never
 * have to deal with pagination of those arrays themselves.
 *
 * <p>When expanded tasks are not requested (the flag is unset or false), the result of the
 * underlying {@code listRuns} call is returned unchanged.
 *
 * @param request the list-runs request; its {@code expandTasks} flag decides whether runs are
 *     expanded
 * @return an iterable over the runs; expanded runs are mutated in place and have their {@code
 *     hasMore} field cleared
 */
public Iterable<BaseRun> listRuns(ListRunsRequest request) {
  // fetch runs with limited elements in top level arrays
  Iterable<BaseRun> runsList = super.listRuns(request);

  // Compare against Boolean.TRUE instead of unboxing: an unset flag (null) must behave like
  // "false" rather than throw a NullPointerException.
  if (!Boolean.TRUE.equals(request.getExpandTasks())) {
    return runsList;
  }

  // Obtain the source iterator inside the lambda so that every call to iterator() on the
  // returned Iterable asks the underlying iterable for its own iterator, instead of all calls
  // sharing one cursor captured at method-call time.
  return () -> {
    Iterator<BaseRun> iterator = runsList.iterator();
    return new Iterator<BaseRun>() {
      @Override
      public boolean hasNext() {
        return iterator.hasNext();
      }

      @Override
      public BaseRun next() {
        BaseRun run = iterator.next();

        // The has_more field is only present in runs with 100+ tasks, which are served from
        // Jobs API 2.2. Extra tasks and other fields need to be fetched only when has_more is
        // true.
        if (Boolean.TRUE.equals(run.getHasMore())) {
          // fully fetch all top level arrays for the run
          GetRunRequest getRunRequest = new GetRunRequest().setRunId(run.getRunId());
          Run fullRun = getRun(getRunRequest);
          run.setTasks(fullRun.getTasks());
          run.setJobClusters(fullRun.getJobClusters());
          run.setJobParameters(fullRun.getJobParameters());
          run.setRepairHistory(fullRun.getRepairHistory());
        }
        // Set the has_more field to null.
        // This field in Jobs API 2.2 is useful for pagination. It indicates if there are more
        // than 100 tasks or job_clusters in the run.
        // This function hides pagination details from the user, so the field plays no useful
        // role here.
        run.setHasMore(null);
        return run;
      }
    };
  };
}

/**
* Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the
* response contract.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,193 @@ private void addJobEnvironments(Job job, String... environmentKeys) {
}
job.getSettings().setEnvironments(environments);
}

/**
 * Verifies that {@code listRuns} with {@code expandTasks=false} returns the runs of every page
 * unchanged and in order, and never issues a {@code getRun} call.
 */
@Test
public void testListRunsWithoutExpandTasks() {
  JobsService service = Mockito.mock(JobsService.class);
  BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L);
  BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L);
  BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L);
  BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L);

  List<BaseRun> runsOnFirstPage = new ArrayList<>();
  runsOnFirstPage.add(run1);
  runsOnFirstPage.add(run2);
  List<BaseRun> runsOnSecondPage = new ArrayList<>();
  runsOnSecondPage.add(run3);
  runsOnSecondPage.add(run4);
  // The first page has to carry a next page token; without it pagination ends after the first
  // response and the second page is never requested.
  when(service.listRuns(any()))
      .thenReturn(new ListRunsResponse().setRuns(runsOnFirstPage).setNextPageToken("page2token"))
      .thenReturn(new ListRunsResponse().setRuns(runsOnSecondPage));
  JobsExt jobsExt = new JobsExt(service);

  ListRunsRequest request = new ListRunsRequest().setExpandTasks(false);
  Iterable<BaseRun> runsList = jobsExt.listRuns(request);

  List<BaseRun> expectedRunsList = new ArrayList<>();
  expectedRunsList.add(run1);
  expectedRunsList.add(run2);
  expectedRunsList.add(run3);
  expectedRunsList.add(run4);

  // Materialize the result so that the number of runs and their order are asserted as well;
  // a per-element lookup would also pass for an empty or truncated result.
  List<BaseRun> actualRunsList = new ArrayList<>();
  for (BaseRun run : runsList) {
    actualRunsList.add(run);
  }
  assertEquals(expectedRunsList, actualRunsList);
  verify(service, times(0)).getRun(any());
}

/**
 * Verifies that {@code listRuns} with {@code expandTasks=true} walks both list pages and, for
 * every run flagged with {@code has_more}, aggregates the tasks of all {@code getRun} pages into
 * the returned run while clearing the {@code has_more} flag.
 */
@Test
public void testListRuns() {
  JobsService service = Mockito.mock(JobsService.class);
  BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L).setHasMore(true);
  addTasks(run1, 101L, 102L);
  BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L).setHasMore(true);
  addTasks(run2, 201L, 202L);
  // run 300 has no has_more flag, so it must be returned as is without any getRun call
  BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L);
  addTasks(run3, 301L);
  BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L).setHasMore(true);
  addTasks(run4, 401L, 402L);

  List<BaseRun> runsOnFirstPage = new ArrayList<>();
  runsOnFirstPage.add(run1);
  runsOnFirstPage.add(run2);
  runsOnFirstPage.add(run3);
  ListRunsResponse listRunsResponse1 =
      new ListRunsResponse().setRuns(runsOnFirstPage).setNextPageToken("page2token");
  List<BaseRun> runsOnSecondPage = new ArrayList<>();
  runsOnSecondPage.add(run4);
  ListRunsResponse listRunsResponse2 = new ListRunsResponse().setRuns(runsOnSecondPage);

  when(service.listRuns(any())).thenReturn(listRunsResponse1).thenReturn(listRunsResponse2);

  // runs/get?run_id=100
  Run getRun1_page1 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page2token");
  addTasks(getRun1_page1, 101L, 102L);
  Run getRun1_page2 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page3token");
  addTasks(getRun1_page2, 103L, 104L);
  Run getRun1_page3 = new Run().setRunId(100L).setJobId(1L);
  addTasks(getRun1_page3, 105L);

  // runs/get?run_id=200
  Run getRun2_page1 = new Run().setRunId(200L).setJobId(2L).setNextPageToken("run2_page2token");
  addTasks(getRun2_page1, 201L, 202L);
  Run getRun2_page2 = new Run().setRunId(200L).setJobId(2L);
  addTasks(getRun2_page2, 203L);

  // runs/get?run_id=300
  Run getRun3_page1 = new Run().setRunId(300L).setJobId(3L);
  addTasks(getRun3_page1, 301L);

  // runs/get?run_id=400
  Run getRun4_page1 = new Run().setRunId(400L).setJobId(4L).setNextPageToken("run4_page2token");
  addTasks(getRun4_page1, 401L, 402L);
  Run getRun4_page2 = new Run().setRunId(400L).setJobId(4L);
  addTasks(getRun4_page2, 403L, 404L);

  // Each stub is keyed on (run id, page token); the matchers are mutually exclusive, so the
  // order in which they are registered does not matter.
  doReturn(getRun1_page1)
      .when(service)
      .getRun(
          argThat(
              req -> req != null && req.getRunId() == 100L && req.getPageToken() == null));
  doReturn(getRun1_page2)
      .when(service)
      .getRun(
          argThat(
              req ->
                  req != null
                      && req.getRunId() == 100L
                      && "run1_page2token".equals(req.getPageToken())));
  doReturn(getRun1_page3)
      .when(service)
      .getRun(
          argThat(
              req ->
                  req != null
                      && req.getRunId() == 100L
                      && "run1_page3token".equals(req.getPageToken())));
  doReturn(getRun2_page1)
      .when(service)
      .getRun(
          argThat(
              req -> req != null && req.getRunId() == 200L && req.getPageToken() == null));
  doReturn(getRun2_page2)
      .when(service)
      .getRun(
          argThat(
              req ->
                  req != null
                      && req.getRunId() == 200L
                      && "run2_page2token".equals(req.getPageToken())));
  doReturn(getRun3_page1)
      .when(service)
      .getRun(
          argThat(
              req -> req != null && req.getRunId() == 300L && req.getPageToken() == null));
  doReturn(getRun4_page1)
      .when(service)
      .getRun(
          argThat(
              req -> req != null && req.getRunId() == 400L && req.getPageToken() == null));
  doReturn(getRun4_page2)
      .when(service)
      .getRun(
          argThat(
              req ->
                  req != null
                      && req.getRunId() == 400L
                      && "run4_page2token".equals(req.getPageToken())));

  JobsExt jobsExt = new JobsExt(service);
  ListRunsRequest request = new ListRunsRequest().setExpandTasks(true);
  Iterable<BaseRun> runsList = jobsExt.listRuns(request);

  // Expected runs carry the tasks of all getRun pages and no has_more flag.
  BaseRun expectedRun1 = new BaseRun().setRunId(100L).setJobId(1L);
  addTasks(expectedRun1, 101L, 102L, 103L, 104L, 105L);
  BaseRun expectedRun2 = new BaseRun().setRunId(200L).setJobId(2L);
  addTasks(expectedRun2, 201L, 202L, 203L);
  BaseRun expectedRun3 = new BaseRun().setRunId(300L).setJobId(3L);
  addTasks(expectedRun3, 301L);
  BaseRun expectedRun4 = new BaseRun().setRunId(400L).setJobId(4L);
  addTasks(expectedRun4, 401L, 402L, 403L, 404L);
  List<BaseRun> expectedRunsList = new ArrayList<>();
  expectedRunsList.add(expectedRun1);
  expectedRunsList.add(expectedRun2);
  expectedRunsList.add(expectedRun3);
  expectedRunsList.add(expectedRun4);

  // Materialize the result so that the number of runs and their order are asserted as well;
  // a per-element lookup would not notice missing or duplicated runs.
  List<BaseRun> actualRunsList = new ArrayList<>();
  for (BaseRun run : runsList) {
    actualRunsList.add(run);
  }
  assertEquals(expectedRunsList, actualRunsList);
  // 3 getRun calls for run 100, 2 getRun calls for run 200, 0 getRun call for run 300, 2 getRun
  // calls for run 400
  verify(service, times(7)).getRun(any());
}

/**
 * Replaces the tasks of {@code run} with one freshly created {@link RunTask} per given id, in
 * the order the ids are passed.
 *
 * @param run the run whose task collection is overwritten
 * @param taskRunIds run ids to assign to the generated tasks
 */
private void addTasks(BaseRun run, long... taskRunIds) {
  Collection<RunTask> generatedTasks = new ArrayList<>(taskRunIds.length);
  for (int i = 0; i < taskRunIds.length; i++) {
    RunTask task = new RunTask().setRunId(taskRunIds[i]);
    generatedTasks.add(task);
  }
  run.setTasks(generatedTasks);
}
}
Loading