Skip to content

Commit 3b62ba0

Browse files
committed
Templates share image
* Support ContainerStageTracker for Java and XLang Templates
1 parent 7be9f75 commit 3b62ba0

File tree

39 files changed

+385
-242
lines changed

39 files changed

+385
-242
lines changed

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/TemplateTestBase.java

Lines changed: 58 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import com.google.api.services.dataflow.model.Job;
2727
import com.google.auth.Credentials;
2828
import com.google.cloud.bigquery.TableId;
29+
import com.google.cloud.storage.Blob;
30+
import com.google.cloud.storage.Storage;
2931
import com.google.cloud.teleport.metadata.DirectRunnerTest;
3032
import com.google.cloud.teleport.metadata.MultiTemplateIntegrationTest;
3133
import com.google.cloud.teleport.metadata.SkipRunnerV2Test;
@@ -52,6 +54,7 @@
5254
import java.util.Date;
5355
import java.util.List;
5456
import java.util.Random;
57+
import java.util.UUID;
5558
import java.util.concurrent.ExecutionException;
5659
import org.apache.beam.it.common.PipelineLauncher;
5760
import org.apache.beam.it.common.PipelineLauncher.JobState;
@@ -75,6 +78,7 @@
7578
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
7679
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.RemovalNotification;
7780
import org.apache.commons.lang3.RandomStringUtils;
81+
import org.apache.parquet.Strings;
7882
import org.checkerframework.checker.nullness.qual.Nullable;
7983
import org.junit.After;
8084
import org.junit.Before;
@@ -142,7 +146,14 @@ protected void starting(Description description) {
142146
})
143147
.build();
144148

149+
public static final String STAGING_PREFIX;
150+
145151
static {
152+
STAGING_PREFIX =
153+
new SimpleDateFormat("yyyy-MM-dd-HH-mm").format(new Date())
154+
+ "-"
155+
+ UUID.randomUUID().toString().substring(0, 6)
156+
+ "_IT";
146157
Runtime.getRuntime().addShutdownHook(new Thread(stagedTemplates::invalidateAll));
147158
}
148159

@@ -303,36 +314,53 @@ private String getSpecPath(
303314
LOG.info("A spec path was given, not staging template {}", templateMetadata.name());
304315
return TestProperties.specPath();
305316
} else {
306-
return stagedTemplates.get(
307-
templateMetadata.name(),
317+
boolean flex = !Strings.isNullOrEmpty(templateMetadata.flexContainerName());
318+
319+
// Use bucketName unless only artifactBucket is provided
320+
String bucketName;
321+
if (TestProperties.hasStageBucket()) {
322+
bucketName = TestProperties.stageBucket();
323+
} else if (TestProperties.hasArtifactBucket()) {
324+
bucketName = TestProperties.artifactBucket();
325+
LOG.warn(
326+
"-DstageBucket was not specified, using -DartifactBucket ({}) for stage step",
327+
bucketName);
328+
} else {
329+
throw new IllegalArgumentException(
330+
"-DstageBucket was not specified, so Template can not be staged. Either give a"
331+
+ " -DspecPath or provide a proper -DstageBucket for automatic staging.");
332+
}
333+
334+
String blobPath =
335+
String.format("%s/%s%s", STAGING_PREFIX, flex ? "flex/" : "", templateMetadata.name());
336+
String stagePath = String.format("gs://%s/%s", bucketName, blobPath);
337+
338+
String identifier = flex ? template.flexContainerName() : templateMetadata.name();
339+
340+
stagedTemplates.get(
341+
identifier,
308342
() -> {
309343
LOG.info("Preparing test for {} ({})", templateMetadata.name(), dataflowTemplateClass);
310344

311-
String prefix = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date()) + "_IT";
312-
313345
File pom = new File(pomPath).getAbsoluteFile();
314346
if (!pom.exists()) {
315347
throw new IllegalArgumentException(
316348
"To use tests staging templates, please run in the Maven module directory"
317349
+ " containing the template.");
318350
}
319351

320-
// Use bucketName unless only artifactBucket is provided
321-
String bucketName;
322-
if (TestProperties.hasStageBucket()) {
323-
bucketName = TestProperties.stageBucket();
324-
} else if (TestProperties.hasArtifactBucket()) {
325-
bucketName = TestProperties.artifactBucket();
326-
LOG.warn(
327-
"-DstageBucket was not specified, using -DartifactBucket ({}) for stage step",
328-
bucketName);
329-
} else {
330-
throw new IllegalArgumentException(
331-
"-DstageBucket was not specified, so Template can not be staged. Either give a"
332-
+ " -DspecPath or provide a proper -DstageBucket for automatic staging.");
352+
// Check template metadata file existence
353+
try (Storage storage = ArtifactUtils.createStorageClient(credentials)) {
354+
Blob blob =
355+
storage.get(
356+
bucketName, blobPath, Storage.BlobGetOption.fields(Storage.BlobField.SIZE));
357+
if (blob != null && blob.exists() && blob.getSize() > 0) {
358+
LOG.info("Find templates at {}", stagePath);
359+
return stagePath;
360+
}
333361
}
334362

335-
String[] mavenCmd = buildMavenStageCommand(prefix, pom, bucketName, template);
363+
String[] mavenCmd = buildMavenStageCommand(STAGING_PREFIX, pom, bucketName, template);
336364
LOG.info("Running command to stage templates: {}", String.join(" ", mavenCmd));
337365

338366
try {
@@ -344,17 +372,12 @@ private String getSpecPath(
344372
throw new RuntimeException("Error staging template, check Maven logs.");
345373
}
346374

347-
boolean flex =
348-
templateMetadata.flexContainerName() != null
349-
&& !templateMetadata.flexContainerName().isEmpty();
350-
return String.format(
351-
"gs://%s/%s/%s%s",
352-
bucketName, prefix, flex ? "flex/" : "", templateMetadata.name());
353-
375+
return stagePath;
354376
} catch (Exception e) {
355377
throw new IllegalArgumentException("Error staging template", e);
356378
}
357379
});
380+
return stagePath;
358381
}
359382
}
360383

