Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@
### Documentation

### Internal Changes
* Update Jobs ListJobs API to support paginated responses ([#411](https://github.com/databricks/databricks-sdk-java/pull/411))
* Update Jobs ListRuns API to support paginated responses ([#411](https://github.com/databricks/databricks-sdk-java/pull/411))
* Update Jobs ListJobs API to support paginated responses ([#410](https://github.com/databricks/databricks-sdk-java/pull/410))
* Introduce automated tagging ([#409](https://github.com/databricks/databricks-sdk-java/pull/409)).
* Update Jobs GetJob API to support paginated responses ([#403](https://github.com/databricks/databricks-sdk-java/pull/403)).
* Update Jobs GetRun API to support paginated responses ([#402](https://github.com/databricks/databricks-sdk-java/pull/402)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,55 @@ public BaseJob next() {
};
}

/**
* List job runs.
*
* <p>Retrieve a list of runs. If the run has multiple pages of tasks, job_clusters, parameters or
* repair_history, it will paginate through all pages and aggregate the results.
*/
public Iterable<BaseRun> listRuns(ListRunsRequest request) {
// fetch runs with limited elements in top level arrays
Iterable<BaseRun> runsList = super.listRuns(request);

if (!request.getExpandTasks()) {
return runsList;
}

Iterator<BaseRun> iterator = runsList.iterator();
return () ->
new Iterator<BaseRun>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public BaseRun next() {
BaseRun run = iterator.next();

// The has_more field is only present in run with 100+ tasks, that is served from Jobs
// API 2.2.
// Extra tasks and other fields need to be fetched only when has_more is true.
if (run.getHasMore() != null && run.getHasMore()) {
// fully fetch all top level arrays for the run
GetRunRequest getRunRequest = new GetRunRequest().setRunId(run.getRunId());
Run fullRun = getRun(getRunRequest);
run.setTasks(fullRun.getTasks());
run.setJobClusters(fullRun.getJobClusters());
run.setJobParameters(fullRun.getJobParameters());
run.setRepairHistory(fullRun.getRepairHistory());
}
// Set the has_more fields to null.
// This field in Jobs API 2.2 is useful for pagination. It indicates if there are more
// than 100 tasks or job_clusters in the run.
// This function hides pagination details from the user. So the field does not play
// useful role here.
run.setHasMore(null);
return run;
}
};
}

/**
* Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the
* response contract.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,4 +359,193 @@ private void addTasks(BaseJob job, String... taskKeys) {
}
job.getSettings().setTasks(tasks);
}

@Test
public void testListRunsWithoutExpandTasks() {
JobsService service = Mockito.mock(JobsService.class);
BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L);
BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L);
BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L);
BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L);

List<BaseRun> runsOnFirstPage = new ArrayList<>();
runsOnFirstPage.add(run1);
runsOnFirstPage.add(run2);
List<BaseRun> runsOnSecondPage = new ArrayList<>();
runsOnSecondPage.add(run3);
runsOnSecondPage.add(run4);
when(service.listRuns(any()))
.thenReturn(new ListRunsResponse().setRuns(runsOnFirstPage))
.thenReturn(new ListRunsResponse().setRuns(runsOnSecondPage));
JobsExt jobsExt = new JobsExt(service);

ListRunsRequest request = new ListRunsRequest().setExpandTasks(false);
Iterable<BaseRun> runsList = jobsExt.listRuns(request);

List<BaseRun> expectedRunsList = new ArrayList<>();
expectedRunsList.add(run1);
expectedRunsList.add(run2);
expectedRunsList.add(run3);
expectedRunsList.add(run4);
for (BaseRun run : runsList) {
BaseRun expectedRun =
expectedRunsList.stream()
.filter(e -> e.getRunId().equals(run.getRunId()))
.findFirst()
.orElse(null);
assertEquals(expectedRun, run);
}
verify(service, times(0)).getRun(any());
}

@Test
public void testListRuns() {
JobsService service = Mockito.mock(JobsService.class);
BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L).setHasMore(true);
addTasks(run1, 101L, 102L);
BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L).setHasMore(true);
addTasks(run2, 201L, 202L);
BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L);
addTasks(run3, 301L);
BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L).setHasMore(true);
addTasks(run4, 401L, 402L);

List<BaseRun> runsOnFirstPage = new ArrayList<>();
runsOnFirstPage.add(run1);
runsOnFirstPage.add(run2);
runsOnFirstPage.add(run3);
ListRunsResponse listRunsResponse1 =
new ListRunsResponse().setRuns(runsOnFirstPage).setNextPageToken("page2token");
List<BaseRun> runsOnSecondPage = new ArrayList<>();
runsOnSecondPage.add(run4);
ListRunsResponse listRunsResponse2 = new ListRunsResponse().setRuns(runsOnSecondPage);

when(service.listRuns(any())).thenReturn(listRunsResponse1).thenReturn(listRunsResponse2);

// runs/get?run_id=100
Run getRun1_page1 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page2token");
addTasks(getRun1_page1, 101L, 102L);
Run getRun1_page2 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page3token");
addTasks(getRun1_page2, 103L, 104L);
Run getRun1_page3 = new Run().setRunId(100L).setJobId(1L);
addTasks(getRun1_page3, 105L);

// runs/get?run_id=200
Run getRun2_page1 = new Run().setRunId(200L).setJobId(2L).setNextPageToken("run2_page2token");
addTasks(getRun2_page1, 201L, 202L);
Run getRun2_page2 = new Run().setRunId(200L).setJobId(2L);
addTasks(getRun2_page2, 203L);

// runs/get?run_id=300
Run getRun3_page1 = new Run().setRunId(300L).setJobId(3L);
addTasks(getRun3_page1, 301L);

// runs/get?run_id=400
Run getRun4_page1 = new Run().setRunId(400L).setJobId(4L).setNextPageToken("run4_page2token");
addTasks(getRun4_page1, 401L, 402L);
Run getRun4_page2 = new Run().setRunId(400L).setJobId(4L);
addTasks(getRun4_page2, 403L, 404L);

doReturn(getRun1_page1)
.when(service)
.getRun(
argThat(
request ->
request != null
&& request.getRunId() == 100L
&& request.getPageToken() == null));
doReturn(getRun1_page2)
.when(service)
.getRun(
argThat(
request ->
request != null
&& request.getRunId() == 100L
&& "run1_page2token".equals(request.getPageToken())));
doReturn(getRun1_page3)
.when(service)
.getRun(
argThat(
request ->
request != null
&& request.getRunId() == 100L
&& "run1_page3token".equals(request.getPageToken())));
doReturn(getRun2_page1)
.when(service)
.getRun(
argThat(
request ->
request != null
&& request.getRunId() == 200L
&& request.getPageToken() == null));
doReturn(getRun2_page2)
.when(service)
.getRun(
argThat(
request ->
request != null
&& request.getRunId() == 200L
&& "run2_page2token".equals(request.getPageToken())));
doReturn(getRun3_page1)
.when(service)
.getRun(
argThat(
request ->
request != null
&& request.getRunId() == 300L
&& request.getPageToken() == null));
doReturn(getRun4_page1)
.when(service)
.getRun(
argThat(
request ->
request != null
&& request.getRunId() == 400L
&& request.getPageToken() == null));
doReturn(getRun4_page2)
.when(service)
.getRun(
argThat(
request ->
request != null
&& request.getRunId() == 400L
&& "run4_page2token".equals(request.getPageToken())));

JobsExt jobsExt = new JobsExt(service);
ListRunsRequest request = new ListRunsRequest().setExpandTasks(true);
Iterable<BaseRun> runsList = jobsExt.listRuns(request);

BaseRun expectedRun1 = new BaseRun().setRunId(100L).setJobId(1L);
addTasks(expectedRun1, 101L, 102L, 103L, 104L, 105L);
BaseRun expectedRun2 = new BaseRun().setRunId(200L).setJobId(2L);
addTasks(expectedRun2, 201L, 202L, 203L);
BaseRun expectedRun3 = new BaseRun().setRunId(300L).setJobId(3L);
addTasks(expectedRun3, 301L);
BaseRun expectedRun4 = new BaseRun().setRunId(400L).setJobId(4L);
addTasks(expectedRun4, 401L, 402L, 403L, 404L);
List<BaseRun> expectedRunsList = new ArrayList<>();
expectedRunsList.add(expectedRun1);
expectedRunsList.add(expectedRun2);
expectedRunsList.add(expectedRun3);
expectedRunsList.add(expectedRun4);
for (BaseRun run : runsList) {
BaseRun expectedRun =
expectedRunsList.stream()
.filter(e -> e.getRunId().equals(run.getRunId()))
.findFirst()
.orElse(null);
assertEquals(expectedRun, run);
}
// 3 getRun calls for run 100, 2 getRun calls for run 200, 0 getRun call for run 300, 2 getRun
// calls for run 400
verify(service, times(7)).getRun(any());
}

private void addTasks(BaseRun run, long... taskRunIds) {
Collection<RunTask> tasks = new ArrayList<>();
for (long runId : taskRunIds) {
tasks.add(new RunTask().setRunId(runId));
}
run.setTasks(tasks);
}
}
Loading