Skip to content

Commit bfa1cfd

Browse files
authored
Fix resolve error caused by avro record collision (#161)
1 parent 1851d2e commit bfa1cfd

File tree

2 files changed

+63
-10
lines changed

2 files changed

+63
-10
lines changed

hoptimator-avro/src/main/java/com/linkedin/hoptimator/avro/AvroConverter.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,14 @@ private AvroConverter() {
3030
}
3131

3232
public static Schema avro(String namespace, String name, RelDataType dataType) {
33+
// TODO: Schema generation is overly verbose today and does not support reuse, hence why we always defined a new namespace for records.
34+
// Ideally information should be extracted from the RelDataType to allow collapsing of records into one definition.
35+
String newNamespace = namespace + "." + name;
3336
if (dataType.isStruct()) {
3437
List<Schema.Field> fields = dataType.getFieldList().stream()
35-
.map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null))
38+
.map(x -> new Schema.Field(sanitize(x.getName()), avro(newNamespace, x.getName(), x.getType()), describe(x), null))
3639
.collect(Collectors.toList());
37-
return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields),
40+
return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), newNamespace, false, fields),
3841
dataType.isNullable());
3942
} else {
4043
switch (dataType.getSqlTypeName()) {
@@ -52,7 +55,7 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
5255
case BINARY:
5356
case VARBINARY:
5457
if (dataType.getPrecision() != -1) {
55-
return createAvroSchemaWithNullability(Schema.createFixed(sanitize(name), dataType.toString(), namespace,
58+
return createAvroSchemaWithNullability(Schema.createFixed(sanitize(name), dataType.toString(), newNamespace,
5659
dataType.getPrecision()), dataType.isNullable());
5760
} else {
5861
return createAvroTypeWithNullability(Schema.Type.BYTES, dataType.isNullable());
@@ -63,12 +66,12 @@ public static Schema avro(String namespace, String name, RelDataType dataType) {
6366
return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable());
6467
case ARRAY:
6568
return createAvroSchemaWithNullability(
66-
Schema.createArray(avro(null, sanitize(name) + "ArrayElement",
69+
Schema.createArray(avro(newNamespace, sanitize(name) + "ArrayElement",
6770
Objects.requireNonNull(dataType.getComponentType()))),
6871
dataType.isNullable());
6972
case MAP:
7073
return createAvroSchemaWithNullability(
71-
Schema.createMap(avro(null, sanitize(name) + "MapElement",
74+
Schema.createMap(avro(newNamespace, sanitize(name) + "MapElement",
7275
Objects.requireNonNull(dataType.getValueType()))),
7376
dataType.isNullable());
7477
case UNKNOWN:

hoptimator-avro/src/test/java/com/linkedin/hoptimator/avro/AvroConverterTest.java

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.util.List;
55
import java.util.Map;
66
import java.util.Objects;
7-
87
import org.apache.avro.Schema;
98
import org.apache.calcite.plan.RelOptUtil;
109
import org.apache.calcite.rel.type.RelDataType;
@@ -103,7 +102,7 @@ public void testAvroKeyPayloadSchemaNoKeyOptions() {
103102
assertNull(result.getKey()); // Key schema should be null
104103
assertNotNull(result.getValue()); // Payload schema should not be null
105104
assertEquals("payloadSchema", result.getValue().getName());
106-
assertEquals("namespace", result.getValue().getNamespace());
105+
assertEquals("namespace.payloadSchema", result.getValue().getNamespace());
107106
assertEquals("record", result.getValue().getType().getName());
108107
assertEquals(1, result.getValue().getFields().size());
109108
assertEquals("field1", result.getValue().getFields().get(0).name());
@@ -138,14 +137,14 @@ public void testAvroKeyPayloadSchemaValidKeyOptions() {
138137

139138
assertNotNull(result.getKey()); // Key schema should not be null
140139
assertEquals("keySchema", result.getKey().getName());
141-
assertEquals("namespace", result.getKey().getNamespace());
140+
assertEquals("namespace.keySchema", result.getKey().getNamespace());
142141
assertEquals("record", result.getKey().getType().getName());
143142
assertEquals(1, result.getKey().getFields().size());
144143
assertEquals("field1", result.getKey().getFields().get(0).name()); // prefix should be stripped
145144
assertEquals("string", result.getKey().getFields().get(0).schema().getType().getName());
146145
assertNotNull(result.getValue()); // Payload schema should not be null
147146
assertEquals("payloadSchema", result.getValue().getName());
148-
assertEquals("namespace", result.getValue().getNamespace());
147+
assertEquals("namespace.payloadSchema", result.getValue().getNamespace());
149148
assertEquals("record", result.getValue().getType().getName());
150149
assertEquals(1, result.getValue().getFields().size());
151150
assertEquals("field2", result.getValue().getFields().get(0).name());
@@ -168,7 +167,7 @@ public void testAvroKeyPayloadSchemaPrimitiveKey() {
168167
assertEquals("int", result.getKey().getType().getName());
169168
assertNotNull(result.getValue()); // Payload schema should not be null
170169
assertEquals("payloadSchema", result.getValue().getName());
171-
assertEquals("namespace", result.getValue().getNamespace());
170+
assertEquals("namespace.payloadSchema", result.getValue().getNamespace());
172171
assertEquals("record", result.getValue().getType().getName());
173172
assertEquals(1, result.getValue().getFields().size());
174173
assertEquals("field1", result.getValue().getFields().get(0).name());
@@ -208,4 +207,55 @@ public void convertsNestedArray() {
208207
assertEquals("field1", structElementSchema.getFields().get(0).name());
209208
assertEquals("field2", structElementSchema.getFields().get(1).name());
210209
}
210+
211+
@Test
212+
public void handlesNamespaceInNestedArrayAndMapElements() {
213+
RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT);
214+
215+
// Create a "location" record type that will be reused - this mimics the real scenario
216+
RelDataType locationType1 = typeFactory.createStructType(
217+
List.of(typeFactory.createSqlType(SqlTypeName.VARCHAR), typeFactory.createSqlType(SqlTypeName.VARCHAR)),
218+
List.of("countryCode", "postalCode"));
219+
220+
// Create another "location" record type with slightly different structure
221+
RelDataType locationType2 = typeFactory.createStructType(
222+
List.of(typeFactory.createSqlType(SqlTypeName.VARCHAR), typeFactory.createSqlType(SqlTypeName.INTEGER)),
223+
List.of("countryCode", "regionCode"));
224+
225+
// Create structures that use these location types in different contexts
226+
// This simulates the real scenario where multiple fields have the same name but different contexts
227+
RelDataType profileStruct = typeFactory.createStructType(
228+
List.of(locationType1),
229+
List.of("location"));
230+
231+
RelDataType positionStruct = typeFactory.createStructType(
232+
List.of(locationType2),
233+
List.of("location"));
234+
235+
// Put both in a map structure - this creates the collision scenario
236+
// Both will try to generate records named "location" with the same namespace
237+
RelDataType positionsMap = typeFactory.createMapType(
238+
typeFactory.createSqlType(SqlTypeName.VARCHAR),
239+
positionStruct);
240+
241+
// Create the main record that contains both location types
242+
RelDataType mainRecord = typeFactory.createStructType(
243+
List.of(profileStruct, positionsMap),
244+
List.of("profile", "positions"));
245+
246+
// Schema creation should succeed
247+
Schema schema = AvroConverter.avro("com.linkedin", "MemberProfile", mainRecord);
248+
assertNotNull(schema);
249+
250+
// Without the namespace-appending behavior in AvroConverter, this would fail with error "Can't redefine: com.linkedin.location"
251+
// The issue occurs because multiple records named "location" are created with the same namespace,
252+
// causing a collision when schema.toString(true) tries to serialize them
253+
String schemaJson = schema.toString(true);
254+
assertNotNull("Schema toString(true) should succeed without 'Can't redefine' errors", schemaJson);
255+
256+
// Verify the schema can be parsed back
257+
Schema.Parser parser = new Schema.Parser();
258+
Schema reparsedSchema = parser.parse(schemaJson);
259+
assertNotNull("Generated schema must be parseable", reparsedSchema);
260+
}
211261
}

0 commit comments

Comments
 (0)