@@ -417,6 +440,14 @@ private String[] buildMavenStageCommand(
417440
// that will copy only the shaded jar to the docker image.
418441
boolean skipShade = templateMetadata.type() != TemplateType.XLANG;
419442

443+
String templateOrContainer;
444+
@Nullable String flexContainerName = templateMetadata.flexContainerName();
445+
if (Strings.isNullOrEmpty(flexContainerName)) {
446+
templateOrContainer = "-DtemplateName=" + templateMetadata.name();
447+
} else {
448+
templateOrContainer = "-DflexContainerName=" + flexContainerName;
449+
}
450+
420451
return new String[] {
421452
"mvn",
422453
"compile",
@@ -442,7 +473,7 @@ private String[] buildMavenStageCommand(
442473
"-DbucketName=" + bucketName,
443474
"-DgcpTempLocation=" + bucketName,
444475
"-DstagePrefix=" + prefix,
445-
"-DtemplateName=" + templateMetadata.name(),
476+
templateOrContainer,
446477
"-DunifiedWorker=" + System.getProperty("unifiedWorker"),
447478
// Print stacktrace when command fails
448479
"-e"
@@ -814,7 +845,7 @@ private static void cleanUpTemplates(String metafileName) {
814845
if (cmd != null) {
815846
Process exec = Runtime.getRuntime().exec(cmd);
816847
if (exec.waitFor() != 0) {
817-
LOG.warn("Error deleting staged image {}", imgName);
848+
LOG.warn("Error deleting staged image {}. It might already be deleted.", imgName);
818849
}
819850
}
820851
} else {

it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/dataflow/FlexTemplateDataflowJobResourceManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@
3636
import org.slf4j.Logger;
3737
import org.slf4j.LoggerFactory;
3838

39+
/**
40+
* Used when an additional flex template is needed for integration tests (e.g. using another template
41+
* to generate data). For generic template integration tests, use TemplateTestBase's subclasses to
42+
* manage the templates.
43+
*/
3944
public class FlexTemplateDataflowJobResourceManager implements ResourceManager {
4045

4146
private static final Logger LOG =
@@ -49,6 +54,9 @@ public class FlexTemplateDataflowJobResourceManager implements ResourceManager {
4954
private static final String PROJECT = TestProperties.project();
5055
private static final String REGION = TestProperties.region();
5156
private static final Credentials CREDENTIALS = TestProperties.googleCredentials();
57+
// TODO(yathu): we should use TemplateTestBase.stagedTemplates to manage all staged templates
58+
// during workflow run.
59+
// Currently templates involved here get compiled and staged twice.
5260
private static Map<String, String> specPaths = new HashMap<>();
5361

5462
private FlexTemplateDataflowJobResourceManager(Builder builder) {
@@ -173,6 +181,11 @@ public FlexTemplateDataflowJobResourceManager build() {
173181
}
174182
}
175183

184+
// TODO(yathu): this method was forked and has diverged from TemplateTestBase.buildAndStageTemplate,
185+
// causing the involved
186+
// templates to be compiled and staged twice. We should use TemplateTestBase.stagedTemplates to
187+
// manage all staged
188+
// templates during the workflow run.
176189
private void buildAndStageTemplate(
177190
String templateName, String modulePath, String additionalMavenProfile) {
178191
LOG.info("Building and Staging {} template", templateName);

plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/DockerfileGenerator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@
4141
* includes Python, Yaml and Xlang templates
4242
*/
4343
public class DockerfileGenerator {
44-
44+
// TODO(DO NOT MERGE) - testing!
4545
public static final String BASE_CONTAINER_IMAGE =
46-
"gcr.io/dataflow-templates-base/java17-template-launcher-base-distroless:latest";
46+
"gcr.io/dataflow-build/yathu/java17-template-launcher-base-distroless:850175030";
4747
// Keep in sync with python version used in
4848
// https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/python/generate_dependencies.sh
4949
public static final String BASE_PYTHON_CONTAINER_IMAGE =

plugins/core-plugin/src/main/java/com/google/cloud/teleport/plugin/TemplateSpecsGenerator.java

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import java.io.File;
2727
import java.io.FileWriter;
2828
import java.io.IOException;
29-
import java.nio.file.Path;
3029
import java.util.List;
3130
import java.util.logging.Logger;
3231
import org.apache.commons.lang3.StringUtils;
@@ -121,14 +120,6 @@ public File saveMetadata(
121120
}
122121

123122
String imageName = templateDash.toLowerCase();
124-
if (StringUtils.isNotEmpty(templateAnnotation.flexContainerName())) {
125-
imageName = Path.of(templateAnnotation.flexContainerName()).getFileName().toString();
126-
}
127-
128-
if (!targetDirectory.exists()) {
129-
targetDirectory.mkdirs();
130-
}
131-
132123
File file = new File(targetDirectory, imageName + "-generated-metadata.json");
133124
LOG.info("Saving image spec metadata " + file.getAbsolutePath());
134125

plugins/core-plugin/src/test/java/com/google/cloud/teleport/plugin/TemplateSpecsGeneratorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,8 @@ public void saveMetadataNestedFlex() {
105105
assertNotNull(saveMetadata);
106106
assertTrue(saveMetadata.exists());
107107
assertEquals(
108-
saveMetadata.getPath(),
109-
outputFolder.toPath().resolve("AtoBNestedFlex-generated-metadata.json").toString());
108+
outputFolder.toPath().resolve("atobnestedflex-generated-metadata.json").toString(),
109+
saveMetadata.getPath());
110110
}
111111

112112
@Test

plugins/templates-maven-plugin/src/main/java/com/google/cloud/teleport/plugin/maven/TemplatesReleaseMojo.java

Lines changed: 49 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ public class TemplatesReleaseMojo extends TemplatesBaseMojo {
6767
@Parameter(defaultValue = "${templateName}", readonly = true, required = false)
6868
protected String templateName;
6969

70+
@Parameter(defaultValue = "${flexContainerName}", readonly = true, required = false)
71+
protected String flexContainerName;
72+
7073
@Parameter(defaultValue = "${bucketName}", readonly = true, required = true)
7174
protected String bucketName;
7275

@@ -202,61 +205,54 @@ public void execute() throws MojoExecutionException {
202205
"Stage Prefix must be informed for releases, when releasing templates or yaml blueprints.");
203206
}
204207

205-
if (!templateDefinitions.isEmpty()) {
206-
LOG.info("Found {} templates to release.", templateDefinitions.size());
207-
LOG.info("Trying to stage templates...");
208-
209-
for (TemplateDefinitions definition : templateDefinitions) {
210-
211-
ImageSpec imageSpec = definition.buildSpecModel(true);
212-
String currentTemplateName = imageSpec.getMetadata().getName();
213-
214-
LOG.info("Staging template {}...", currentTemplateName);
215-
216-
String useRegion = StringUtils.isNotEmpty(region) ? region : "us-central1";
217-
218-
// TODO: is there a better way to get the plugin on the _same project_?
219-
TemplatesStageMojo configuredMojo =
220-
new TemplatesStageMojo(
221-
project,
222-
session,
223-
outputDirectory,
224-
outputClassesDirectory,
225-
resourcesDirectory,
226-
targetDirectory,
227-
projectId,
228-
templateName,
229-
bucketName,
230-
librariesBucketName,
231-
stagePrefix,
232-
useRegion,
233-
artifactRegion,
234-
gcpTempLocation,
235-
baseContainerImage,
236-
basePythonContainerImage,
237-
pythonTemplateLauncherEntryPoint,
238-
javaTemplateLauncherEntryPoint,
239-
pythonVersion,
240-
beamVersion,
241-
artifactRegistry,
242-
stagingArtifactRegistry,
243-
unifiedWorker,
244-
generateSBOM);
245-
246-
String templatePath = configuredMojo.stageTemplate(definition, imageSpec, pluginManager);
247-
248-
if (!definition.getTemplateAnnotation().stageImageOnly()) {
249-
LOG.info("Template staged: {}", templatePath);
250-
251-
// Export the specs for collection
252-
generator.saveMetadata(definition, imageSpec.getMetadata(), targetDirectory);
253-
if (definition.isFlex()) {
254-
generator.saveImageSpec(definition, imageSpec, targetDirectory);
255-
}
208+
String useRegion = StringUtils.isNotEmpty(region) ? region : "us-central1";
209+
TemplatesStageMojo configuredMojo =
210+
new TemplatesStageMojo(
211+
project,
212+
session,
213+
outputDirectory,
214+
outputClassesDirectory,
215+
resourcesDirectory,
216+
targetDirectory,
217+
projectId,
218+
templateName,
219+
flexContainerName,
220+
bucketName,
221+
librariesBucketName,
222+
stagePrefix,
223+
useRegion,
224+
artifactRegion,
225+
gcpTempLocation,
226+
baseContainerImage,
227+
basePythonContainerImage,
228+
pythonTemplateLauncherEntryPoint,
229+
javaTemplateLauncherEntryPoint,
230+
pythonVersion,
231+
beamVersion,
232+
artifactRegistry,
233+
stagingArtifactRegistry,
234+
unifiedWorker,
235+
generateSBOM);
236+
configuredMojo.stageCommandSpecs(templateDefinitions);
237+
238+
for (TemplateDefinitions definition : templateDefinitions) {
239+
240+
ImageSpec imageSpec = definition.buildSpecModel(true);
241+
String currentTemplateName = imageSpec.getMetadata().getName();
242+
243+
LOG.info("Staging template {}...", currentTemplateName);
244+
245+
String templatePath = configuredMojo.stageTemplate(definition, imageSpec, pluginManager);
246+
247+
if (!definition.getTemplateAnnotation().stageImageOnly()) {
248+
LOG.info("Template staged: {}", templatePath);
249+
250+
// Export the specs for collection
251+
generator.saveMetadata(definition, imageSpec.getMetadata(), targetDirectory);
252+
if (definition.isFlex()) {
253+
generator.saveImageSpec(definition, imageSpec, targetDirectory);
256254
}
257255
}
258-
} else {
259-
LOG.warn("Did not find any templates to release in this module.");
260256
}
261257

262258
if (publishYamlBlueprints) {

plugins/templates-maven-plugin/src/main/java/com/google/cloud/teleport/plugin/maven/TemplatesRunMojo.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ public class TemplatesRunMojo extends TemplatesBaseMojo {
7474
@Parameter(defaultValue = "${templateName}", readonly = true, required = false)
7575
protected String templateName;
7676

77+
@Parameter(defaultValue = "${flexContainerName}", readonly = true, required = false)
78+
protected String flexContainerName;
79+
7780
@Parameter(defaultValue = "${bucketName}", readonly = true, required = true)
7881
protected String bucketName;
7982

@@ -203,6 +206,7 @@ public void execute() throws MojoExecutionException {
203206
targetDirectory,
204207
projectId,
205208
templateName,
209+
flexContainerName,
206210
bucketName,
207211
bucketName,
208212
stagePrefix,

0 commit comments

Comments
 (0)