Skip to content

Commit a35a635

Browse files
committed
DynamicDestinationsHelpers.ConstantTimePartitioningClusteringDestinations parses the JSON configuration for partitioning and clustering on every element, which is expensive. Cache the result of that evaluation so it is done only once.
1 parent ece2beb commit a35a635

File tree

1 file changed

+33
-10
lines changed

1 file changed

+33
-10
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/DynamicDestinationsHelpers.java

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import java.io.IOException;
3131
import java.util.List;
3232
import java.util.Map;
33-
import java.util.Optional;
33+
import java.util.concurrent.atomic.AtomicBoolean;
3434
import org.apache.beam.sdk.coders.CannotProvideCoderException;
3535
import org.apache.beam.sdk.coders.Coder;
3636
import org.apache.beam.sdk.coders.CoderRegistry;
@@ -279,11 +279,17 @@ static class ConstantTimePartitioningClusteringDestinations<T>
279279
private final @Nullable ValueProvider<String> jsonTimePartitioning;
280280
private final @Nullable ValueProvider<String> jsonClustering;
281281

282+
// Lazily initialized and cached values.
283+
private transient @Nullable String evaluatedPartitioning = null;
284+
private transient @Nullable String evaluatedClustering = null;
285+
private transient AtomicBoolean initialized = null;
286+
282287
ConstantTimePartitioningClusteringDestinations(
283288
DynamicDestinations<T, TableDestination> inner,
284289
ValueProvider<String> jsonTimePartitioning,
285290
ValueProvider<String> jsonClustering) {
286291
super(inner);
292+
initialized = new AtomicBoolean(false);
287293

288294
checkArgument(
289295
(jsonTimePartitioning != null
@@ -299,19 +305,36 @@ static class ConstantTimePartitioningClusteringDestinations<T>
299305
this.jsonClustering = jsonClustering;
300306
}
301307

308+
/**
 * Reads the partitioning and clustering {@code ValueProvider}s once and caches the results in
 * {@code evaluatedPartitioning} and {@code evaluatedClustering}.
 *
 * <p>A value is cached only when it is non-null and parses to a non-empty JSON object; otherwise
 * the corresponding cached field keeps its current value. The method is {@code synchronized} and
 * returns immediately once {@code initialized} has been set, so the parsing work below runs at
 * most once per flag lifetime.
 *
 * <p>NOTE(review): {@code initialized} is declared {@code transient} and is only assigned in the
 * constructor. If instances of this class are serialized and deserialized before use, the field
 * would presumably be null here and {@code initialized.get()} would throw a
 * NullPointerException - confirm against how this object is shipped to workers.
 */
private synchronized void evaluateOncePartitioningAndClustering() {
309+
// Re-check under the lock: another caller may have completed the evaluation already.
if (initialized.get()) {
310+
return;
311+
}
312+
if (jsonTimePartitioning != null) {
313+
String partitioning = jsonTimePartitioning.get();
314+
// Treat a null value or an empty JSON object ("{}") as "not configured".
if (partitioning != null
315+
&& !JsonParser.parseString(partitioning).getAsJsonObject().isEmpty()) {
316+
this.evaluatedPartitioning = partitioning;
317+
}
318+
}
319+
if (jsonClustering != null) {
320+
String clustering = jsonClustering.get();
321+
// Same rule as for partitioning: only a non-empty JSON object is cached.
if (clustering != null && !JsonParser.parseString(clustering).getAsJsonObject().isEmpty()) {
322+
this.evaluatedClustering = clustering;
323+
}
324+
}
325+
// Set the flag last, after both cached fields have been written.
initialized.set(true);
326+
}
327+
302328
@Override
303329
public TableDestination getDestination(@Nullable ValueInSingleWindow<T> element) {
330+
if (!initialized.get()) {
331+
evaluateOncePartitioningAndClustering();
332+
}
304333
TableDestination destination = super.getDestination(element);
305334
String partitioning =
306-
Optional.ofNullable(jsonTimePartitioning).map(ValueProvider::get).orElse(null);
307-
if (partitioning == null
308-
|| JsonParser.parseString(partitioning).getAsJsonObject().isEmpty()) {
309-
partitioning = destination.getJsonTimePartitioning();
310-
}
311-
String clustering = Optional.ofNullable(jsonClustering).map(ValueProvider::get).orElse(null);
312-
if (clustering == null || JsonParser.parseString(clustering).getAsJsonObject().isEmpty()) {
313-
clustering = destination.getJsonClustering();
314-
}
335+
MoreObjects.firstNonNull(evaluatedPartitioning, destination.getJsonTimePartitioning());
336+
String clustering =
337+
MoreObjects.firstNonNull(evaluatedClustering, destination.getJsonClustering());
315338

316339
return new TableDestination(
317340
destination.getTableSpec(), destination.getTableDescription(), partitioning, clustering);

0 commit comments

Comments
 (0)