From e63009035469aa3b5eb862fbabfdd6b65de60001 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 24 Oct 2025 17:39:12 +0000 Subject: [PATCH 1/2] fix race condition --- .../apache/beam/sdk/util/RowJsonUtils.java | 29 +++++++++---------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java index c83048ca8def..4d2019dca877 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java @@ -51,21 +51,20 @@ public class RowJsonUtils { * overwrite the default buffer size limit to 100 MB, and exposes this interface for higher limit. * If needed, call this method during pipeline run time, e.g. in DoFn.setup. */ - public static void increaseDefaultStreamReadConstraints(int newLimit) { - if (newLimit <= defaultBufferLimit) { - return; + public static synchronized void increaseDefaultStreamReadConstraints(int newLimit) { + if (newLimit > defaultBufferLimit) { + try { + Class.forName("com.fasterxml.jackson.core.StreamReadConstraints"); + + com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints( + com.fasterxml.jackson.core.StreamReadConstraints.builder() + .maxStringLength(newLimit) + .build()); + } catch (ClassNotFoundException e) { + // <2.15, do nothing + } + defaultBufferLimit = newLimit; } - try { - Class unused = Class.forName("com.fasterxml.jackson.core.StreamReadConstraints"); - - com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints( - com.fasterxml.jackson.core.StreamReadConstraints.builder() - .maxStringLength(newLimit) - .build()); - } catch (ClassNotFoundException e) { - // <2.15, do nothing - } - defaultBufferLimit = newLimit; } static { @@ -101,7 +100,7 @@ static void setStreamReadConstraints(JsonFactory jsonFactory, int sizeLimit) { * factory to higher limits. If needed, call this method during pipeline run time, e.g. in * DoFn.setup. This avoids a data race caused by modifying the global default settings. */ - public static JsonFactory createJsonFactory(int sizeLimit) { + public static synchronized JsonFactory createJsonFactory(int sizeLimit) { sizeLimit = Math.max(sizeLimit, MAX_STRING_LENGTH); JsonFactory jsonFactory = new JsonFactory(); if (STREAM_READ_CONSTRAINTS_AVAILABLE) { From fea502d9e8b48c023f160e0f39b27628e2c2c5a4 Mon Sep 17 00:00:00 2001 From: Derrick Williams Date: Fri, 31 Oct 2025 19:18:39 +0000 Subject: [PATCH 2/2] update based on comments --- .../apache/beam/sdk/util/RowJsonUtils.java | 38 ++++++++++--------- 1 file changed, 20 insertions(+), 18 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java index 4d2019dca877..ee41d0da28fe 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java @@ -46,23 +46,19 @@ public class RowJsonUtils { /** * Increase the default jackson-databind stream read constraint. * - *

StreamReadConstraints was introduced in jackson 2.15 causing string > 20MB (5MB in 2.15.0) - * parsing failure. This has caused regressions in its dependencies include Beam. Here we - * overwrite the default buffer size limit to 100 MB, and exposes this interface for higher limit. - * If needed, call this method during pipeline run time, e.g. in DoFn.setup. + *

In Jackson 2.15, a new constraint is added on the max string length of JSON parsing, see + * https://github.com/FasterXML/jackson-core/issues/863. The default is 20M characters. This is + * too small for some of our users. This method allows users to increase this limit. */ public static synchronized void increaseDefaultStreamReadConstraints(int newLimit) { + if (!STREAM_READ_CONSTRAINTS_AVAILABLE) { + return; + } if (newLimit > defaultBufferLimit) { - try { - Class.forName("com.fasterxml.jackson.core.StreamReadConstraints"); - - com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints( - com.fasterxml.jackson.core.StreamReadConstraints.builder() - .maxStringLength(newLimit) - .build()); - } catch (ClassNotFoundException e) { - // <2.15, do nothing - } + com.fasterxml.jackson.core.StreamReadConstraints.overrideDefaultStreamReadConstraints( + com.fasterxml.jackson.core.StreamReadConstraints.builder() + .maxStringLength(newLimit) + .build()); defaultBufferLimit = newLimit; } } @@ -100,13 +96,19 @@ static void setStreamReadConstraints(JsonFactory jsonFactory, int sizeLimit) { * factory to higher limits. If needed, call this method during pipeline run time, e.g. in * DoFn.setup. This avoids a data race caused by modifying the global default settings. */ - public static synchronized JsonFactory createJsonFactory(int sizeLimit) { + public static JsonFactory createJsonFactory(int sizeLimit) { sizeLimit = Math.max(sizeLimit, MAX_STRING_LENGTH); - JsonFactory jsonFactory = new JsonFactory(); if (STREAM_READ_CONSTRAINTS_AVAILABLE) { - StreamReadConstraintsHelper.setStreamReadConstraints(jsonFactory, sizeLimit); + // Synchronize to avoid race condition with increaseDefaultStreamReadConstraints + // which modifies static defaults that builder() and new JsonFactory() may read. + synchronized (RowJsonUtils.class) { + JsonFactory jsonFactory = new JsonFactory(); + StreamReadConstraintsHelper.setStreamReadConstraints(jsonFactory, sizeLimit); + return jsonFactory; + } + } else { + return new JsonFactory(); } - return jsonFactory; } public static ObjectMapper newObjectMapperWith(RowJson.RowJsonDeserializer deserializer) {