diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java index eed4314e3913..52b5b954a095 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java @@ -30,7 +30,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderRegistry; @@ -279,6 +279,11 @@ static class ConstantTimePartitioningClusteringDestinations private final @Nullable ValueProvider jsonTimePartitioning; private final @Nullable ValueProvider jsonClustering; + // Lazily initialized and cached values. + private @Nullable String evaluatedPartitioning = null; + private @Nullable String evaluatedClustering = null; + private final AtomicBoolean initialized = new AtomicBoolean(false); + ConstantTimePartitioningClusteringDestinations( DynamicDestinations inner, ValueProvider jsonTimePartitioning, @@ -299,19 +304,41 @@ static class ConstantTimePartitioningClusteringDestinations this.jsonClustering = jsonClustering; } + static boolean isJsonConfigPresent(ValueProvider json) { + String jsonValue = json.get(); + return jsonValue != null && !JsonParser.parseString(jsonValue).getAsJsonObject().isEmpty(); + } + + private synchronized void evaluateOncePartitioningAndClustering() { + if (initialized.get()) { + return; + } + if (jsonTimePartitioning != null) { + if (isJsonConfigPresent(jsonTimePartitioning)) { + this.evaluatedPartitioning = jsonTimePartitioning.get(); + } + } + if (jsonClustering != null) { + if (isJsonConfigPresent(jsonClustering)) { + this.evaluatedClustering = jsonClustering.get(); + } + } + initialized.set(true); + } + @Override public TableDestination getDestination(@Nullable ValueInSingleWindow element) { + if (!initialized.get()) { + evaluateOncePartitioningAndClustering(); + } TableDestination destination = super.getDestination(element); + String partitioning = - Optional.ofNullable(jsonTimePartitioning).map(ValueProvider::get).orElse(null); - if (partitioning == null - || JsonParser.parseString(partitioning).getAsJsonObject().isEmpty()) { - partitioning = destination.getJsonTimePartitioning(); - } - String clustering = Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null); - if (clustering == null || JsonParser.parseString(clustering).getAsJsonObject().isEmpty()) { - clustering = destination.getJsonClustering(); - } + evaluatedPartitioning != null + ? evaluatedPartitioning + : destination.getJsonTimePartitioning(); + String clustering = + evaluatedClustering != null ? evaluatedClustering : destination.getJsonClustering(); return new TableDestination( destination.getTableSpec(), destination.getTableDescription(), partitioning, clustering);