Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,27 @@ public JobsExt(JobsService mock) {
*
* <p>Depending on the Jobs API version used under the hood, tasks or iteration runs retrieved by
* the initial request may be truncated due to high cardinalities. Truncation can happen for job
* runs over 100 task runs, as well as ForEach task runs with over 100 iteration runs. To avoid
* returning an incomplete {@code Run} object to the user, this method performs all the requests
* required to collect all task/iteration runs into a single {@code Run} object.
* runs with over 100 task runs, as well as ForEach task runs with over 100 iteration runs. To
* avoid returning an incomplete {@code Run} object to the user, this method performs all the
* requests required to collect all task/iteration runs into a single {@code Run} object.
*/
@Override
public Run getRun(GetRunRequest request) {
Run run = super.getRun(request);

/*
* fetch all additional pages (if any) and accumulate the result in a single response
*/

// When querying a Job run, a page token is returned when there are more than 100 tasks. No
// iterations are defined for a Job run. Therefore, the next page in the response only includes
// the next page of tasks.
// When querying a ForEach task run, a page token is returned when there are more than 100
// iterations. Only a single task is returned, corresponding to the ForEach task itself.
// Therefore, the client only reads the iterations from the next page and not the tasks.
Collection<RunTask> iterations = run.getIterations();
boolean paginatingIterations = iterations != null && !iterations.isEmpty();

Run currRun = run;
while (currRun.getNextPageToken() != null) {
request.setPageToken(currRun.getNextPageToken());
currRun = super.getRun(request);
// runs/get response includes next_page_token as long as there are more pages to fetch.
while (run.getNextPageToken() != null) {
request.setPageToken(run.getNextPageToken());
Run currRun = super.getRun(request);
if (paginatingIterations) {
Collection<RunTask> newIterations = currRun.getIterations();
if (newIterations != null) {
Expand All @@ -50,10 +52,23 @@ public Run getRun(GetRunRequest request) {
run.getTasks().addAll(newTasks);
}
}
}

// now that we've added all pages to the Run, the tokens are useless
run.setNextPageToken(null);
// Each new page of runs/get response includes the next page of the job_clusters,
// job_parameters, and repair history.
Collection<JobCluster> newClusters = currRun.getJobClusters();
if (newClusters != null) {
run.getJobClusters().addAll(newClusters);
}
Collection<JobParameter> newParameters = currRun.getJobParameters();
if (newParameters != null) {
run.getJobParameters().addAll(newParameters);
}
Collection<RepairHistoryItem> newRepairHistory = currRun.getRepairHistory();
if (newRepairHistory != null) {
run.getRepairHistory().addAll(newRepairHistory);
}
run.setNextPageToken(currRun.getNextPageToken());
}

return run;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,7 @@
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;

import com.databricks.sdk.service.jobs.GetRunRequest;
import com.databricks.sdk.service.jobs.JobsService;
import com.databricks.sdk.service.jobs.Run;
import com.databricks.sdk.service.jobs.RunTask;
import com.databricks.sdk.service.jobs.*;
import java.util.ArrayList;
import java.util.Collection;
import org.junit.jupiter.api.Test;
Expand All @@ -21,10 +18,15 @@ public void testGetRunPaginationWithTasks() {

Run firstPage = new Run().setNextPageToken("tokenToSecondPage");
addTasks(firstPage, 0L, 1L);
addJobClusters(firstPage, "clusterKey1", "clusterKey2");
addJobParameters(firstPage, "parameterKey1", "parameterKey2");
Run secondPage = new Run().setNextPageToken("tokenToThirdPage");
addTasks(secondPage, 2L, 3L);
addJobClusters(secondPage, "clusterKey3");
addJobParameters(secondPage, "parameterKey3", "parameterKey4");
Run thirdPage = new Run();
addTasks(thirdPage, 4L);
addJobParameters(thirdPage, "parameterKey5");

when(service.getRun(any())).thenReturn(firstPage).thenReturn(secondPage).thenReturn(thirdPage);

Expand All @@ -36,6 +38,14 @@ public void testGetRunPaginationWithTasks() {

Run expectedRun = new Run();
addTasks(expectedRun, 0L, 1L, 2L, 3L, 4L);
addJobClusters(expectedRun, "clusterKey1", "clusterKey2", "clusterKey3");
addJobParameters(
expectedRun,
"parameterKey1",
"parameterKey2",
"parameterKey3",
"parameterKey4",
"parameterKey5");

assertEquals(expectedRun, run);
verify(service, times(3)).getRun(any());
Expand Down Expand Up @@ -82,4 +92,20 @@ private void addIterations(Run run, long... iterationRunIds) {
}
run.setIterations(iterations);
}

private void addJobClusters(Run run, String... clusterKeys) {
Collection<JobCluster> clusters = new ArrayList<>();
for (String clusterKey : clusterKeys) {
clusters.add(new JobCluster().setJobClusterKey(clusterKey));
}
run.setJobClusters(clusters);
}

private void addJobParameters(Run run, String... parameterKeys) {
Collection<JobParameter> parameters = new ArrayList<>();
for (String parameterKey : parameterKeys) {
parameters.add(new JobParameter().setName(parameterKey).setValue(parameterKey));
}
run.setJobParameters(parameters);
}
}
Loading