Skip to content

Commit f8994b7

Browse files
Merge branch 'main' into ClustersImpI
2 parents 993093e + a6e3afb commit f8994b7

File tree

3 files changed

+240
-1
lines changed

3 files changed

+240
-1
lines changed

NEXT_CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
### Documentation
1010

1111
### Internal Changes
12-
* Update Jobs ListJobs API to support paginated responses ([#411](https://github.com/databricks/databricks-sdk-java/pull/411))
12+
* Update Jobs ListRuns API to support paginated responses ([#411](https://github.com/databricks/databricks-sdk-java/pull/411))
13+
* Update Jobs ListJobs API to support paginated responses ([#410](https://github.com/databricks/databricks-sdk-java/pull/410))
1314
* Introduce automated tagging ([#409](https://github.com/databricks/databricks-sdk-java/pull/409)).
1415
* Update Jobs GetJob API to support paginated responses ([#403](https://github.com/databricks/databricks-sdk-java/pull/403)).
1516
* Update Jobs GetRun API to support paginated responses ([#402](https://github.com/databricks/databricks-sdk-java/pull/402)).

databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,55 @@ public BaseJob next() {
6161
};
6262
}
6363

64+
/**
65+
* List job runs.
66+
*
67+
* <p>Retrieve a list of runs. If the run has multiple pages of tasks, job_clusters, parameters or
68+
* repair_history, it will paginate through all pages and aggregate the results.
69+
*/
70+
public Iterable<BaseRun> listRuns(ListRunsRequest request) {
71+
// fetch runs with limited elements in top level arrays
72+
Iterable<BaseRun> runsList = super.listRuns(request);
73+
74+
if (!request.getExpandTasks()) {
75+
return runsList;
76+
}
77+
78+
Iterator<BaseRun> iterator = runsList.iterator();
79+
return () ->
80+
new Iterator<BaseRun>() {
81+
@Override
82+
public boolean hasNext() {
83+
return iterator.hasNext();
84+
}
85+
86+
@Override
87+
public BaseRun next() {
88+
BaseRun run = iterator.next();
89+
90+
// The has_more field is only present in run with 100+ tasks, that is served from Jobs
91+
// API 2.2.
92+
// Extra tasks and other fields need to be fetched only when has_more is true.
93+
if (run.getHasMore() != null && run.getHasMore()) {
94+
// fully fetch all top level arrays for the run
95+
GetRunRequest getRunRequest = new GetRunRequest().setRunId(run.getRunId());
96+
Run fullRun = getRun(getRunRequest);
97+
run.setTasks(fullRun.getTasks());
98+
run.setJobClusters(fullRun.getJobClusters());
99+
run.setJobParameters(fullRun.getJobParameters());
100+
run.setRepairHistory(fullRun.getRepairHistory());
101+
}
102+
// Set the has_more fields to null.
103+
// This field in Jobs API 2.2 is useful for pagination. It indicates if there are more
104+
// than 100 tasks or job_clusters in the run.
105+
// This function hides pagination details from the user. So the field does not play
106+
// useful role here.
107+
run.setHasMore(null);
108+
return run;
109+
}
110+
};
111+
}
112+
64113
/**
65114
* Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the
66115
* response contract.

databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,4 +359,193 @@ private void addTasks(BaseJob job, String... taskKeys) {
359359
}
360360
job.getSettings().setTasks(tasks);
361361
}
362+
363+
@Test
364+
public void testListRunsWithoutExpandTasks() {
365+
JobsService service = Mockito.mock(JobsService.class);
366+
BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L);
367+
BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L);
368+
BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L);
369+
BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L);
370+
371+
List<BaseRun> runsOnFirstPage = new ArrayList<>();
372+
runsOnFirstPage.add(run1);
373+
runsOnFirstPage.add(run2);
374+
List<BaseRun> runsOnSecondPage = new ArrayList<>();
375+
runsOnSecondPage.add(run3);
376+
runsOnSecondPage.add(run4);
377+
when(service.listRuns(any()))
378+
.thenReturn(new ListRunsResponse().setRuns(runsOnFirstPage))
379+
.thenReturn(new ListRunsResponse().setRuns(runsOnSecondPage));
380+
JobsExt jobsExt = new JobsExt(service);
381+
382+
ListRunsRequest request = new ListRunsRequest().setExpandTasks(false);
383+
Iterable<BaseRun> runsList = jobsExt.listRuns(request);
384+
385+
List<BaseRun> expectedRunsList = new ArrayList<>();
386+
expectedRunsList.add(run1);
387+
expectedRunsList.add(run2);
388+
expectedRunsList.add(run3);
389+
expectedRunsList.add(run4);
390+
for (BaseRun run : runsList) {
391+
BaseRun expectedRun =
392+
expectedRunsList.stream()
393+
.filter(e -> e.getRunId().equals(run.getRunId()))
394+
.findFirst()
395+
.orElse(null);
396+
assertEquals(expectedRun, run);
397+
}
398+
verify(service, times(0)).getRun(any());
399+
}
400+
401+
@Test
402+
public void testListRuns() {
403+
JobsService service = Mockito.mock(JobsService.class);
404+
BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L).setHasMore(true);
405+
addTasks(run1, 101L, 102L);
406+
BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L).setHasMore(true);
407+
addTasks(run2, 201L, 202L);
408+
BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L);
409+
addTasks(run3, 301L);
410+
BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L).setHasMore(true);
411+
addTasks(run4, 401L, 402L);
412+
413+
List<BaseRun> runsOnFirstPage = new ArrayList<>();
414+
runsOnFirstPage.add(run1);
415+
runsOnFirstPage.add(run2);
416+
runsOnFirstPage.add(run3);
417+
ListRunsResponse listRunsResponse1 =
418+
new ListRunsResponse().setRuns(runsOnFirstPage).setNextPageToken("page2token");
419+
List<BaseRun> runsOnSecondPage = new ArrayList<>();
420+
runsOnSecondPage.add(run4);
421+
ListRunsResponse listRunsResponse2 = new ListRunsResponse().setRuns(runsOnSecondPage);
422+
423+
when(service.listRuns(any())).thenReturn(listRunsResponse1).thenReturn(listRunsResponse2);
424+
425+
// runs/get?run_id=100
426+
Run getRun1_page1 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page2token");
427+
addTasks(getRun1_page1, 101L, 102L);
428+
Run getRun1_page2 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page3token");
429+
addTasks(getRun1_page2, 103L, 104L);
430+
Run getRun1_page3 = new Run().setRunId(100L).setJobId(1L);
431+
addTasks(getRun1_page3, 105L);
432+
433+
// runs/get?run_id=200
434+
Run getRun2_page1 = new Run().setRunId(200L).setJobId(2L).setNextPageToken("run2_page2token");
435+
addTasks(getRun2_page1, 201L, 202L);
436+
Run getRun2_page2 = new Run().setRunId(200L).setJobId(2L);
437+
addTasks(getRun2_page2, 203L);
438+
439+
// runs/get?run_id=300
440+
Run getRun3_page1 = new Run().setRunId(300L).setJobId(3L);
441+
addTasks(getRun3_page1, 301L);
442+
443+
// runs/get?run_id=400
444+
Run getRun4_page1 = new Run().setRunId(400L).setJobId(4L).setNextPageToken("run4_page2token");
445+
addTasks(getRun4_page1, 401L, 402L);
446+
Run getRun4_page2 = new Run().setRunId(400L).setJobId(4L);
447+
addTasks(getRun4_page2, 403L, 404L);
448+
449+
doReturn(getRun1_page1)
450+
.when(service)
451+
.getRun(
452+
argThat(
453+
request ->
454+
request != null
455+
&& request.getRunId() == 100L
456+
&& request.getPageToken() == null));
457+
doReturn(getRun1_page2)
458+
.when(service)
459+
.getRun(
460+
argThat(
461+
request ->
462+
request != null
463+
&& request.getRunId() == 100L
464+
&& "run1_page2token".equals(request.getPageToken())));
465+
doReturn(getRun1_page3)
466+
.when(service)
467+
.getRun(
468+
argThat(
469+
request ->
470+
request != null
471+
&& request.getRunId() == 100L
472+
&& "run1_page3token".equals(request.getPageToken())));
473+
doReturn(getRun2_page1)
474+
.when(service)
475+
.getRun(
476+
argThat(
477+
request ->
478+
request != null
479+
&& request.getRunId() == 200L
480+
&& request.getPageToken() == null));
481+
doReturn(getRun2_page2)
482+
.when(service)
483+
.getRun(
484+
argThat(
485+
request ->
486+
request != null
487+
&& request.getRunId() == 200L
488+
&& "run2_page2token".equals(request.getPageToken())));
489+
doReturn(getRun3_page1)
490+
.when(service)
491+
.getRun(
492+
argThat(
493+
request ->
494+
request != null
495+
&& request.getRunId() == 300L
496+
&& request.getPageToken() == null));
497+
doReturn(getRun4_page1)
498+
.when(service)
499+
.getRun(
500+
argThat(
501+
request ->
502+
request != null
503+
&& request.getRunId() == 400L
504+
&& request.getPageToken() == null));
505+
doReturn(getRun4_page2)
506+
.when(service)
507+
.getRun(
508+
argThat(
509+
request ->
510+
request != null
511+
&& request.getRunId() == 400L
512+
&& "run4_page2token".equals(request.getPageToken())));
513+
514+
JobsExt jobsExt = new JobsExt(service);
515+
ListRunsRequest request = new ListRunsRequest().setExpandTasks(true);
516+
Iterable<BaseRun> runsList = jobsExt.listRuns(request);
517+
518+
BaseRun expectedRun1 = new BaseRun().setRunId(100L).setJobId(1L);
519+
addTasks(expectedRun1, 101L, 102L, 103L, 104L, 105L);
520+
BaseRun expectedRun2 = new BaseRun().setRunId(200L).setJobId(2L);
521+
addTasks(expectedRun2, 201L, 202L, 203L);
522+
BaseRun expectedRun3 = new BaseRun().setRunId(300L).setJobId(3L);
523+
addTasks(expectedRun3, 301L);
524+
BaseRun expectedRun4 = new BaseRun().setRunId(400L).setJobId(4L);
525+
addTasks(expectedRun4, 401L, 402L, 403L, 404L);
526+
List<BaseRun> expectedRunsList = new ArrayList<>();
527+
expectedRunsList.add(expectedRun1);
528+
expectedRunsList.add(expectedRun2);
529+
expectedRunsList.add(expectedRun3);
530+
expectedRunsList.add(expectedRun4);
531+
for (BaseRun run : runsList) {
532+
BaseRun expectedRun =
533+
expectedRunsList.stream()
534+
.filter(e -> e.getRunId().equals(run.getRunId()))
535+
.findFirst()
536+
.orElse(null);
537+
assertEquals(expectedRun, run);
538+
}
539+
// 3 getRun calls for run 100, 2 getRun calls for run 200, 0 getRun call for run 300, 2 getRun
540+
// calls for run 400
541+
verify(service, times(7)).getRun(any());
542+
}
543+
544+
private void addTasks(BaseRun run, long... taskRunIds) {
545+
Collection<RunTask> tasks = new ArrayList<>();
546+
for (long runId : taskRunIds) {
547+
tasks.add(new RunTask().setRunId(runId));
548+
}
549+
run.setTasks(tasks);
550+
}
362551
}

0 commit comments

Comments
 (0)