Skip to content

Commit 65327b9

Browse files
authored
[Internal] Update Jobs list function to support paginated responses (#410)
## What changes are proposed in this pull request? Introduces logic in the extension for jobs ListJobs call. The extended logic accounts for the new response format of API 2.2. API 2.1 format returns all tasks and job_cluster for each job in the jobs list. API 2.2 format truncates tasks and job_cluster to 100 elements. The extended ListJobs logic calls GetJob for each job in the list to populate the full list of tasks and job_clusters. The code consumes iterator from `super.list` and produces iterator that contains already paginated jobs. ## How is this tested? Unit tests. I can't do manual tests because I haven't figured out how to run the code locally.
1 parent 47b2a5a commit 65327b9

File tree

3 files changed

+205
-0
lines changed

3 files changed

+205
-0
lines changed

NEXT_CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
### Documentation
1010

1111
### Internal Changes
12+
* Update Jobs ListJobs API to support paginated responses ([#411](https://github.com/databricks/databricks-sdk-java/pull/411))
1213
* Introduce automated tagging ([#409](https://github.com/databricks/databricks-sdk-java/pull/409)).
1314
* Update Jobs GetJob API to support paginated responses ([#403](https://github.com/databricks/databricks-sdk-java/pull/403)).
1415
* Update Jobs GetRun API to support paginated responses ([#402](https://github.com/databricks/databricks-sdk-java/pull/402)).

databricks-sdk-java/src/main/java/com/databricks/sdk/mixin/JobsExt.java

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.databricks.sdk.core.ApiClient;
44
import com.databricks.sdk.service.jobs.*;
55
import java.util.Collection;
6+
import java.util.Iterator;
67

78
public class JobsExt extends JobsAPI {
89

@@ -14,6 +15,52 @@ public JobsExt(JobsService mock) {
1415
super(mock);
1516
}
1617

18+
/**
19+
* List jobs.
20+
*
21+
* <p>Retrieves a list of jobs. If the job has multiple pages of tasks, job_clusters, parameters
22+
* or environments, it will paginate through all pages and aggregate the results.
23+
*/
24+
public Iterable<BaseJob> list(ListJobsRequest request) {
25+
// fetch jobs with limited elements in top level arrays
26+
Iterable<BaseJob> jobsList = super.list(request);
27+
28+
if (!request.getExpandTasks()) {
29+
return jobsList;
30+
}
31+
32+
Iterator<BaseJob> iterator = jobsList.iterator();
33+
return () ->
34+
new Iterator<BaseJob>() {
35+
@Override
36+
public boolean hasNext() {
37+
return iterator.hasNext();
38+
}
39+
40+
@Override
41+
public BaseJob next() {
42+
BaseJob job = iterator.next();
43+
44+
// The has_more field is only present in jobs with 100+ tasks, that is served from Jobs
45+
// API 2.2.
46+
// Extra tasks and other fields need to be fetched only when has_more is true.
47+
if (job.getHasMore() != null && job.getHasMore()) {
48+
// fully fetch all top level arrays for the job
49+
GetJobRequest getJobRequest = new GetJobRequest().setJobId(job.getJobId());
50+
Job fullJob = get(getJobRequest);
51+
job.getSettings().setTasks(fullJob.getSettings().getTasks());
52+
job.getSettings().setJobClusters(fullJob.getSettings().getJobClusters());
53+
job.getSettings().setParameters(fullJob.getSettings().getParameters());
54+
job.getSettings().setEnvironments(fullJob.getSettings().getEnvironments());
55+
}
56+
// Set the has_more field to false to indicate that there are no more tasks and other
57+
// fields to fetch.
58+
job.setHasMore(null);
59+
return job;
60+
}
61+
};
62+
}
63+
1764
/**
1865
* Wrap the {@code JobsApi.getRun} operation to retrieve paginated content without breaking the
1966
* response contract.

databricks-sdk-java/src/test/java/com/databricks/sdk/mixin/JobsExtTest.java

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.databricks.sdk.mixin;
22

33
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertNull;
45
import static org.mockito.ArgumentMatchers.any;
56
import static org.mockito.Mockito.*;
67

@@ -202,4 +203,160 @@ private void addJobEnvironments(Job job, String... environmentKeys) {
202203
}
203204
job.getSettings().setEnvironments(environments);
204205
}
206+
207+
@Test
208+
public void testListJobsWithoutExpandTasks() {
209+
JobsService service = Mockito.mock(JobsService.class);
210+
BaseJob job1 = new BaseJob().setJobId(100L).setSettings(new JobSettings().setName("job1"));
211+
BaseJob job2 = new BaseJob().setJobId(200L).setSettings(new JobSettings().setName("job2"));
212+
BaseJob job3 = new BaseJob().setJobId(300L).setSettings(new JobSettings().setName("job3"));
213+
BaseJob job4 = new BaseJob().setJobId(400L).setSettings(new JobSettings().setName("job4"));
214+
215+
List<BaseJob> jobsOnFirstPage = new ArrayList<>();
216+
jobsOnFirstPage.add(job1);
217+
jobsOnFirstPage.add(job2);
218+
List<BaseJob> jobsOnSecondPage = new ArrayList<>();
219+
jobsOnSecondPage.add(job3);
220+
jobsOnSecondPage.add(job4);
221+
when(service.list(any()))
222+
.thenReturn(new ListJobsResponse().setJobs(jobsOnFirstPage))
223+
.thenReturn(new ListJobsResponse().setJobs(jobsOnSecondPage));
224+
JobsExt jobsExt = new JobsExt(service);
225+
226+
ListJobsRequest request = new ListJobsRequest().setExpandTasks(false);
227+
Iterable<BaseJob> jobsList = jobsExt.list(request);
228+
229+
List<BaseJob> expectedJobsList = new ArrayList<>();
230+
expectedJobsList.add(job1);
231+
expectedJobsList.add(job2);
232+
expectedJobsList.add(job3);
233+
expectedJobsList.add(job4);
234+
for (BaseJob job : jobsList) {
235+
BaseJob expectedJob =
236+
expectedJobsList.stream()
237+
.filter(e -> e.getJobId().equals(job.getJobId()))
238+
.findFirst()
239+
.orElse(null);
240+
assertEquals(expectedJob, job);
241+
}
242+
verify(service, times(0)).get(any());
243+
}
244+
245+
@Test
246+
public void testListJobs() {
247+
JobsService service = Mockito.mock(JobsService.class);
248+
BaseJob job1 =
249+
new BaseJob()
250+
.setJobId(100L)
251+
.setSettings(new JobSettings().setName("job1"))
252+
.setHasMore(true);
253+
addTasks(job1, "job1_taskKey1", "job1_taskKey2");
254+
BaseJob job2 =
255+
new BaseJob()
256+
.setJobId(200L)
257+
.setSettings(new JobSettings().setName("job2"))
258+
.setHasMore(true);
259+
addTasks(job2, "job2_taskKey1", "job2_taskKey2");
260+
BaseJob job3 = new BaseJob().setJobId(300L).setSettings(new JobSettings().setName("job3"));
261+
addTasks(job3, "job3_taskKey1", "job3_taskKey2");
262+
263+
Job getJob1_page1 =
264+
new Job()
265+
.setJobId(100L)
266+
.setNextPageToken("job1_page2token")
267+
.setSettings(new JobSettings().setName("job1"));
268+
addTasks(getJob1_page1, "job1_taskKey1", "job1_taskKey2");
269+
Job getJob1_page2 = new Job().setJobId(100L).setSettings(new JobSettings().setName("job1"));
270+
addTasks(getJob1_page2, "job1_taskKey3", "job1_taskKey4");
271+
272+
Job getJob2_page1 =
273+
new Job()
274+
.setJobId(200L)
275+
.setNextPageToken("job2_page2token")
276+
.setSettings(new JobSettings().setName("job2"));
277+
addTasks(getJob2_page1, "job2_taskKey1", "job2_taskKey2");
278+
Job getJob2_page2 = new Job().setJobId(200L).setSettings(new JobSettings().setName("job2"));
279+
addTasks(getJob2_page2, "job2_taskKey3", "job2_taskKey4");
280+
281+
doReturn(getJob1_page1)
282+
.when(service)
283+
.get(
284+
argThat(
285+
request ->
286+
request != null
287+
&& request.getJobId() == 100L
288+
&& request.getPageToken() == null));
289+
doReturn(getJob1_page2)
290+
.when(service)
291+
.get(
292+
argThat(
293+
request ->
294+
request != null
295+
&& request.getJobId() == 100L
296+
&& "job1_page2token".equals(request.getPageToken())));
297+
doReturn(getJob2_page1)
298+
.when(service)
299+
.get(
300+
argThat(
301+
request ->
302+
request != null
303+
&& request.getJobId() == 200L
304+
&& request.getPageToken() == null));
305+
doReturn(getJob2_page2)
306+
.when(service)
307+
.get(
308+
argThat(
309+
request ->
310+
request != null
311+
&& request.getJobId() == 200L
312+
&& "job2_page2token".equals(request.getPageToken())));
313+
314+
List<BaseJob> jobsOnFirstPage = new ArrayList<>();
315+
jobsOnFirstPage.add(job1);
316+
jobsOnFirstPage.add(job2);
317+
ListJobsResponse responsePage1 =
318+
new ListJobsResponse().setJobs(jobsOnFirstPage).setNextPageToken("page2token");
319+
List<BaseJob> jobsOnSecondPage = new ArrayList<>();
320+
jobsOnSecondPage.add(job3);
321+
ListJobsResponse responsePage2 = new ListJobsResponse().setJobs(jobsOnSecondPage);
322+
323+
JobsExt jobsExt = new JobsExt(service);
324+
when(service.list(any())).thenReturn(responsePage1).thenReturn(responsePage2);
325+
326+
ListJobsRequest request = new ListJobsRequest().setExpandTasks(true);
327+
Iterable<BaseJob> jobsList = jobsExt.list(request);
328+
329+
BaseJob expectedJob1 =
330+
new BaseJob().setJobId(100L).setSettings(new JobSettings().setName("job1"));
331+
addTasks(expectedJob1, "job1_taskKey1", "job1_taskKey2", "job1_taskKey3", "job1_taskKey4");
332+
BaseJob expectedJob2 =
333+
new BaseJob().setJobId(200L).setSettings(new JobSettings().setName("job2"));
334+
addTasks(expectedJob2, "job2_taskKey1", "job2_taskKey2", "job2_taskKey3", "job2_taskKey4");
335+
BaseJob expectedJob3 =
336+
new BaseJob().setJobId(300L).setSettings(new JobSettings().setName("job3"));
337+
addTasks(expectedJob3, "job3_taskKey1", "job3_taskKey2");
338+
List<BaseJob> expectedJobsList = new ArrayList<>();
339+
expectedJobsList.add(expectedJob1);
340+
expectedJobsList.add(expectedJob2);
341+
expectedJobsList.add(expectedJob3);
342+
for (BaseJob job : jobsList) {
343+
BaseJob expectedJob =
344+
expectedJobsList.stream()
345+
.filter(e -> e.getJobId().equals(job.getJobId()))
346+
.findFirst()
347+
.orElse(null);
348+
assertEquals(expectedJob, job);
349+
assertNull(expectedJob.getHasMore());
350+
}
351+
// 2 getRun calls for job1, 2 getRun calls for job2, 0 getRun call for job3
352+
verify(service, times(4)).get(any());
353+
}
354+
355+
private void addTasks(BaseJob job, String... taskKeys) {
356+
Collection<Task> tasks = new ArrayList<>();
357+
for (String taskKey : taskKeys) {
358+
tasks.add(new Task().setTaskKey(taskKey));
359+
}
360+
job.getSettings().setTasks(tasks);
361+
}
205362
}

0 commit comments

Comments
 (0)