diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 8a67400d9..a97fbc84a 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -9,7 +9,8 @@ ### Documentation ### Internal Changes -* Update Jobs ListJobs API to support paginated responses ([#411](https://github.com/databricks/databricks-sdk-java/pull/411)) +* Update Jobs ListRuns API to support paginated responses ([#411](https://github.com/databricks/databricks-sdk-java/pull/411)) +* Update Jobs ListJobs API to support paginated responses ([#410](https://github.com/databricks/databricks-sdk-java/pull/410)) * Introduce automated tagging ([#409](https://github.com/databricks/databricks-sdk-java/pull/409)). * Update Jobs GetJob API to support paginated responses ([#403](https://github.com/databricks/databricks-sdk-java/pull/403)). * Update Jobs GetRun API to support paginated responses ([#402](https://github.com/databricks/databricks-sdk-java/pull/402)). diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java index fea5e6733..87375b8d0 100644 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java @@ -61,6 +61,55 @@ public BaseJob next() { }; } + /** + * List job runs. + * + *

Retrieve a list of runs. If the run has multiple pages of tasks, job_clusters, parameters or + * repair_history, it will paginate through all pages and aggregate the results. + */ + public Iterable listRuns(ListRunsRequest request) { + // fetch runs with limited elements in top level arrays + Iterable runsList = super.listRuns(request); + + if (!request.getExpandTasks()) { + return runsList; + } + + Iterator iterator = runsList.iterator(); + return () -> + new Iterator() { + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public BaseRun next() { + BaseRun run = iterator.next(); + + // The has_more field is only present in run with 100+ tasks, that is served from Jobs + // API 2.2. + // Extra tasks and other fields need to be fetched only when has_more is true. + if (run.getHasMore() != null && run.getHasMore()) { + // fully fetch all top level arrays for the run + GetRunRequest getRunRequest = new GetRunRequest().setRunId(run.getRunId()); + Run fullRun = getRun(getRunRequest); + run.setTasks(fullRun.getTasks()); + run.setJobClusters(fullRun.getJobClusters()); + run.setJobParameters(fullRun.getJobParameters()); + run.setRepairHistory(fullRun.getRepairHistory()); + } + // Set the has_more fields to null. + // This field in Jobs API 2.2 is useful for pagination. It indicates if there are more + // than 100 tasks or job_clusters in the run. + // This function hides pagination details from the user. So the field does not play + // useful role here. + run.setHasMore(null); + return run; + } + }; + } + /** * Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the * response contract. diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java index c4bb49033..66561a93e 100644 --- a/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java @@ -359,4 +359,193 @@ private void addTasks(BaseJob job, String... taskKeys) { } job.getSettings().setTasks(tasks); } + + @Test + public void testListRunsWithoutExpandTasks() { + JobsService service = Mockito.mock(JobsService.class); + BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L); + BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L); + BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L); + BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L); + + List runsOnFirstPage = new ArrayList<>(); + runsOnFirstPage.add(run1); + runsOnFirstPage.add(run2); + List runsOnSecondPage = new ArrayList<>(); + runsOnSecondPage.add(run3); + runsOnSecondPage.add(run4); + when(service.listRuns(any())) + .thenReturn(new ListRunsResponse().setRuns(runsOnFirstPage)) + .thenReturn(new ListRunsResponse().setRuns(runsOnSecondPage)); + JobsExt jobsExt = new JobsExt(service); + + ListRunsRequest request = new ListRunsRequest().setExpandTasks(false); + Iterable runsList = jobsExt.listRuns(request); + + List expectedRunsList = new ArrayList<>(); + expectedRunsList.add(run1); + expectedRunsList.add(run2); + expectedRunsList.add(run3); + expectedRunsList.add(run4); + for (BaseRun run : runsList) { + BaseRun expectedRun = + expectedRunsList.stream() + .filter(e -> e.getRunId().equals(run.getRunId())) + .findFirst() + .orElse(null); + assertEquals(expectedRun, run); + } + verify(service, times(0)).getRun(any()); + } + + @Test + public void testListRuns() { + JobsService service = Mockito.mock(JobsService.class); + BaseRun run1 = new BaseRun().setRunId(100L).setJobId(1L).setHasMore(true); + addTasks(run1, 101L, 102L); + BaseRun run2 = new BaseRun().setRunId(200L).setJobId(2L).setHasMore(true); + addTasks(run2, 201L, 202L); + BaseRun run3 = new BaseRun().setRunId(300L).setJobId(3L); + addTasks(run3, 301L); + BaseRun run4 = new BaseRun().setRunId(400L).setJobId(4L).setHasMore(true); + addTasks(run4, 401L, 402L); + + List runsOnFirstPage = new ArrayList<>(); + runsOnFirstPage.add(run1); + runsOnFirstPage.add(run2); + runsOnFirstPage.add(run3); + ListRunsResponse listRunsResponse1 = + new ListRunsResponse().setRuns(runsOnFirstPage).setNextPageToken("page2token"); + List runsOnSecondPage = new ArrayList<>(); + runsOnSecondPage.add(run4); + ListRunsResponse listRunsResponse2 = new ListRunsResponse().setRuns(runsOnSecondPage); + + when(service.listRuns(any())).thenReturn(listRunsResponse1).thenReturn(listRunsResponse2); + + // runs/get?run_id=100 + Run getRun1_page1 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page2token"); + addTasks(getRun1_page1, 101L, 102L); + Run getRun1_page2 = new Run().setRunId(100L).setJobId(1L).setNextPageToken("run1_page3token"); + addTasks(getRun1_page2, 103L, 104L); + Run getRun1_page3 = new Run().setRunId(100L).setJobId(1L); + addTasks(getRun1_page3, 105L); + + // runs/get?run_id=200 + Run getRun2_page1 = new Run().setRunId(200L).setJobId(2L).setNextPageToken("run2_page2token"); + addTasks(getRun2_page1, 201L, 202L); + Run getRun2_page2 = new Run().setRunId(200L).setJobId(2L); + addTasks(getRun2_page2, 203L); + + // runs/get?run_id=300 + Run getRun3_page1 = new Run().setRunId(300L).setJobId(3L); + addTasks(getRun3_page1, 301L); + + // runs/get?run_id=400 + Run getRun4_page1 = new Run().setRunId(400L).setJobId(4L).setNextPageToken("run4_page2token"); + addTasks(getRun4_page1, 401L, 402L); + Run getRun4_page2 = new Run().setRunId(400L).setJobId(4L); + addTasks(getRun4_page2, 403L, 404L); + + doReturn(getRun1_page1) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 100L + && request.getPageToken() == null)); + doReturn(getRun1_page2) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 100L + && "run1_page2token".equals(request.getPageToken()))); + doReturn(getRun1_page3) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 100L + && "run1_page3token".equals(request.getPageToken()))); + doReturn(getRun2_page1) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 200L + && request.getPageToken() == null)); + doReturn(getRun2_page2) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 200L + && "run2_page2token".equals(request.getPageToken()))); + doReturn(getRun3_page1) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 300L + && request.getPageToken() == null)); + doReturn(getRun4_page1) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 400L + && request.getPageToken() == null)); + doReturn(getRun4_page2) + .when(service) + .getRun( + argThat( + request -> + request != null + && request.getRunId() == 400L + && "run4_page2token".equals(request.getPageToken()))); + + JobsExt jobsExt = new JobsExt(service); + ListRunsRequest request = new ListRunsRequest().setExpandTasks(true); + Iterable runsList = jobsExt.listRuns(request); + + BaseRun expectedRun1 = new BaseRun().setRunId(100L).setJobId(1L); + addTasks(expectedRun1, 101L, 102L, 103L, 104L, 105L); + BaseRun expectedRun2 = new BaseRun().setRunId(200L).setJobId(2L); + addTasks(expectedRun2, 201L, 202L, 203L); + BaseRun expectedRun3 = new BaseRun().setRunId(300L).setJobId(3L); + addTasks(expectedRun3, 301L); + BaseRun expectedRun4 = new BaseRun().setRunId(400L).setJobId(4L); + addTasks(expectedRun4, 401L, 402L, 403L, 404L); + List expectedRunsList = new ArrayList<>(); + expectedRunsList.add(expectedRun1); + expectedRunsList.add(expectedRun2); + expectedRunsList.add(expectedRun3); + expectedRunsList.add(expectedRun4); + for (BaseRun run : runsList) { + BaseRun expectedRun = + expectedRunsList.stream() + .filter(e -> e.getRunId().equals(run.getRunId())) + .findFirst() + .orElse(null); + assertEquals(expectedRun, run); + } + // 3 getRun calls for run 100, 2 getRun calls for run 200, 0 getRun call for run 300, 2 getRun + // calls for run 400 + verify(service, times(7)).getRun(any()); + } + + private void addTasks(BaseRun run, long... taskRunIds) { + Collection tasks = new ArrayList<>(); + for (long runId : taskRunIds) { + tasks.add(new RunTask().setRunId(runId)); + } + run.setTasks(tasks); + } }