diff --git a/NEXT_CHANGELOG.md b/NEXT_CHANGELOG.md index 584443e52..8a67400d9 100644 --- a/NEXT_CHANGELOG.md +++ b/NEXT_CHANGELOG.md @@ -9,6 +9,7 @@ ### Documentation ### Internal Changes +* Update Jobs ListJobs API to support paginated responses ([#411](https://github.com/databricks/databricks-sdk-java/pull/411)) * Introduce automated tagging ([#409](https://github.com/databricks/databricks-sdk-java/pull/409)). * Update Jobs GetJob API to support paginated responses ([#403](https://github.com/databricks/databricks-sdk-java/pull/403)). * Update Jobs GetRun API to support paginated responses ([#402](https://github.com/databricks/databricks-sdk-java/pull/402)). diff --git a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java index 8dfd0a7a8..fea5e6733 100644 --- a/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java +++ b/databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java @@ -3,6 +3,7 @@ import com.databricks.sdk.core.ApiClient; import com.databricks.sdk.service.jobs.*; import java.util.Collection; +import java.util.Iterator; public class JobsExt extends JobsAPI { @@ -14,6 +15,52 @@ public JobsExt(JobsService mock) { super(mock); } + /** + * List jobs. + * + *

Retrieves a list of jobs. If the job has multiple pages of tasks, job_clusters, parameters + * or environments, it will paginate through all pages and aggregate the results. + */ + public Iterable list(ListJobsRequest request) { + // fetch jobs with limited elements in top level arrays + Iterable jobsList = super.list(request); + + if (!request.getExpandTasks()) { + return jobsList; + } + + Iterator iterator = jobsList.iterator(); + return () -> + new Iterator() { + @Override + public boolean hasNext() { + return iterator.hasNext(); + } + + @Override + public BaseJob next() { + BaseJob job = iterator.next(); + + // The has_more field is only present in jobs with 100+ tasks, that is served from Jobs + // API 2.2. + // Extra tasks and other fields need to be fetched only when has_more is true. + if (job.getHasMore() != null && job.getHasMore()) { + // fully fetch all top level arrays for the job + GetJobRequest getJobRequest = new GetJobRequest().setJobId(job.getJobId()); + Job fullJob = get(getJobRequest); + job.getSettings().setTasks(fullJob.getSettings().getTasks()); + job.getSettings().setJobClusters(fullJob.getSettings().getJobClusters()); + job.getSettings().setParameters(fullJob.getSettings().getParameters()); + job.getSettings().setEnvironments(fullJob.getSettings().getEnvironments()); + } + // Set the has_more field to false to indicate that there are no more tasks and other + // fields to fetch. + job.setHasMore(null); + return job; + } + }; + } + /** * Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the * response contract. diff --git a/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java index 46b04d912..c4bb49033 100644 --- a/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java +++ b/databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java @@ -1,6 +1,7 @@ package com.databricks.sdk.mixin; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -202,4 +203,160 @@ private void addJobEnvironments(Job job, String... environmentKeys) { } job.getSettings().setEnvironments(environments); } + + @Test + public void testListJobsWithoutExpandTasks() { + JobsService service = Mockito.mock(JobsService.class); + BaseJob job1 = new BaseJob().setJobId(100L).setSettings(new JobSettings().setName("job1")); + BaseJob job2 = new BaseJob().setJobId(200L).setSettings(new JobSettings().setName("job2")); + BaseJob job3 = new BaseJob().setJobId(300L).setSettings(new JobSettings().setName("job3")); + BaseJob job4 = new BaseJob().setJobId(400L).setSettings(new JobSettings().setName("job4")); + + List jobsOnFirstPage = new ArrayList<>(); + jobsOnFirstPage.add(job1); + jobsOnFirstPage.add(job2); + List jobsOnSecondPage = new ArrayList<>(); + jobsOnSecondPage.add(job3); + jobsOnSecondPage.add(job4); + when(service.list(any())) + .thenReturn(new ListJobsResponse().setJobs(jobsOnFirstPage)) + .thenReturn(new ListJobsResponse().setJobs(jobsOnSecondPage)); + JobsExt jobsExt = new JobsExt(service); + + ListJobsRequest request = new ListJobsRequest().setExpandTasks(false); + Iterable jobsList = jobsExt.list(request); + + List expectedJobsList = new ArrayList<>(); + expectedJobsList.add(job1); + expectedJobsList.add(job2); + expectedJobsList.add(job3); + expectedJobsList.add(job4); + for (BaseJob job : jobsList) { + BaseJob expectedJob = + expectedJobsList.stream() + .filter(e -> e.getJobId().equals(job.getJobId())) + .findFirst() + .orElse(null); + assertEquals(expectedJob, job); + } + verify(service, times(0)).get(any()); + } + + @Test + public void testListJobs() { + JobsService service = Mockito.mock(JobsService.class); + BaseJob job1 = + new BaseJob() + .setJobId(100L) + .setSettings(new JobSettings().setName("job1")) + .setHasMore(true); + addTasks(job1, "job1_taskKey1", "job1_taskKey2"); + BaseJob job2 = + new BaseJob() + .setJobId(200L) + .setSettings(new JobSettings().setName("job2")) + .setHasMore(true); + addTasks(job2, "job2_taskKey1", "job2_taskKey2"); + BaseJob job3 = new BaseJob().setJobId(300L).setSettings(new JobSettings().setName("job3")); + addTasks(job3, "job3_taskKey1", "job3_taskKey2"); + + Job getJob1_page1 = + new Job() + .setJobId(100L) + .setNextPageToken("job1_page2token") + .setSettings(new JobSettings().setName("job1")); + addTasks(getJob1_page1, "job1_taskKey1", "job1_taskKey2"); + Job getJob1_page2 = new Job().setJobId(100L).setSettings(new JobSettings().setName("job1")); + addTasks(getJob1_page2, "job1_taskKey3", "job1_taskKey4"); + + Job getJob2_page1 = + new Job() + .setJobId(200L) + .setNextPageToken("job2_page2token") + .setSettings(new JobSettings().setName("job2")); + addTasks(getJob2_page1, "job2_taskKey1", "job2_taskKey2"); + Job getJob2_page2 = new Job().setJobId(200L).setSettings(new JobSettings().setName("job2")); + addTasks(getJob2_page2, "job2_taskKey3", "job2_taskKey4"); + + doReturn(getJob1_page1) + .when(service) + .get( + argThat( + request -> + request != null + && request.getJobId() == 100L + && request.getPageToken() == null)); + doReturn(getJob1_page2) + .when(service) + .get( + argThat( + request -> + request != null + && request.getJobId() == 100L + && "job1_page2token".equals(request.getPageToken()))); + doReturn(getJob2_page1) + .when(service) + .get( + argThat( + request -> + request != null + && request.getJobId() == 200L + && request.getPageToken() == null)); + doReturn(getJob2_page2) + .when(service) + .get( + argThat( + request -> + request != null + && request.getJobId() == 200L + && "job2_page2token".equals(request.getPageToken()))); + + List jobsOnFirstPage = new ArrayList<>(); + jobsOnFirstPage.add(job1); + jobsOnFirstPage.add(job2); + ListJobsResponse responsePage1 = + new ListJobsResponse().setJobs(jobsOnFirstPage).setNextPageToken("page2token"); + List jobsOnSecondPage = new ArrayList<>(); + jobsOnSecondPage.add(job3); + ListJobsResponse responsePage2 = new ListJobsResponse().setJobs(jobsOnSecondPage); + + JobsExt jobsExt = new JobsExt(service); + when(service.list(any())).thenReturn(responsePage1).thenReturn(responsePage2); + + ListJobsRequest request = new ListJobsRequest().setExpandTasks(true); + Iterable jobsList = jobsExt.list(request); + + BaseJob expectedJob1 = + new BaseJob().setJobId(100L).setSettings(new JobSettings().setName("job1")); + addTasks(expectedJob1, "job1_taskKey1", "job1_taskKey2", "job1_taskKey3", "job1_taskKey4"); + BaseJob expectedJob2 = + new BaseJob().setJobId(200L).setSettings(new JobSettings().setName("job2")); + addTasks(expectedJob2, "job2_taskKey1", "job2_taskKey2", "job2_taskKey3", "job2_taskKey4"); + BaseJob expectedJob3 = + new BaseJob().setJobId(300L).setSettings(new JobSettings().setName("job3")); + addTasks(expectedJob3, "job3_taskKey1", "job3_taskKey2"); + List expectedJobsList = new ArrayList<>(); + expectedJobsList.add(expectedJob1); + expectedJobsList.add(expectedJob2); + expectedJobsList.add(expectedJob3); + for (BaseJob job : jobsList) { + BaseJob expectedJob = + expectedJobsList.stream() + .filter(e -> e.getJobId().equals(job.getJobId())) + .findFirst() + .orElse(null); + assertEquals(expectedJob, job); + assertNull(expectedJob.getHasMore()); + } + // 2 getRun calls for job1, 2 getRun calls for job2, 0 getRun call for job3 + verify(service, times(4)).get(any()); + } + + private void addTasks(BaseJob job, String... taskKeys) { + Collection tasks = new ArrayList<>(); + for (String taskKey : taskKeys) { + tasks.add(new Task().setTaskKey(taskKey)); + } + job.getSettings().setTasks(tasks); + } }