Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,33 @@ public class RecordVisitor
*/
protected final boolean _overridden;

/**
* When the Avro schema for this JavaType ({@code _type}) results in a UNION of multiple Avro types, _typeSchema keeps track of
* which Avro type in the UNION represents this JavaType ({@code _type}), so that the fields of this JavaType can be set on the right Avro type by {@code builtAvroSchema()}.
*
* Example:
* <pre>
* @JsonSubTypes({
* @JsonSubTypes.Type(value = Apple.class),
* @JsonSubTypes.Type(value = Orange.class) })
* class Fruit {}
*
* class Apple extends Fruit {}
* class Orange extends Fruit {}
* </pre>
* When _type = Fruit.class
* Then
* _avroSchema of Fruit.class is a union of the Fruit record, Apple record and Orange record schemas: [
* { name: Fruit, type: record, fields: [..] }, <--- _typeSchema points here
* { name: Apple, type: record, fields: [..] },
* { name: Orange, type: record, fields: [..]}
* ]
* _typeSchema points to Fruit.class without subtypes record schema
*
* FIXME: _typeSchema is non-null exactly when _overridden is false, therefore (_overridden == false) can be replaced with (_typeSchema != null),
* but it might be considered an API change because _overridden has the protected access modifier.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can do that as a follow-up for 2.20 (2.x branch)

*/
private Schema _typeSchema;
protected Schema _avroSchema;

protected List<Schema.Field> _fields = new ArrayList<>();
Expand All @@ -42,32 +69,59 @@ public RecordVisitor(SerializerProvider p, JavaType type, VisitorFormatWrapperIm
_visitorWrapper = visitorWrapper;
// Check if the schema for this record is overridden
BeanDescription bean = getProvider().getConfig().introspectDirectClassAnnotations(_type);
List<NamedType> subTypes = getProvider().getAnnotationIntrospector().findSubtypes(bean.getClassInfo());
AvroSchema ann = bean.getClassInfo().getAnnotation(AvroSchema.class);
if (ann != null) {
_avroSchema = AvroSchemaHelper.parseJsonSchema(ann.value());
_overridden = true;
} else if (subTypes != null && !subTypes.isEmpty()) {
List<Schema> unionSchemas = new ArrayList<>();
try {
for (NamedType subType : subTypes) {
JsonSerializer<?> ser = getProvider().findValueSerializer(subType.getType());
VisitorFormatWrapperImpl visitor = _visitorWrapper.createChildWrapper();
ser.acceptJsonFormatVisitor(visitor, getProvider().getTypeFactory().constructType(subType.getType()));
unionSchemas.add(visitor.getAvroSchema());
}
_avroSchema = Schema.createUnion(unionSchemas);
_overridden = true;
} catch (JsonMappingException jme) {
throw new RuntimeException("Failed to build schema", jme);
}
} else {
_avroSchema = AvroSchemaHelper.initializeRecordSchema(bean);
// If the Avro schema for this _type ends up being a UNION, _typeSchema remembers which Avro type the fields must be assigned to
_typeSchema = AvroSchemaHelper.initializeRecordSchema(bean);
_avroSchema = _typeSchema;
_overridden = false;
AvroMeta meta = bean.getClassInfo().getAnnotation(AvroMeta.class);
if (meta != null) {
_avroSchema.addProp(meta.key(), meta.value());
}

List<NamedType> subTypes = getProvider().getAnnotationIntrospector().findSubtypes(bean.getClassInfo());
if (subTypes != null && !subTypes.isEmpty()) {
// alreadySeenClasses prevents subType processing in endless loop
Set<Class<?>> alreadySeenClasses = new HashSet<>();
alreadySeenClasses.add(_type.getRawClass());

// At this point calculating hashCode for _typeSchema fails with NPE because RecordSchema.fields is NULL
// see org.apache.avro.Schema.RecordSchema#computeHash.
// Therefore, unionSchemas must not be HashSet (or any other type using hashCode() for equality check).
// Set ensures that each subType schema is once in resulting union.
// IdentityHashMap is used because it is using reference-equality.
Set<Schema> unionSchemas = Collections.newSetFromMap(new IdentityHashMap<>());
// Initialize with this schema
if (_type.isConcrete()) {
unionSchemas.add(_typeSchema);
}

try {
for (NamedType subType : subTypes) {
if (!alreadySeenClasses.add(subType.getType())) {
continue;
}
JsonSerializer<?> ser = getProvider().findValueSerializer(subType.getType());
VisitorFormatWrapperImpl visitor = _visitorWrapper.createChildWrapper();
ser.acceptJsonFormatVisitor(visitor, getProvider().getTypeFactory().constructType(subType.getType()));
// Add subType schema into this union, unless it is already there.
Schema subTypeSchema = visitor.getAvroSchema();
// When the subType schema is itself a union, include each of its types in this union if not already there
if (subTypeSchema.getType() == Type.UNION) {
unionSchemas.addAll(subTypeSchema.getTypes());
} else {
unionSchemas.add(subTypeSchema);
}
}
_avroSchema = Schema.createUnion(new ArrayList<>(unionSchemas));
} catch (JsonMappingException jme) {
throw new RuntimeException("Failed to build schema", jme);
}
}
}
_visitorWrapper.getSchemas().addSchema(type, _avroSchema);
}
Expand All @@ -76,7 +130,7 @@ public RecordVisitor(SerializerProvider p, JavaType type, VisitorFormatWrapperIm
/**
 * Finalizes and returns the Avro schema produced by this visitor.
 * <p>
 * When the schema was not overridden, the collected {@code _fields} are
 * assigned to {@code _typeSchema}, the record schema that represents this
 * visitor's own type. The returned {@code _avroSchema} is either that same
 * record schema or a union built around it for polymorphic types.
 *
 * @return the schema registered for this visitor's type
 */
public Schema builtAvroSchema() {
    if (!_overridden) {
        // Assumption now is that we are done, so let's assign fields.
        // Fields are set on _typeSchema only: _avroSchema is either the very
        // same record (a second setFields call would be rejected) or a union,
        // which cannot hold fields itself.
        _typeSchema.setFields(_fields);
    }
    return _avroSchema;
}
Expand Down
Loading