3030import java .io .IOException ;
3131import java .util .List ;
3232import java .util .Map ;
33- import java .util .Optional ;
33+ import java .util .concurrent . atomic . AtomicBoolean ;
3434import org .apache .beam .sdk .coders .CannotProvideCoderException ;
3535import org .apache .beam .sdk .coders .Coder ;
3636import org .apache .beam .sdk .coders .CoderRegistry ;
@@ -279,6 +279,11 @@ static class ConstantTimePartitioningClusteringDestinations<T>
279279 private final @ Nullable ValueProvider <String > jsonTimePartitioning ;
280280 private final @ Nullable ValueProvider <String > jsonClustering ;
281281
282+ // Lazily initialized and cached values.
283+ private @ Nullable String evaluatedPartitioning = null ;
284+ private @ Nullable String evaluatedClustering = null ;
285+ private final AtomicBoolean initialized = new AtomicBoolean (false );
286+
282287 ConstantTimePartitioningClusteringDestinations (
283288 DynamicDestinations <T , TableDestination > inner ,
284289 ValueProvider <String > jsonTimePartitioning ,
@@ -299,19 +304,41 @@ static class ConstantTimePartitioningClusteringDestinations<T>
299304 this .jsonClustering = jsonClustering ;
300305 }
301306
307+ static boolean isJsonConfigPresent (ValueProvider <String > json ) {
308+ String jsonValue = json .get ();
309+ return jsonValue != null && !JsonParser .parseString (jsonValue ).getAsJsonObject ().isEmpty ();
310+ }
311+
312+ private synchronized void evaluateOncePartitioningAndClustering () {
313+ if (initialized .get ()) {
314+ return ;
315+ }
316+ if (jsonTimePartitioning != null ) {
317+ if (isJsonConfigPresent (jsonTimePartitioning )) {
318+ this .evaluatedPartitioning = jsonTimePartitioning .get ();
319+ }
320+ }
321+ if (jsonClustering != null ) {
322+ if (isJsonConfigPresent (jsonClustering )) {
323+ this .evaluatedClustering = jsonClustering .get ();
324+ }
325+ }
326+ initialized .set (true );
327+ }
328+
302329 @ Override
303330 public TableDestination getDestination (@ Nullable ValueInSingleWindow <T > element ) {
331+ if (!initialized .get ()) {
332+ evaluateOncePartitioningAndClustering ();
333+ }
304334 TableDestination destination = super .getDestination (element );
335+
305336 String partitioning =
306- Optional .ofNullable (jsonTimePartitioning ).map (ValueProvider ::get ).orElse (null );
307- if (partitioning == null
308- || JsonParser .parseString (partitioning ).getAsJsonObject ().isEmpty ()) {
309- partitioning = destination .getJsonTimePartitioning ();
310- }
311- String clustering = Optional .ofNullable (jsonClustering ).map (ValueProvider ::get ).orElse (null );
312- if (clustering == null || JsonParser .parseString (clustering ).getAsJsonObject ().isEmpty ()) {
313- clustering = destination .getJsonClustering ();
314- }
337+ evaluatedPartitioning != null
338+ ? evaluatedPartitioning
339+ : destination .getJsonTimePartitioning ();
340+ String clustering =
341+ evaluatedClustering != null ? evaluatedClustering : destination .getJsonClustering ();
315342
316343 return new TableDestination (
317344 destination .getTableSpec (), destination .getTableDescription (), partitioning , clustering );
0 commit comments