|
17 | 17 | */ |
18 | 18 | package org.apache.beam.it.gcp; |
19 | 19 |
|
20 | | -import static java.nio.charset.StandardCharsets.UTF_8; |
21 | | - |
22 | | -import com.fasterxml.jackson.databind.JsonNode; |
23 | | -import com.fasterxml.jackson.databind.ObjectMapper; |
24 | 20 | import com.google.api.gax.core.CredentialsProvider; |
25 | 21 | import com.google.api.gax.core.FixedCredentialsProvider; |
26 | 22 | import com.google.api.services.dataflow.model.Job; |
|
35 | 31 | import com.google.cloud.teleport.metadata.TemplateCreationParameters; |
36 | 32 | import com.google.cloud.teleport.metadata.TemplateIntegrationTest; |
37 | 33 | import com.google.cloud.teleport.metadata.util.MetadataUtils; |
38 | | -import com.google.common.collect.ImmutableList; |
39 | 34 | import java.io.File; |
40 | 35 | import java.io.IOException; |
41 | | -import java.io.Reader; |
42 | 36 | import java.lang.reflect.Method; |
43 | | -import java.nio.channels.Channels; |
44 | | -import java.nio.channels.ReadableByteChannel; |
45 | 37 | import java.text.SimpleDateFormat; |
46 | 38 | import java.time.Duration; |
47 | 39 | import java.util.ArrayList; |
|
51 | 43 | import java.util.Date; |
52 | 44 | import java.util.List; |
53 | 45 | import java.util.Random; |
54 | | -import java.util.concurrent.ConcurrentHashMap; |
55 | 46 | import java.util.concurrent.ExecutionException; |
56 | 47 | import org.apache.beam.it.common.PipelineLauncher; |
57 | 48 | import org.apache.beam.it.common.PipelineLauncher.JobState; |
|
67 | 58 | import org.apache.beam.it.gcp.dataflow.DirectRunnerClient; |
68 | 59 | import org.apache.beam.it.gcp.dataflow.FlexTemplateClient; |
69 | 60 | import org.apache.beam.it.gcp.storage.GcsResourceManager; |
70 | | -import org.apache.beam.sdk.io.FileSystems; |
71 | | -import org.apache.beam.sdk.io.fs.MatchResult; |
72 | | -import org.apache.beam.sdk.io.fs.ResourceId; |
73 | | -import org.apache.beam.sdk.options.PipelineOptionsFactory; |
74 | | -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.CharStreams; |
| 61 | +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache; |
| 62 | +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder; |
75 | 63 | import org.apache.commons.lang3.RandomStringUtils; |
76 | 64 | import org.junit.After; |
77 | 65 | import org.junit.Before; |
@@ -128,12 +116,7 @@ protected void starting(Description description) { |
128 | 116 | protected String testId; |
129 | 117 |
|
130 | 118 | /** Cache to avoid staging the same template multiple times on the same execution. */ |
131 | | - private static final ConcurrentHashMap<String, String> stagedTemplates = |
132 | | - new ConcurrentHashMap<>(); |
133 | | - |
134 | | - static { |
135 | | - Runtime.getRuntime().addShutdownHook(new Thread(TemplateTestBase::cleanUpTemplates)); |
136 | | - } |
| 119 | + private static final Cache<String, String> stagedTemplates = CacheBuilder.newBuilder().build(); |
137 | 120 |
|
138 | 121 | // Template metadata used only for single template tests specified via @TemplateIntegrationTest. |
139 | 122 | protected Template template; |
@@ -292,10 +275,10 @@ private String getSpecPath( |
292 | 275 | LOG.info("A spec path was given, not staging template {}", templateMetadata.name()); |
293 | 276 | return TestProperties.specPath(); |
294 | 277 | } else { |
295 | | - return stagedTemplates.computeIfAbsent( |
| 278 | + return stagedTemplates.get( |
296 | 279 | templateMetadata.name(), |
297 | | - (name) -> { |
298 | | - LOG.info("Preparing test for {} ({})", name, dataflowTemplateClass); |
| 280 | + () -> { |
| 281 | + LOG.info("Preparing test for {} ({})", templateMetadata.name(), dataflowTemplateClass); |
299 | 282 |
|
300 | 283 | String prefix = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss").format(new Date()) + "_IT"; |
301 | 284 |
|
@@ -337,7 +320,9 @@ private String getSpecPath( |
337 | 320 | templateMetadata.flexContainerName() != null |
338 | 321 | && !templateMetadata.flexContainerName().isEmpty(); |
339 | 322 | return String.format( |
340 | | - "gs://%s/%s/%s%s", bucketName, prefix, flex ? "flex/" : "", name); |
| 323 | + "gs://%s/%s/%s%s", |
| 324 | + bucketName, prefix, flex ? "flex/" : "", templateMetadata.name()); |
| 325 | + |
341 | 326 | } catch (Exception e) { |
342 | 327 | throw new IllegalArgumentException("Error staging template", e); |
343 | 328 | } |
@@ -638,13 +623,13 @@ protected String getGcsPath(String artifactId, GcsResourceManager gcsResourceMan |
638 | 623 | artifactId); |
639 | 624 | } |
640 | 625 |
|
641 | | - /** Create the default configuration {@link Config} for a specific job info. */ |
642 | | - protected Config createConfig(LaunchInfo info) { |
| 626 | + /** Create the default configuration {@link PipelineOperator.Config} for a specific job info. */ |
| 627 | + protected PipelineOperator.Config createConfig(LaunchInfo info) { |
643 | 628 | return createConfig(info, Duration.ofMinutes(45)); |
644 | 629 | } |
645 | 630 |
|
646 | | - /** Create the default configuration {@link Config} for a specific job info. */ |
647 | | - protected Config createConfig(LaunchInfo info, Duration duration) { |
| 631 | + /** Create the default configuration {@link PipelineOperator.Config} for a specific job info. */ |
| 632 | + protected PipelineOperator.Config createConfig(LaunchInfo info, Duration duration) { |
648 | 633 | Config.Builder configBuilder = |
649 | 634 | Config.builder().setJobId(info.jobId()).setProject(PROJECT).setRegion(REGION); |
650 | 635 |
|
@@ -765,57 +750,4 @@ public void run() { |
765 | 750 | } |
766 | 751 | } |
767 | 752 | } |
768 | | - |
769 | | - private static void cleanUpTemplates() { |
770 | | - FileSystems.registerFileSystemsOnce(PipelineOptionsFactory.create()); |
771 | | - ObjectMapper mapper = new ObjectMapper(); |
772 | | - |
773 | | - // clean up staged templates after all tests run |
774 | | - for (String metafileName : stagedTemplates.values()) { |
775 | | - try { |
776 | | - MatchResult result = FileSystems.match(metafileName); |
777 | | - if (result.metadata().size() != 1) { |
778 | | - continue; |
779 | | - } |
780 | | - ResourceId rid = result.metadata().get(0).resourceId(); |
781 | | - // for flex template, also clean up staged image |
782 | | - if (metafileName.contains("/flex/")) { |
783 | | - String raw; |
784 | | - try (ReadableByteChannel channel = FileSystems.open(rid)) { |
785 | | - Reader reader = Channels.newReader(channel, UTF_8); |
786 | | - raw = CharStreams.toString(reader); |
787 | | - } |
788 | | - JsonNode parsed = mapper.readTree(raw); |
789 | | - JsonNode valueNode = parsed.get("image"); |
790 | | - |
791 | | - // Check if the key exists and retrieve its text value |
792 | | - if (valueNode != null) { |
793 | | - String imgName = valueNode.asText(); |
794 | | - if (!imgName.contains(":")) { |
795 | | - imgName = imgName + ":latest"; |
796 | | - } |
797 | | - String[] cmd = null; |
798 | | - if (imgName.contains("gcr.io")) { |
799 | | - cmd = new String[] {"gcloud", "container", "images", "delete", "-q", imgName}; |
800 | | - } else if (imgName.contains("pkg.dev")) { |
801 | | - cmd = |
802 | | - new String[] {"gcloud", "artifacts", "docker", "images", "delete", "-q", imgName}; |
803 | | - } |
804 | | - if (cmd != null) { |
805 | | - Process exec = Runtime.getRuntime().exec(cmd); |
806 | | - if (exec.waitFor() != 0) { |
807 | | - LOG.warn("Error deleting staged image {}", imgName); |
808 | | - } |
809 | | - } |
810 | | - } else { |
811 | | - LOG.warn("Error during clean up staged template: unable to find image from metadata"); |
812 | | - } |
813 | | - } |
814 | | - FileSystems.delete(ImmutableList.of(rid)); |
815 | | - } catch (Exception e) { |
816 | | - LOG.warn("Error during clean up staged template.", e); |
817 | | - } |
818 | | - } |
819 | | - stagedTemplates.clear(); |
820 | | - } |
821 | 753 | } |
0 commit comments