Skip to content

Commit a6e3afb

Browse files
authored
[Internal] Update Jobs listRuns function to support paginated responses (#411)
## What changes are proposed in this pull request? Introduces logic in the Jobs extension for the ListRuns call. The extended logic accounts for the new response format of API 2.2. The API 2.1 format returns all tasks and job_clusters for each run in the runs list. The API 2.2 format truncates tasks and job_clusters to 100 elements. The extended ListRuns logic calls GetRun for each run in the list that reports `has_more`, to populate the full lists of tasks and job_clusters. The code consumes the iterator from `super.listRuns` and produces an iterator whose runs have already been fully paginated. ## How is this tested? Unit tests. I can't do manual tests because I haven't figured out how to run the code locally.
1 parent 65327b9 commit a6e3afb

File tree

3 files changed

+240
-1
lines changed

3 files changed

+240
-1
lines changed

NEXT_CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
### Documentation
1010

1111
### Internal Changes
12-
* Update Jobs ListJobs API to support paginated responses ([#411](https://github.com/databricks/databricks-sdk-java/pull/411))
12+
* Update Jobs ListRuns API to support paginated responses ([#411](https://github.com/databricks/databricks-sdk-java/pull/411)).
13+
* Update Jobs ListJobs API to support paginated responses ([#410](https://github.com/databricks/databricks-sdk-java/pull/410)).
1314
* Introduce automated tagging ([#409](https://github.com/databricks/databricks-sdk-java/pull/409)).
1415
* Update Jobs GetJob API to support paginated responses ([#403](https://github.com/databricks/databricks-sdk-java/pull/403)).
1516
* Update Jobs GetRun API to support paginated responses ([#402](https://github.com/databricks/databricks-sdk-java/pull/402)).

databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,55 @@ public BaseJob next() {
6161
};
6262
}
6363

64+
/**
65+
* List job runs.
66+
*
67+
* <p>Retrieve a list of runs. If the run has multiple pages of tasks, job_clusters, parameters or
68+
* repair_history, it will paginate through all pages and aggregate the results.
69+
*/
70+
public Iterable<BaseRun> listRuns(ListRunsRequest request) {
71+
// fetch runs with limited elements in top level arrays
72+
Iterable<BaseRun> runsList = super.listRuns(request);
73+
74+
if (!request.getExpandTasks()) {
75+
return runsList;
76+
}
77+
78+
Iterator<BaseRun> iterator = runsList.iterator();
79+
return () ->
80+
new Iterator<BaseRun>() {
81+
@Override
82+
public boolean hasNext() {
83+
return iterator.hasNext();
84+
}
85+
86+
@Override
87+
public BaseRun next() {
88+
BaseRun run = iterator.next();
89+
90+
// The has_more field is only present in run with 100+ tasks, that is served from Jobs
91+
// API 2.2.
92+
// Extra tasks and other fields need to be fetched only when has_more is true.
93+
if (run.getHasMore() != null && run.getHasMore()) {
94+
// fully fetch all top level arrays for the run
95+
GetRunRequest getRunRequest = new GetRunRequest().setRunId(run.getRunId());
96+
Run fullRun = getRun(getRunRequest);
97+
run.setTasks(fullRun.getTasks());
98+
run.setJobClusters(fullRun.getJobClusters());
99+
run.setJobParameters(fullRun.getJobParameters());
100+
run.setRepairHistory(fullRun.getRepairHistory());
101+
}
102+
// Set the has_more fields to null.
103+
// This field in Jobs API 2.2 is useful for pagination. It indicates if there are more
104+
// than 100 tasks or job_clusters in the run.
105+
// This function hides pagination details from the user. So the field does not play
106+
// useful role here.
107+
run.setHasMore(null);
108+
return run;
109+
}
110+
};
111+
}
112+
64113
/**
65114
* Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the
66115
* response contract.

databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,4 +359,193 @@ private void addTasks(BaseJob job, String... taskKeys) {
359359
}
360360
job.getSettings().setTasks(tasks);
361361
}
362+
363+
@Test
364+
public void testListRunsWithoutExpandTasks() {
365+
JobsService service = Mockito.mock(JobsService.class);
366+
BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L);
367+
BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L);
368+
BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L);
369+
BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L);
370+
371+
List<BaseRun> runsOnFirstPage = new ArrayList<>();
372+
runsOnFirstPage.add(run1);
373+
runsOnFirstPage.add(run2);
374+
List<BaseRun> runsOnSecondPage = new ArrayList<>();
375+
runsOnSecondPage.add(run3);
376+
runsOnSecondPage.add(run4);
377+
when(service.listRuns(any()))
378+
.thenReturn(new ListRunsResponse().setRuns(runsOnFirstPage))
379+
.thenReturn(new ListRunsResponse().setRuns(runsOnSecondPage));
380+
JobsExt jobsExt = new JobsExt(service);
381+
382+
ListRunsRequest request = new ListRunsRequest().setExpandTasks(false);
383+
Iterable<BaseRun> runsList = jobsExt.listRuns(request);
384+
385+
List<BaseRun> expectedRunsList = new ArrayList<>();
386+
expectedRunsList.add(run1);
387+
expectedRunsList.add(run2);
388+
expectedRunsList.add(run3);
389+
expectedRunsList.add(run4);
390+
for (BaseRun run : runsList) {
391+
BaseRun expectedRun =
392+
expectedRunsList.stream()
393+
.filter(e -> e.getRunId().equals(run.getRunId()))
394+
.findFirst()
395+
.orElse(null);
396+
assertEquals(expectedRun, run);
397+
}
398+
verify(service, times(0)).getRun(any());
399+
}
400+
401+
@Test
402+
public void testListRuns() {
403+
JobsService service = Mockito.mock(JobsService.class);
404+
BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L).setHasMore(true);
405+
addTasks(run1, 101L, 102L);
406+
BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L).setHasMore(true);
407+
addTasks(run2, 201L, 202L);
408+
BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L);
409+
addTasks(run3, 301L);
410+
BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L).setHasMore(true);
411+
addTasks(run4, 401L, 402L);
412+
413+
List<BaseRun> runsOnFirstPage = new ArrayList<>();
414+
runsOnFirstPage.add(run1);
415+
runsOnFirstPage.add(run2);
416+
runsOnFirstPage.add(run3);
417+
ListRunsResponse listRunsResponse1 =
418+
new ListRunsResponse().setRuns(runsOnFirstPage).setNextPageToken("page2token");
419+
List<BaseRun> runsOnSecondPage = new ArrayList<>();
420+
runsOnSecondPage.add(run4);
421+
ListRunsResponse listRunsResponse2 = new ListRunsResponse().setRuns(runsOnSecondPage);
422+
423+
when(service.listRuns(any())).thenReturn(listRunsResponse1).thenReturn(listRunsResponse2);
424+
425+
// runs/get?run_id=100
426+
Run getRun1_page1 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page2token");
427+
addTasks(getRun1_page1, 101L, 102L);
428+
Run getRun1_page2 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page3token");
429+
addTasks(getRun1_page2, 103L, 104L);
430+
Run getRun1_page3 = new Run().setRunId(100L).setJobId(1L);
431+
addTasks(getRun1_page3, 105L);
432+
433+
// runs/get?run_id=200
434+
Run getRun2_page1 = new Run().setRunId(200L).setJobId(2L).setNextPageToken("run2_page2token");
435+
addTasks(getRun2_page1, 201L, 202L);
436+
Run getRun2_page2 = new Run().setRunId(200L).setJobId(2L);
437+
addTasks(getRun2_page2, 203L);
438+
439+
// runs/get?run_id=300
440+
Run getRun3_page1 = new Run().setRunId(300L).setJobId(3L);
441+
addTasks(getRun3_page1, 301L);
442+
443+
// runs/get?run_id=400
444+
Run getRun4_page1 = new Run().setRunId(400L).setJobId(4L).setNextPageToken("run4_page2token");
445+
addTasks(getRun4_page1, 401L, 402L);
446+
Run getRun4_page2 = new Run().setRunId(400L).setJobId(4L);
447+
addTasks(getRun4_page2, 403L, 404L);
448+
449+
doReturn(getRun1_page1)
450+
.when(service)
451+
.getRun(
452+
argThat(
453+
request ->
454+
request != null
455+
&& request.getRunId() == 100L
456+
&& request.getPageToken() == null));
457+
doReturn(getRun1_page2)
458+
.when(service)
459+
.getRun(
460+
argThat(
461+
request ->
462+
request != null
463+
&& request.getRunId() == 100L
464+
&& "run1_page2token".equals(request.getPageToken())));
465+
doReturn(getRun1_page3)
466+
.when(service)
467+
.getRun(
468+
argThat(
469+
request ->
470+
request != null
471+
&& request.getRunId() == 100L
472+
&& "run1_page3token".equals(request.getPageToken())));
473+
doReturn(getRun2_page1)
474+
.when(service)
475+
.getRun(
476+
argThat(
477+
request ->
478+
request != null
479+
&& request.getRunId() == 200L
480+
&& request.getPageToken() == null));
481+
doReturn(getRun2_page2)
482+
.when(service)
483+
.getRun(
484+
argThat(
485+
request ->
486+
request != null
487+
&& request.getRunId() == 200L
488+
&& "run2_page2token".equals(request.getPageToken())));
489+
doReturn(getRun3_page1)
490+
.when(service)
491+
.getRun(
492+
argThat(
493+
request ->
494+
request != null
495+
&& request.getRunId() == 300L
496+
&& request.getPageToken() == null));
497+
doReturn(getRun4_page1)
498+
.when(service)
499+
.getRun(
500+
argThat(
501+
request ->
502+
request != null
503+
&& request.getRunId() == 400L
504+
&& request.getPageToken() == null));
505+
doReturn(getRun4_page2)
506+
.when(service)
507+
.getRun(
508+
argThat(
509+
request ->
510+
request != null
511+
&& request.getRunId() == 400L
512+
&& "run4_page2token".equals(request.getPageToken())));
513+
514+
JobsExt jobsExt = new JobsExt(service);
515+
ListRunsRequest request = new ListRunsRequest().setExpandTasks(true);
516+
Iterable<BaseRun> runsList = jobsExt.listRuns(request);
517+
518+
BaseRun expectedRun1 = new BaseRun().setRunId(100L).setJobId(1L);
519+
addTasks(expectedRun1, 101L, 102L, 103L, 104L, 105L);
520+
BaseRun expectedRun2 = new BaseRun().setRunId(200L).setJobId(2L);
521+
addTasks(expectedRun2, 201L, 202L, 203L);
522+
BaseRun expectedRun3 = new BaseRun().setRunId(300L).setJobId(3L);
523+
addTasks(expectedRun3, 301L);
524+
BaseRun expectedRun4 = new BaseRun().setRunId(400L).setJobId(4L);
525+
addTasks(expectedRun4, 401L, 402L, 403L, 404L);
526+
List<BaseRun> expectedRunsList = new ArrayList<>();
527+
expectedRunsList.add(expectedRun1);
528+
expectedRunsList.add(expectedRun2);
529+
expectedRunsList.add(expectedRun3);
530+
expectedRunsList.add(expectedRun4);
531+
for (BaseRun run : runsList) {
532+
BaseRun expectedRun =
533+
expectedRunsList.stream()
534+
.filter(e -> e.getRunId().equals(run.getRunId()))
535+
.findFirst()
536+
.orElse(null);
537+
assertEquals(expectedRun, run);
538+
}
539+
// 3 getRun calls for run 100, 2 getRun calls for run 200, 0 getRun call for run 300, 2 getRun
540+
// calls for run 400
541+
verify(service, times(7)).getRun(any());
542+
}
543+
544+
private void addTasks(BaseRun run, long... taskRunIds) {
545+
Collection<RunTask> tasks = new ArrayList<>();
546+
for (long runId : taskRunIds) {
547+
tasks.add(new RunTask().setRunId(runId));
548+
}
549+
run.setTasks(tasks);
550+
}
362551
}

0 commit comments

Comments
 (0)