Skip to content

Commit 3215790

Browse files
darshan-sjever
authored andcommitted
Correcting Dataflow jobs resource leaks (GoogleCloudPlatform#3381)
1 parent 23ba905 commit 3215790

File tree

10 files changed

+13
-29
lines changed

10 files changed

+13
-29
lines changed

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -543,26 +543,10 @@ protected PipelineLauncher buildLauncher(
543543
* jobs getting leaked.
544544
*/
545545
protected LaunchInfo launchTemplate(LaunchConfig.Builder options) throws IOException {
546-
return this.launchTemplate(options, true, this.template);
546+
return this.launchTemplate(options, this.template);
547547
}
548548

549-
/**
550-
* Launch the template with the given options and configuration for hook.
551-
*
552-
* @param options Options to use for launch.
553-
* @param setupShutdownHook Whether should setup a hook to cancel the job upon VM termination.
554-
* This is useful to teardown resources if the VM/test terminates unexpectedly.
555-
* @return Job details.
556-
* @throws IOException Thrown when {@link PipelineLauncher#launch(String, String, LaunchConfig)}
557-
* fails.
558-
*/
559-
protected LaunchInfo launchTemplate(LaunchConfig.Builder options, boolean setupShutdownHook)
560-
throws IOException {
561-
return this.launchTemplate(options, setupShutdownHook, this.template);
562-
}
563-
564-
protected LaunchInfo launchTemplate(
565-
LaunchConfig.Builder options, boolean setupShutdownHook, Template templateMetadata)
549+
protected LaunchInfo launchTemplate(LaunchConfig.Builder options, Template templateMetadata)
566550
throws IOException {
567551

568552
boolean flex =
@@ -638,8 +622,8 @@ protected LaunchInfo launchTemplate(
638622

639623
LaunchInfo launchInfo = pipelineLauncher.launch(PROJECT, REGION, options.build());
640624

641-
// if the launch succeeded and setupShutdownHook is enabled, setup a thread to cancel job
642-
if (setupShutdownHook && launchInfo.jobId() != null && !launchInfo.jobId().isEmpty()) {
625+
// if the launch succeeded, setup a thread to cancel job
626+
if (launchInfo.jobId() != null && !launchInfo.jobId().isEmpty()) {
643627
Runtime.getRuntime()
644628
.addShutdownHook(new Thread(new CancelJobShutdownHook(pipelineLauncher, launchInfo)));
645629
}

v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ protected LaunchInfo launchDataflowJob(
417417

418418
// Run
419419
LOG.info("Launching Dataflow job with parameters: {}", params);
420-
LaunchInfo jobInfo = launchTemplate(options, false);
420+
LaunchInfo jobInfo = launchTemplate(options);
421421
assertThatPipeline(jobInfo).isRunning();
422422

423423
return jobInfo;

v2/gcs-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/GCSToSourceDbInterleaveIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -382,7 +382,7 @@ private void launchWriterDataflowJob() throws IOException {
382382
LaunchConfig.Builder options = LaunchConfig.builder(jobName, specPath);
383383
options.setParameters(params);
384384
// Run
385-
writerJobInfo = launchTemplate(options, false);
385+
writerJobInfo = launchTemplate(options);
386386
}
387387

388388
private void launchReaderDataflowJob() throws IOException {

v2/gcs-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/GCSToSourceDbInterleaveMultiShardIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ private void launchWriterDataflowJob() throws IOException {
419419
LaunchConfig.Builder options = LaunchConfig.builder(jobName, specPath);
420420
options.setParameters(params);
421421
// Run
422-
writerJobInfo = launchTemplate(options, false);
422+
writerJobInfo = launchTemplate(options);
423423
}
424424

425425
private void launchReaderDataflowJob() throws IOException {

v2/gcs-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/GCSToSourceDbWithReaderIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ private void launchWriterDataflowJob() throws IOException {
212212
LaunchConfig.Builder options = LaunchConfig.builder(jobName, specPath);
213213
options.setParameters(params);
214214
// Run
215-
writerJobInfo = launchTemplate(options, false);
215+
writerJobInfo = launchTemplate(options);
216216
}
217217

218218
private void launchReaderDataflowJob() throws IOException {

v2/gcs-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/GCSToSourceDbWithoutReaderIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ private void launchWriterDataflowJob(CustomTransformation customTransformation)
279279
LaunchConfig.Builder options = LaunchConfig.builder(jobName, specPath);
280280
options.setParameters(params);
281281
// Run
282-
jobInfo = launchTemplate(options, false);
282+
jobInfo = launchTemplate(options);
283283
}
284284

285285
private void createAndUploadShardConfigToGcs(

v2/gcs-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/TimezoneIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ private void launchWriterDataflowJob() throws IOException {
244244
LaunchConfig.Builder options = LaunchConfig.builder(jobName, specPath);
245245
options.setParameters(params);
246246
// Run
247-
writerJobInfo = launchTemplate(options, false);
247+
writerJobInfo = launchTemplate(options);
248248
}
249249

250250
private void launchReaderDataflowJob() throws IOException {

v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerITBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ protected PipelineLauncher.LaunchInfo launchDataflowJob(
274274
options.addEnvironment("numWorkers", 2);
275275
options.addEnvironment("ipConfiguration", "WORKER_IP_PRIVATE");
276276
// Run
277-
PipelineLauncher.LaunchInfo jobInfo = launchTemplate(options, false);
277+
PipelineLauncher.LaunchInfo jobInfo = launchTemplate(options);
278278
assertThatPipeline(jobInfo).isRunning();
279279

280280
return jobInfo;

v2/spanner-change-streams-to-sharded-file-sink/src/test/java/com/google/cloud/teleport/v2/templates/SpannerChangeStreamToGcsITBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ public PipelineLauncher.LaunchInfo launchReaderDataflowJob(
176176
options.setParameters(params);
177177
options.addEnvironment("additionalExperiments", Collections.singletonList("use_runner_v2"));
178178
// Run
179-
PipelineLauncher.LaunchInfo jobInfo = launchTemplate(options, false);
179+
PipelineLauncher.LaunchInfo jobInfo = launchTemplate(options);
180180
assertThatPipeline(jobInfo).isRunning();
181181
return jobInfo;
182182
}

v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbITBase.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ public PipelineLauncher.LaunchInfo launchDataflowJob(
275275
options.addEnvironment("additionalExperiments", Collections.singletonList("use_runner_v2"));
276276
options.addEnvironment("ipConfiguration", "WORKER_IP_PRIVATE");
277277
// Run
278-
PipelineLauncher.LaunchInfo jobInfo = launchTemplate(options, false);
278+
PipelineLauncher.LaunchInfo jobInfo = launchTemplate(options);
279279
assertThatPipeline(jobInfo).isRunning();
280280
return jobInfo;
281281
}

0 commit comments

Comments
 (0)