Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions NEXT_CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
### Documentation

### Internal Changes
* Update Jobs ListJobs API to support paginated responses ([#411](https://github.com/databricks/databricks-sdk-java/pull/411))
* Introduce automated tagging ([#409](https://github.com/databricks/databricks-sdk-java/pull/409)).
* Update Jobs GetJob API to support paginated responses ([#403](https://github.com/databricks/databricks-sdk-java/pull/403)).
* Update Jobs GetRun API to support paginated responses ([#402](https://github.com/databricks/databricks-sdk-java/pull/402)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.databricks.sdk.core.ApiClient;
import com.databricks.sdk.service.jobs.*;
import java.util.Collection;
import java.util.Iterator;

public class JobsExt extends JobsAPI {

Expand All @@ -14,6 +15,52 @@ public JobsExt(JobsService mock) {
super(mock);
}

/**
* List jobs.
*
* <p>Retrieves a list of jobs. If the job has multiple pages of tasks, job_clusters, parameters
* or environments, it will paginate through all pages and aggregate the results.
*/
public Iterable<BaseJob> list(ListJobsRequest request) {
// fetch jobs with limited elements in top level arrays
Iterable<BaseJob> jobsList = super.list(request);

if (!request.getExpandTasks()) {
return jobsList;
}

Iterator<BaseJob> iterator = jobsList.iterator();
return () ->
new Iterator<BaseJob>() {
@Override
public boolean hasNext() {
return iterator.hasNext();
}

@Override
public BaseJob next() {
BaseJob job = iterator.next();

// The has_more field is only present in jobs with 100+ tasks, that is served from Jobs
// API 2.2.
// Extra tasks and other fields need to be fetched only when has_more is true.
if (job.getHasMore() != null && job.getHasMore()) {
// fully fetch all top level arrays for the job
GetJobRequest getJobRequest = new GetJobRequest().setJobId(job.getJobId());
Job fullJob = get(getJobRequest);
job.getSettings().setTasks(fullJob.getSettings().getTasks());
job.getSettings().setJobClusters(fullJob.getSettings().getJobClusters());
job.getSettings().setParameters(fullJob.getSettings().getParameters());
job.getSettings().setEnvironments(fullJob.getSettings().getEnvironments());
}
// Set the has_more field to false to indicate that there are no more tasks and other
// fields to fetch.
job.setHasMore(null);
return job;
}
};
}

/**
* Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the
* response contract.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.databricks.sdk.mixin;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -202,4 +203,160 @@ private void addJobEnvironments(Job job, String... environmentKeys) {
}
job.getSettings().setEnvironments(environments);
}

@Test
public void testListJobsWithoutExpandTasks() {
JobsService service = Mockito.mock(JobsService.class);
BaseJob job1 = new BaseJob().setJobId(100L).setSettings(new JobSettings().setName("job1"));
BaseJob job2 = new BaseJob().setJobId(200L).setSettings(new JobSettings().setName("job2"));
BaseJob job3 = new BaseJob().setJobId(300L).setSettings(new JobSettings().setName("job3"));
BaseJob job4 = new BaseJob().setJobId(400L).setSettings(new JobSettings().setName("job4"));

List<BaseJob> jobsOnFirstPage = new ArrayList<>();
jobsOnFirstPage.add(job1);
jobsOnFirstPage.add(job2);
List<BaseJob> jobsOnSecondPage = new ArrayList<>();
jobsOnSecondPage.add(job3);
jobsOnSecondPage.add(job4);
when(service.list(any()))
.thenReturn(new ListJobsResponse().setJobs(jobsOnFirstPage))
.thenReturn(new ListJobsResponse().setJobs(jobsOnSecondPage));
JobsExt jobsExt = new JobsExt(service);

ListJobsRequest request = new ListJobsRequest().setExpandTasks(false);
Iterable<BaseJob> jobsList = jobsExt.list(request);

List<BaseJob> expectedJobsList = new ArrayList<>();
expectedJobsList.add(job1);
expectedJobsList.add(job2);
expectedJobsList.add(job3);
expectedJobsList.add(job4);
for (BaseJob job : jobsList) {
BaseJob expectedJob =
expectedJobsList.stream()
.filter(e -> e.getJobId().equals(job.getJobId()))
.findFirst()
.orElse(null);
assertEquals(expectedJob, job);
}
verify(service, times(0)).get(any());
}

@Test
public void testListJobs() {
JobsService service = Mockito.mock(JobsService.class);
BaseJob job1 =
new BaseJob()
.setJobId(100L)
.setSettings(new JobSettings().setName("job1"))
.setHasMore(true);
addTasks(job1, "job1_taskKey1", "job1_taskKey2");
BaseJob job2 =
new BaseJob()
.setJobId(200L)
.setSettings(new JobSettings().setName("job2"))
.setHasMore(true);
addTasks(job2, "job2_taskKey1", "job2_taskKey2");
BaseJob job3 = new BaseJob().setJobId(300L).setSettings(new JobSettings().setName("job3"));
addTasks(job3, "job3_taskKey1", "job3_taskKey2");

Job getJob1_page1 =
new Job()
.setJobId(100L)
.setNextPageToken("job1_page2token")
.setSettings(new JobSettings().setName("job1"));
addTasks(getJob1_page1, "job1_taskKey1", "job1_taskKey2");
Job getJob1_page2 = new Job().setJobId(100L).setSettings(new JobSettings().setName("job1"));
addTasks(getJob1_page2, "job1_taskKey3", "job1_taskKey4");

Job getJob2_page1 =
new Job()
.setJobId(200L)
.setNextPageToken("job2_page2token")
.setSettings(new JobSettings().setName("job2"));
addTasks(getJob2_page1, "job2_taskKey1", "job2_taskKey2");
Job getJob2_page2 = new Job().setJobId(200L).setSettings(new JobSettings().setName("job2"));
addTasks(getJob2_page2, "job2_taskKey3", "job2_taskKey4");

doReturn(getJob1_page1)
.when(service)
.get(
argThat(
request ->
request != null
&& request.getJobId() == 100L
&& request.getPageToken() == null));
doReturn(getJob1_page2)
.when(service)
.get(
argThat(
request ->
request != null
&& request.getJobId() == 100L
&& "job1_page2token".equals(request.getPageToken())));
doReturn(getJob2_page1)
.when(service)
.get(
argThat(
request ->
request != null
&& request.getJobId() == 200L
&& request.getPageToken() == null));
doReturn(getJob2_page2)
.when(service)
.get(
argThat(
request ->
request != null
&& request.getJobId() == 200L
&& "job2_page2token".equals(request.getPageToken())));

List<BaseJob> jobsOnFirstPage = new ArrayList<>();
jobsOnFirstPage.add(job1);
jobsOnFirstPage.add(job2);
ListJobsResponse responsePage1 =
new ListJobsResponse().setJobs(jobsOnFirstPage).setNextPageToken("page2token");
List<BaseJob> jobsOnSecondPage = new ArrayList<>();
jobsOnSecondPage.add(job3);
ListJobsResponse responsePage2 = new ListJobsResponse().setJobs(jobsOnSecondPage);

JobsExt jobsExt = new JobsExt(service);
when(service.list(any())).thenReturn(responsePage1).thenReturn(responsePage2);

ListJobsRequest request = new ListJobsRequest().setExpandTasks(true);
Iterable<BaseJob> jobsList = jobsExt.list(request);

BaseJob expectedJob1 =
new BaseJob().setJobId(100L).setSettings(new JobSettings().setName("job1"));
addTasks(expectedJob1, "job1_taskKey1", "job1_taskKey2", "job1_taskKey3", "job1_taskKey4");
BaseJob expectedJob2 =
new BaseJob().setJobId(200L).setSettings(new JobSettings().setName("job2"));
addTasks(expectedJob2, "job2_taskKey1", "job2_taskKey2", "job2_taskKey3", "job2_taskKey4");
BaseJob expectedJob3 =
new BaseJob().setJobId(300L).setSettings(new JobSettings().setName("job3"));
addTasks(expectedJob3, "job3_taskKey1", "job3_taskKey2");
List<BaseJob> expectedJobsList = new ArrayList<>();
expectedJobsList.add(expectedJob1);
expectedJobsList.add(expectedJob2);
expectedJobsList.add(expectedJob3);
for (BaseJob job : jobsList) {
BaseJob expectedJob =
expectedJobsList.stream()
.filter(e -> e.getJobId().equals(job.getJobId()))
.findFirst()
.orElse(null);
assertEquals(expectedJob, job);
assertNull(expectedJob.getHasMore());
}
// 2 getRun calls for job1, 2 getRun calls for job2, 0 getRun call for job3
verify(service, times(4)).get(any());
}

private void addTasks(BaseJob job, String... taskKeys) {
Collection<Task> tasks = new ArrayList<>();
for (String taskKey : taskKeys) {
tasks.add(new Task().setTaskKey(taskKey));
}
job.getSettings().setTasks(tasks);
}
}
Loading