3030import java .io .IOException ;
3131import java .util .List ;
3232import java .util .Map ;
33- import java .util .Optional ;
33+ import java .util .concurrent . atomic . AtomicBoolean ;
3434import org .apache .beam .sdk .coders .CannotProvideCoderException ;
3535import org .apache .beam .sdk .coders .Coder ;
3636import org .apache .beam .sdk .coders .CoderRegistry ;
@@ -279,6 +279,11 @@ static class ConstantTimePartitioningClusteringDestinations<T>
279279 private final @ Nullable ValueProvider <String > jsonTimePartitioning ;
280280 private final @ Nullable ValueProvider <String > jsonClustering ;
281281
282+ // Lazily initialized and cached values.
283+ private @ Nullable String evaluatedPartitioning = null ;
284+ private @ Nullable String evaluatedClustering = null ;
285+ private final AtomicBoolean initialized = new AtomicBoolean (false );
286+
282287 ConstantTimePartitioningClusteringDestinations (
283288 DynamicDestinations <T , TableDestination > inner ,
284289 ValueProvider <String > jsonTimePartitioning ,
@@ -299,19 +304,42 @@ static class ConstantTimePartitioningClusteringDestinations<T>
299304 this .jsonClustering = jsonClustering ;
300305 }
301306
307+ boolean isJsonConfigPresent (String json ) {
308+ return json != null && !JsonParser .parseString (json ).getAsJsonObject ().isEmpty ();
309+ }
310+
311+ private synchronized void evaluateOncePartitioningAndClustering () {
312+ if (initialized .get ()) {
313+ return ;
314+ }
315+ if (jsonTimePartitioning != null ) {
316+ String partitioning = jsonTimePartitioning .get ();
317+ if (isJsonConfigPresent (partitioning )) {
318+ this .evaluatedPartitioning = partitioning ;
319+ }
320+ }
321+ if (jsonClustering != null ) {
322+ String clustering = jsonClustering .get ();
323+ if (isJsonConfigPresent (clustering )) {
324+ this .evaluatedClustering = clustering ;
325+ }
326+ }
327+ initialized .set (true );
328+ }
329+
302330 @ Override
303331 public TableDestination getDestination (@ Nullable ValueInSingleWindow <T > element ) {
332+ if (!initialized .get ()) {
333+ evaluateOncePartitioningAndClustering ();
334+ }
304335 TableDestination destination = super .getDestination (element );
336+
305337 String partitioning =
306- Optional .ofNullable (jsonTimePartitioning ).map (ValueProvider ::get ).orElse (null );
307- if (partitioning == null
308- || JsonParser .parseString (partitioning ).getAsJsonObject ().isEmpty ()) {
309- partitioning = destination .getJsonTimePartitioning ();
310- }
311- String clustering = Optional .ofNullable (jsonClustering ).map (ValueProvider ::get ).orElse (null );
312- if (clustering == null || JsonParser .parseString (clustering ).getAsJsonObject ().isEmpty ()) {
313- clustering = destination .getJsonClustering ();
314- }
338+ evaluatedPartitioning != null
339+ ? evaluatedPartitioning
340+ : destination .getJsonTimePartitioning ();
341+ String clustering =
342+ evaluatedClustering != null ? evaluatedClustering : destination .getJsonClustering ();
315343
316344 return new TableDestination (
317345 destination .getTableSpec (), destination .getTableDescription (), partitioning , clustering );
0 commit comments