Skip to content

Commit bd446fe

Browse files
committed
Add logic and tests for ListRuns
1 parent 47b2a5a commit bd446fe

File tree

2 files changed

+228
-0
lines changed

2 files changed

+228
-0
lines changed

databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.databricks.sdk.core.ApiClient;
44
import com.databricks.sdk.service.jobs.*;
55
import java.util.Collection;
6+
import java.util.Iterator;
67

78
public class JobsExt extends JobsAPI {
89

@@ -14,6 +15,44 @@ public JobsExt(JobsService mock) {
1415
super(mock);
1516
}
1617

18+
/**
19+
* List job runs.
20+
*
21+
* <p>Retrieve a list of runs. If the run has multiple pages of tasks, job_clusters, parameters or
22+
* repair_history, it will paginate through all pages and aggregate the results.
23+
*/
24+
public Iterable<BaseRun> listRuns(ListRunsRequest request) {
25+
// fetch runs with limited elements in top level arrays
26+
Iterable<BaseRun> runsList = super.listRuns(request);
27+
28+
if (!request.getExpandTasks()) {
29+
return runsList;
30+
}
31+
32+
Iterator<BaseRun> iterator = runsList.iterator();
33+
return () ->
34+
new Iterator<BaseRun>() {
35+
@Override
36+
public boolean hasNext() {
37+
return iterator.hasNext();
38+
}
39+
40+
@Override
41+
public BaseRun next() {
42+
BaseRun run = iterator.next();
43+
// fully fetch all top level arrays for the run
44+
GetRunRequest getRunRequest = new GetRunRequest().setRunId(run.getRunId());
45+
Run fullRun = getRun(getRunRequest);
46+
run.setTasks(fullRun.getTasks());
47+
run.setJobClusters(fullRun.getJobClusters());
48+
run.setJobParameters(fullRun.getJobParameters());
49+
run.setRepairHistory(fullRun.getRepairHistory());
50+
run.setHasMore(false);
51+
return run;
52+
}
53+
};
54+
}
55+
1756
/**
1857
* Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the
1958
* response contract.

databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,4 +202,193 @@ private void addJobEnvironments(Job job, String... environmentKeys) {
202202
}
203203
job.getSettings().setEnvironments(environments);
204204
}
205+
206+
@Test
207+
public void testListRunsWithoutExpandTasks() {
208+
JobsService service = Mockito.mock(JobsService.class);
209+
BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L);
210+
BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L);
211+
BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L);
212+
BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L);
213+
214+
List<BaseRun> runsOnFirstPage = new ArrayList<>();
215+
runsOnFirstPage.add(run1);
216+
runsOnFirstPage.add(run2);
217+
List<BaseRun> runsOnSecondPage = new ArrayList<>();
218+
runsOnSecondPage.add(run3);
219+
runsOnSecondPage.add(run4);
220+
when(service.listRuns(any()))
221+
.thenReturn(new ListRunsResponse().setRuns(runsOnFirstPage))
222+
.thenReturn(new ListRunsResponse().setRuns(runsOnSecondPage));
223+
JobsExt jobsExt = new JobsExt(service);
224+
225+
ListRunsRequest request = new ListRunsRequest().setExpandTasks(false);
226+
Iterable<BaseRun> runsList = jobsExt.listRuns(request);
227+
228+
List<BaseRun> expectedRunsList = new ArrayList<>();
229+
expectedRunsList.add(run1);
230+
expectedRunsList.add(run2);
231+
expectedRunsList.add(run3);
232+
expectedRunsList.add(run4);
233+
for (BaseRun run : runsList) {
234+
BaseRun expectedRun =
235+
expectedRunsList.stream()
236+
.filter(e -> e.getRunId().equals(run.getRunId()))
237+
.findFirst()
238+
.orElse(null);
239+
assertEquals(expectedRun, run);
240+
}
241+
verify(service, times(0)).getRun(any());
242+
}
243+
244+
@Test
245+
public void testListRuns() {
246+
JobsService service = Mockito.mock(JobsService.class);
247+
BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L).setHasMore(true);
248+
addTasks(run1, 101L, 102L);
249+
BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L).setHasMore(true);
250+
addTasks(run2, 201L, 202L);
251+
BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L);
252+
addTasks(run3, 301L);
253+
BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L).setHasMore(true);
254+
addTasks(run4, 401L, 402L);
255+
256+
List<BaseRun> runsOnFirstPage = new ArrayList<>();
257+
runsOnFirstPage.add(run1);
258+
runsOnFirstPage.add(run2);
259+
runsOnFirstPage.add(run3);
260+
ListRunsResponse listRunsResponse1 =
261+
new ListRunsResponse().setRuns(runsOnFirstPage).setNextPageToken("page2token");
262+
List<BaseRun> runsOnSecondPage = new ArrayList<>();
263+
runsOnSecondPage.add(run4);
264+
ListRunsResponse listRunsResponse2 = new ListRunsResponse().setRuns(runsOnSecondPage);
265+
266+
when(service.listRuns(any())).thenReturn(listRunsResponse1).thenReturn(listRunsResponse2);
267+
268+
// runs/get?run_id=100
269+
Run getRun1_page1 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page2token");
270+
addTasks(getRun1_page1, 101L, 102L);
271+
Run getRun1_page2 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page3token");
272+
addTasks(getRun1_page2, 103L, 104L);
273+
Run getRun1_page3 = new Run().setRunId(100L).setJobId(1L);
274+
addTasks(getRun1_page3, 105L);
275+
276+
// runs/get?run_id=200
277+
Run getRun2_page1 = new Run().setRunId(200L).setJobId(2L).setNextPageToken("run2_page2token");
278+
addTasks(getRun2_page1, 201L, 202L);
279+
Run getRun2_page2 = new Run().setRunId(200L).setJobId(2L);
280+
addTasks(getRun2_page2, 203L);
281+
282+
// runs/get?run_id=300
283+
Run getRun3_page1 = new Run().setRunId(300L).setJobId(3L);
284+
addTasks(getRun3_page1, 301L);
285+
286+
// runs/get?run_id=400
287+
Run getRun4_page1 = new Run().setRunId(400L).setJobId(4L).setNextPageToken("run4_page2token");
288+
addTasks(getRun4_page1, 401L, 402L);
289+
Run getRun4_page2 = new Run().setRunId(400L).setJobId(4L);
290+
addTasks(getRun4_page2, 403L, 404L);
291+
292+
doReturn(getRun1_page1)
293+
.when(service)
294+
.getRun(
295+
argThat(
296+
request ->
297+
request != null
298+
&& request.getRunId() == 100L
299+
&& request.getPageToken() == null));
300+
doReturn(getRun1_page2)
301+
.when(service)
302+
.getRun(
303+
argThat(
304+
request ->
305+
request != null
306+
&& request.getRunId() == 100L
307+
&& "run1_page2token".equals(request.getPageToken())));
308+
doReturn(getRun1_page3)
309+
.when(service)
310+
.getRun(
311+
argThat(
312+
request ->
313+
request != null
314+
&& request.getRunId() == 100L
315+
&& "run1_page3token".equals(request.getPageToken())));
316+
doReturn(getRun2_page1)
317+
.when(service)
318+
.getRun(
319+
argThat(
320+
request ->
321+
request != null
322+
&& request.getRunId() == 200L
323+
&& request.getPageToken() == null));
324+
doReturn(getRun2_page2)
325+
.when(service)
326+
.getRun(
327+
argThat(
328+
request ->
329+
request != null
330+
&& request.getRunId() == 200L
331+
&& "run2_page2token".equals(request.getPageToken())));
332+
doReturn(getRun3_page1)
333+
.when(service)
334+
.getRun(
335+
argThat(
336+
request ->
337+
request != null
338+
&& request.getRunId() == 300L
339+
&& request.getPageToken() == null));
340+
doReturn(getRun4_page1)
341+
.when(service)
342+
.getRun(
343+
argThat(
344+
request ->
345+
request != null
346+
&& request.getRunId() == 400L
347+
&& request.getPageToken() == null));
348+
doReturn(getRun4_page2)
349+
.when(service)
350+
.getRun(
351+
argThat(
352+
request ->
353+
request != null
354+
&& request.getRunId() == 400L
355+
&& "run4_page2token".equals(request.getPageToken())));
356+
357+
JobsExt jobsExt = new JobsExt(service);
358+
ListRunsRequest request = new ListRunsRequest().setExpandTasks(true);
359+
Iterable<BaseRun> runsList = jobsExt.listRuns(request);
360+
361+
BaseRun expectedRun1 = new BaseRun().setRunId(100L).setJobId(1L).setHasMore(false);
362+
addTasks(expectedRun1, 101L, 102L, 103L, 104L, 105L);
363+
BaseRun expectedRun2 = new BaseRun().setRunId(200L).setJobId(2L).setHasMore(false);
364+
addTasks(expectedRun2, 201L, 202L, 203L);
365+
BaseRun expectedRun3 = new BaseRun().setRunId(300L).setJobId(3L).setHasMore(false);
366+
addTasks(expectedRun3, 301L);
367+
BaseRun expectedRun4 = new BaseRun().setRunId(400L).setJobId(4L).setHasMore(false);
368+
addTasks(expectedRun4, 401L, 402L, 403L, 404L);
369+
List<BaseRun> expectedRunsList = new ArrayList<>();
370+
expectedRunsList.add(expectedRun1);
371+
expectedRunsList.add(expectedRun2);
372+
expectedRunsList.add(expectedRun3);
373+
expectedRunsList.add(expectedRun4);
374+
for (BaseRun run : runsList) {
375+
BaseRun expectedRun =
376+
expectedRunsList.stream()
377+
.filter(e -> e.getRunId().equals(run.getRunId()))
378+
.findFirst()
379+
.orElse(null);
380+
assertEquals(expectedRun, run);
381+
}
382+
// 3 getRun calls for run 100, 2 getRun calls for run 200, 1 getRun call for run 300, 2 getRun
383+
// calls for run 400
384+
verify(service, times(8)).getRun(any());
385+
}
386+
387+
private void addTasks(BaseRun run, long... taskRunIds) {
388+
Collection<RunTask> tasks = new ArrayList<>();
389+
for (long runId : taskRunIds) {
390+
tasks.add(new RunTask().setRunId(runId));
391+
}
392+
run.setTasks(tasks);
393+
}
205394
}

0 commit comments

Comments
 (0)