From de882828673a6e7babaeeb9bc4122a4bf80170c1 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 13 Nov 2025 09:58:48 -0500 Subject: [PATCH 1/5] [Proposal] Allow users to specify trusted Avro serializable classes to Dataflow worker --- .../dataflow/options/DataflowPipelineOptions.java | 10 ++++++++++ sdks/java/container/boot.go | 4 ++++ 2 files changed, 14 insertions(+) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 57f927d73073..0b83a31549d5 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -266,4 +266,14 @@ public String create(PipelineOptions options) { List getJdkAddOpenModules(); void setJdkAddOpenModules(List options); + + /** + * Set trusted serializable classes for use with the Avro `java-class` schema property. + * + *

See: https://github.com/apache/avro/pull/3376 + */ + @Description("Serializable classes required by java-class props in Avro 1.11.4+") + List getAvroSerializableClasses(); + + void setAvroSerializableClasses(List options); } diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 2b8b510ee9b3..8c0a9670d30b 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -276,6 +276,10 @@ func main() { args = append(args, "--add-modules="+module.GetStringValue()) } } + // Add trusted Avro serializable classes + if serializableClasses, ok := pipelineOptions.GetStructValue().GetFields()["avroSerializableClasses"]; ok { + args = append(args, "-Dorg.apache.avro.SERIALIZABLE_CLASSES="+strings.Join(serializableClasses.GetStringValue(), ",") + } } // Automatically open modules for Java 11+ openModuleAgentJar := "/opt/apache/beam/jars/open-module-agent.jar" From e027e2db182c69fddb29a08de63ba75ba1e6ce67 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Thu, 13 Nov 2025 10:26:19 -0500 Subject: [PATCH 2/5] Fixup boot.go --- sdks/java/container/boot.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 8c0a9670d30b..0f840007e9d2 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -278,7 +278,11 @@ func main() { } // Add trusted Avro serializable classes if serializableClasses, ok := pipelineOptions.GetStructValue().GetFields()["avroSerializableClasses"]; ok { - args = append(args, "-Dorg.apache.avro.SERIALIZABLE_CLASSES="+strings.Join(serializableClasses.GetStringValue(), ",") + var serializableClassesSlice []string + for _, cls := range serializableClasses.GetListValue().GetValues() { + serializableClassesSlice = append(serializableClassesSlice, cls.GetStringValue()) + } + args = append(args, "-Dorg.apache.avro.SERIALIZABLE_CLASSES="+strings.Join(serializableClassesSlice, ",")) } } // Automatically open modules for Java 11+ From 
83a8513894025c7d13887633c01119d84c5dd286 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 17 Nov 2025 14:59:02 -0500 Subject: [PATCH 3/5] Add default factory; add tests --- .../options/DataflowPipelineOptions.java | 23 ++++++++++++++++--- .../options/DataflowPipelineOptionsTest.java | 23 +++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index 0b83a31549d5..d6f41d044b5e 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow.options; import com.google.api.services.dataflow.Dataflow; +import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -267,12 +268,28 @@ public String create(PipelineOptions options) { void setJdkAddOpenModules(List options); + class AvroSerializableClassesFactory implements DefaultValueFactory> { + @Override + public List create(PipelineOptions options) { + return Arrays.asList( + "java.math.BigDecimal", + "java.math.BigInteger", + "java.net.URI", + "java.net.URL", + "java.io.File", + "java.lang.Integer"); + } + } + /** - * Set trusted serializable classes for use with the Avro `java-class` schema property. - * - *

See: https://github.com/apache/avro/pull/3376 + * The Avro spec supports the `java-class` schema annotation, which allows fields to be serialized + * and deserialized via their toString/String constructor. As of Avro 1.11.4+, allowed Java + * classes must be explicitly specified via the jvm option. The comma-separated String value of + * this pipeline option will be passed to the Dataflow worker via the + * -Dorg.apache.avro.SERIALIZABLE_CLASSES jvm option. */ @Description("Serializable classes required by java-class props in Avro 1.11.4+") + @Default.InstanceFactory(AvroSerializableClassesFactory.class) List getAvroSerializableClasses(); void setAvroSerializableClasses(List options); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java index b381396f2bcc..f3f56b300788 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeoutException; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; @@ -252,6 +253,28 @@ public void testDefaultGcpRegionFromGcloud() { } } + @Test + public void testDefaultAvroSerializableClasses() { + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + assertEquals( + Arrays.asList( + "java.math.BigDecimal", + "java.math.BigInteger", + "java.net.URI", + "java.net.URL", + "java.io.File", + "java.lang.Integer"), + options.getAvroSerializableClasses()); + } + + @Test + public void 
testOverriddenAvroSerializableClasses() { + final List opts = Arrays.asList("foo", "bar"); + DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); + options.setAvroSerializableClasses(opts); + assertEquals(opts, options.getAvroSerializableClasses()); + } + /** * If gcloud gets stuck, test that {@link DefaultGcpRegionFactory#getRegionFromGcloudCli(long)} * times out instead of blocking forever. From 7cf5b2c9fcd1b9e4de787f311a2fa66a1d711ae2 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Mon, 17 Nov 2025 16:21:58 -0500 Subject: [PATCH 4/5] Set default options from boot.go; move PipelineOpt to SdkHarnessOptions --- .../options/DataflowPipelineOptions.java | 27 ------------------- .../options/DataflowPipelineOptionsTest.java | 23 ---------------- sdks/java/container/boot.go | 17 +++++++++--- .../beam/sdk/options/SdkHarnessOptions.java | 12 +++++++++ 4 files changed, 26 insertions(+), 53 deletions(-) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java index d6f41d044b5e..57f927d73073 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptions.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.dataflow.options; import com.google.api.services.dataflow.Dataflow; -import java.util.Arrays; import java.util.List; import java.util.Map; import org.apache.beam.runners.dataflow.DataflowRunner; @@ -267,30 +266,4 @@ public String create(PipelineOptions options) { List getJdkAddOpenModules(); void setJdkAddOpenModules(List options); - - class AvroSerializableClassesFactory implements DefaultValueFactory> { - @Override - public List 
create(PipelineOptions options) { - return Arrays.asList( - "java.math.BigDecimal", - "java.math.BigInteger", - "java.net.URI", - "java.net.URL", - "java.io.File", - "java.lang.Integer"); - } - } - - /** - * The Avro spec supports the `java-class` schema annotation, which allows fields to be serialized - * and deserialized via their toString/String constructor. As of Avro 1.11.4+, allowed Java - * classes must be explicitly specified via the jvm option. The comma-separated String value of - * this pipeline option will be passed to the Dataflow worker via the - * -Dorg.apache.avro.SERIALIZABLE_CLASSES jvm option. - */ - @Description("Serializable classes required by java-class props in Avro 1.11.4+") - @Default.InstanceFactory(AvroSerializableClassesFactory.class) - List getAvroSerializableClasses(); - - void setAvroSerializableClasses(List options); } diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java index f3f56b300788..b381396f2bcc 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/options/DataflowPipelineOptionsTest.java @@ -29,7 +29,6 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.util.Arrays; import java.util.List; import java.util.concurrent.TimeoutException; import org.apache.beam.sdk.extensions.gcp.storage.NoopPathValidator; @@ -253,28 +252,6 @@ public void testDefaultGcpRegionFromGcloud() { } } - @Test - public void testDefaultAvroSerializableClasses() { - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - assertEquals( - Arrays.asList( - "java.math.BigDecimal", - 
"java.math.BigInteger", - "java.net.URI", - "java.net.URL", - "java.io.File", - "java.lang.Integer"), - options.getAvroSerializableClasses()); - } - - @Test - public void testOverriddenAvroSerializableClasses() { - final List opts = Arrays.asList("foo", "bar"); - DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); - options.setAvroSerializableClasses(opts); - assertEquals(opts, options.getAvroSerializableClasses()); - } - /** * If gcloud gets stuck, test that {@link DefaultGcpRegionFactory#getRegionFromGcloudCli(long)} * times out instead of blocking forever. diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index 0f840007e9d2..c3c342045d59 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -277,12 +277,23 @@ func main() { } } // Add trusted Avro serializable classes + var serializableClassesList []string if serializableClasses, ok := pipelineOptions.GetStructValue().GetFields()["avroSerializableClasses"]; ok { - var serializableClassesSlice []string for _, cls := range serializableClasses.GetListValue().GetValues() { - serializableClassesSlice = append(serializableClassesSlice, cls.GetStringValue()) + serializableClassesList = append(serializableClassesList, cls.GetStringValue()) } - args = append(args, "-Dorg.apache.avro.SERIALIZABLE_CLASSES="+strings.Join(serializableClassesSlice, ",")) + } else { + serializableClassesList = []string{ + "java.math.BigDecimal", + "java.math.BigInteger", + "java.net.URI", + "java.net.URL", + "java.io.File", + "java.lang.Integer", + } + } + if len(serializableClassesList) > 0 { + args = append(args, "-Dorg.apache.avro.SERIALIZABLE_CLASSES="+strings.Join(serializableClassesList, ",")) } } // Automatically open modules for Java 11+ diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java index ad5b1451075c..ecebeee4bba3 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/options/SdkHarnessOptions.java @@ -440,4 +440,16 @@ public Duration create(PipelineOptions options) { int getElementProcessingTimeoutMinutes(); void setElementProcessingTimeoutMinutes(int value); + + /** + * The Avro spec supports the `java-class` schema annotation, which allows fields to be serialized + * and deserialized via their toString/String constructor. As of Avro 1.11.4, the allowed Java + * classes must be explicitly specified via a JVM option. The values of this pipeline option are + * joined with commas and passed to the SDK harness JVM via the + * -Dorg.apache.avro.SERIALIZABLE_CLASSES system property. + */ + @Description("Serializable classes required by java-class props in Avro 1.11.4+") + List getAvroSerializableClasses(); + + void setAvroSerializableClasses(List options); } From afb1461689941ba9e35e6585a7d27b641b1f67e3 Mon Sep 17 00:00:00 2001 From: Claire McGinty Date: Tue, 18 Nov 2025 13:07:56 -0500 Subject: [PATCH 5/5] Add check for empty list --- sdks/java/container/boot.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/sdks/java/container/boot.go b/sdks/java/container/boot.go index c3c342045d59..f6c33b635d3c 100644 --- a/sdks/java/container/boot.go +++ b/sdks/java/container/boot.go @@ -280,7 +280,10 @@ func main() { var serializableClassesList []string if serializableClasses, ok := pipelineOptions.GetStructValue().GetFields()["avroSerializableClasses"]; ok { for _, cls := range serializableClasses.GetListValue().GetValues() { - serializableClassesList = append(serializableClassesList, cls.GetStringValue()) + // User can specify an empty list, which is serialized as a single, blank value + if cls.GetStringValue() != "" { + serializableClassesList = append(serializableClassesList, cls.GetStringValue()) + } } } else { serializableClassesList = []string{