Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions sdks/java/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,28 @@ func main() {
args = append(args, "--add-modules="+module.GetStringValue())
}
}
// Add trusted Avro serializable classes
var serializableClassesList []string
if serializableClasses, ok := pipelineOptions.GetStructValue().GetFields()["avroSerializableClasses"]; ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have both; a default that matches avro and this option to override when needed. Ideally the avro default list should be public so users can extend/modify it as required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, for classes already caused regressions ("java.math.BigDecimal", and if there are others) we can add it here for users

for _, cls := range serializableClasses.GetListValue().GetValues() {
// User can specify an empty list, which is serialized as a single, blank value
if cls.GetStringValue() != "" {
serializableClassesList = append(serializableClassesList, cls.GetStringValue())
}
}
} else {
serializableClassesList = []string{
"java.math.BigDecimal",
"java.math.BigInteger",
"java.net.URI",
"java.net.URL",
"java.io.File",
"java.lang.Integer",
}
}
if len(serializableClassesList) > 0 {
args = append(args, "-Dorg.apache.avro.SERIALIZABLE_CLASSES="+strings.Join(serializableClassesList, ","))
}
}
// Automatically open modules for Java 11+
openModuleAgentJar := "/opt/apache/beam/jars/open-module-agent.jar"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,4 +440,16 @@ public Duration create(PipelineOptions options) {
int getElementProcessingTimeoutMinutes();

void setElementProcessingTimeoutMinutes(int value);

/**
* The Avro spec supports the `java-class` schema annotation, which allows fields to be serialized
* and deserialized via their toString/String constructor. As of Avro 1.11.4+, allowed Java
* classes must be explicitly specified via the jvm option. The comma-separated String value of
* this pipeline option will be passed to the Dataflow worker via the
* -Dorg.apache.avro.SERIALIZABLE_CLASSES jvm option.
*/
@Description("Serializable classes required by java-class props in Avro 1.11.4+")
List<String> getAvroSerializableClasses();

void setAvroSerializableClasses(List<String> options);
}
Loading