Skip to content

Commit 2e313a9

Browse files
authored
added waitForConditionAndForceCancel (#35465)
1 parent 2ad4eea commit 2e313a9

File tree

5 files changed

+79
-0
lines changed

5 files changed

+79
-0
lines changed

it/common/src/main/java/org/apache/beam/it/common/PipelineLauncher.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,17 @@ List<JobMessage> listMessages(
427427
*/
428428
Job drainJob(String project, String region, String jobId) throws IOException;
429429

430+
/**
431+
* Force-cancels the given job.
432+
*
433+
* @param project the project that the job is running under
434+
* @param region the region that the job was launched in
435+
* @param jobId the id of the job to force-cancel
436+
* @throws IOException if there is an issue sending the request
437+
* @return Updated job instance
438+
*/
439+
Job forceCancelJob(String project, String region, String jobId) throws IOException;
440+
430441
/**
431442
* Get the specified metric of the given job.
432443
*

it/common/src/main/java/org/apache/beam/it/common/PipelineOperator.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,14 @@ public Result waitForConditionAndCancel(Config config, Supplier<Boolean>... cond
141141
return waitForConditionAndExecute(config, conditionCheck, this::cancelJobAndFinish);
142142
}
143143

144+
/**
145+
* Similar to {@link #waitForConditionAndCancel(Config, Supplier...)} but force-cancels the job.
146+
*/
147+
public Result waitForConditionAndForceCancel(Config config, Supplier<Boolean>... conditionCheck)
148+
throws IOException {
149+
return waitForConditionAndExecute(config, conditionCheck, this::forceCancelJobAndFinish);
150+
}
151+
144152
private Result waitForConditionAndExecute(
145153
Config config,
146154
Supplier<Boolean>[] conditionCheck,
@@ -171,6 +179,12 @@ public Result cancelJobAndFinish(Config config) throws IOException {
171179
return waitUntilDone(config);
172180
}
173181

182+
/** Similar to {@link #cancelJobAndFinish(Config)} but force-cancels the job. */
183+
public Result forceCancelJobAndFinish(Config config) throws IOException {
184+
client.forceCancelJob(config.project(), config.region(), config.jobId());
185+
return waitUntilDone(config);
186+
}
187+
174188
private static Result finishOrTimeout(
175189
Config config, Supplier<Boolean>[] conditionCheck, Supplier<Boolean>... stopChecking) {
176190
Instant start = Instant.now();

it/common/src/test/java/org/apache/beam/it/common/PipelineOperatorTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -199,4 +199,38 @@ public void testFinishAfterConditionTimeout() throws IOException {
199199
verify(client).drainJob(any(), any(), any());
200200
assertThat(result).isEqualTo(Result.TIMEOUT);
201201
}
202+
203+
@Test
204+
public void testForceCancelAfterCondition() throws IOException {
205+
// Arrange
206+
AtomicInteger callCount = new AtomicInteger();
207+
int totalCalls = 3;
208+
Supplier<Boolean> checker = () -> callCount.incrementAndGet() >= totalCalls;
209+
210+
when(client.getJobStatus(any(), any(), any()))
211+
.thenReturn(JobState.RUNNING)
212+
.thenThrow(new IOException())
213+
.thenReturn(JobState.RUNNING)
214+
.thenReturn(JobState.CANCELLING)
215+
.thenReturn(JobState.CANCELLED);
216+
217+
// Act
218+
Result result =
219+
new PipelineOperator(client).waitForConditionAndForceCancel(DEFAULT_CONFIG, checker);
220+
221+
// Assert
222+
verify(client, atLeast(totalCalls))
223+
.getJobStatus(projectCaptor.capture(), regionCaptor.capture(), jobIdCaptor.capture());
224+
verify(client)
225+
.forceCancelJob(projectCaptor.capture(), regionCaptor.capture(), jobIdCaptor.capture());
226+
227+
Set<String> allProjects = new HashSet<>(projectCaptor.getAllValues());
228+
Set<String> allRegions = new HashSet<>(regionCaptor.getAllValues());
229+
Set<String> allJobIds = new HashSet<>(jobIdCaptor.getAllValues());
230+
231+
assertThat(allProjects).containsExactly(PROJECT);
232+
assertThat(allRegions).containsExactly(REGION);
233+
assertThat(allJobIds).containsExactly(JOB_ID);
234+
assertThat(result).isEqualTo(Result.CONDITION_MET);
235+
}
202236
}

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/AbstractPipelineLauncher.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,20 @@ public Job drainJob(String project, String region, String jobId) {
140140
client.projects().locations().jobs().update(project, region, jobId, job).execute());
141141
}
142142

143+
@Override
144+
public Job forceCancelJob(String project, String region, String jobId) throws IOException {
145+
LOG.info("Force-cancelling {} under {}", jobId, project);
146+
Job job =
147+
new Job()
148+
.setRequestedState(JobState.CANCELLED.toString())
149+
.setLabels(ImmutableMap.of("force_cancel_job", "true"));
150+
LOG.info("Sending job to update {}:\n{}", jobId, formatForLogging(job));
151+
return Failsafe.with(clientRetryPolicy())
152+
.get(
153+
() ->
154+
client.projects().locations().jobs().update(project, region, jobId, job).execute());
155+
}
156+
143157
@Override
144158
public @Nullable Double getMetric(String project, String region, String jobId, String metricName)
145159
throws IOException {

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/DirectRunnerClient.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ public Job drainJob(String project, String region, String jobId) {
135135
return cancelJob(project, region, jobId);
136136
}
137137

138+
@Override
139+
public Job forceCancelJob(String project, String region, String jobId) throws IOException {
140+
LOG.warn("Cannot force-cancel a direct runner job. Cancelling the job instead.");
141+
return cancelJob(project, region, jobId);
142+
}
143+
138144
@Override
139145
public Double getMetric(String project, String region, String jobId, String metricName) {
140146
return null;

0 commit comments

Comments
 (0)