Skip to content

Commit 978b9be

Browse files
committed
Added support for arbitrarily complex Avro types
KAFKA-35
1 parent c8eea6d commit 978b9be

File tree

3 files changed

+163
-87
lines changed

3 files changed

+163
-87
lines changed

config/checkstyle/suppressions.xml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
<suppressions>
2323
<suppress checks="LineLength" files=".*MongoSinkTopicConfig.java"/>
2424
<suppress checks="MethodLength" files=".*MongoSinkTopicConfig.java"/>
25+
<suppress checks="LineLength" files=".*RecordConverterTest.java"/>
26+
<suppress checks="MethodLength" files=".*RecordConverterTest.java"/>
2527

2628
<suppress checks="Javadoc*" files="com[\\/]mongodb[\\/]kafka[\\/]connect[\\/]"/>
2729

src/main/java/com/mongodb/kafka/connect/sink/converter/AvroJsonSchemafulRecordConverter.java

Lines changed: 64 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020

2121
import static java.util.Arrays.asList;
2222
import static java.util.Collections.unmodifiableSet;
23+
import static org.apache.kafka.connect.data.Schema.Type.ARRAY;
24+
import static org.apache.kafka.connect.data.Schema.Type.MAP;
25+
import static org.apache.kafka.connect.data.Schema.Type.STRUCT;
2326

2427
import java.util.HashMap;
2528
import java.util.HashSet;
@@ -42,6 +45,7 @@
4245
import org.bson.BsonArray;
4346
import org.bson.BsonDocument;
4447
import org.bson.BsonNull;
48+
import org.bson.BsonValue;
4549

4650
import com.mongodb.kafka.connect.sink.converter.types.sink.bson.BooleanFieldConverter;
4751
import com.mongodb.kafka.connect.sink.converter.types.sink.bson.BytesFieldConverter;
@@ -92,13 +96,10 @@ class AvroJsonSchemafulRecordConverter implements RecordConverter {
9296

9397
@Override
9498
public BsonDocument convert(final Schema schema, final Object value) {
95-
9699
if (schema == null || value == null) {
97100
throw new DataException("Error: schema and/or value was null for AVRO conversion");
98101
}
99-
100-
return toBsonDoc(schema, value);
101-
102+
return toBsonDoc(schema, value).asDocument();
102103
}
103104

104105
private void registerSinkFieldConverter(final SinkFieldConverter converter) {
@@ -109,19 +110,67 @@ private void registerSinkFieldLogicalConverter(final SinkFieldConverter converte
109110
logicalConverters.put(converter.getSchema().name(), converter);
110111
}
111112

112-
private BsonDocument toBsonDoc(final Schema schema, final Object value) {
113+
private BsonValue toBsonDoc(final Schema schema, final Object value) {
114+
if (value == null) {
115+
return BsonNull.VALUE;
116+
}
113117
BsonDocument doc = new BsonDocument();
114-
schema.fields().forEach(f -> processField(doc, (Struct) value, f));
118+
if (schema.type() == MAP) {
119+
Schema fieldSchema = schema.valueSchema();
120+
Map m = (Map) value;
121+
for (Object entry : m.keySet()) {
122+
String key = (String) entry;
123+
if (fieldSchema.type().isPrimitive()) {
124+
doc.put(key, getConverter(fieldSchema).toBson(m.get(key), fieldSchema));
125+
} else if (fieldSchema.type().equals(ARRAY)) {
126+
doc.put(key, toBsonArray(fieldSchema, m.get(key)));
127+
} else {
128+
if (m.get(key) == null) {
129+
doc.put(key, BsonNull.VALUE);
130+
} else {
131+
doc.put(key, toBsonDoc(fieldSchema, m.get(key)));
132+
}
133+
}
134+
}
135+
} else {
136+
schema.fields().forEach(f -> doc.put(f.name(), processField((Struct) value, f)));
137+
}
115138
return doc;
116139
}
117140

118-
private void processField(final BsonDocument doc, final Struct struct, final Field field) {
141+
private BsonValue toBsonArray(final Schema schema, final Object value) {
142+
if (value == null) {
143+
return BsonNull.VALUE;
144+
}
145+
Schema fieldSchema = schema.valueSchema();
146+
BsonArray bsonArray = new BsonArray();
147+
List<?> myList = (List) value;
148+
myList.forEach(v -> {
149+
if (fieldSchema.type().isPrimitive()) {
150+
if (v == null) {
151+
bsonArray.add(BsonNull.VALUE);
152+
} else {
153+
bsonArray.add(getConverter(fieldSchema).toBson(v));
154+
}
155+
} else if (fieldSchema.type().equals(ARRAY)) {
156+
bsonArray.add(toBsonArray(fieldSchema, v));
157+
} else {
158+
bsonArray.add(toBsonDoc(fieldSchema, v));
159+
}
160+
});
161+
return bsonArray;
162+
}
119163

164+
private BsonValue processField(final Struct struct, final Field field) {
120165
LOGGER.trace("processing field '{}'", field.name());
121166

167+
if (struct.get(field.name()) == null) {
168+
LOGGER.trace("no field in struct -> adding null");
169+
return BsonNull.VALUE;
170+
}
171+
122172
if (isSupportedLogicalType(field.schema())) {
123-
doc.put(field.name(), getConverter(field.schema()).toBson(struct.get(field), field.schema()));
124-
return;
173+
return getConverter(field.schema()).toBson(struct.get(field), field.schema());
125174
}
126175

127176
try {
@@ -135,78 +184,23 @@ private void processField(final BsonDocument doc, final Struct struct, final Fie
135184
case INT64:
136185
case STRING:
137186
case BYTES:
138-
handlePrimitiveField(doc, struct, field);
139-
break;
187+
return handlePrimitiveField(struct, field);
140188
case STRUCT:
141-
handleStructField(doc, struct, field);
142-
break;
143-
case ARRAY:
144-
handleArrayField(doc, struct, field);
145-
break;
146189
case MAP:
147-
handleMapField(doc, struct, field);
148-
break;
190+
return toBsonDoc(field.schema(), struct.get(field));
191+
case ARRAY:
192+
return toBsonArray(field.schema(), struct.get(field));
149193
default:
150194
throw new DataException("unexpected / unsupported schema type " + field.schema().type());
151195
}
152196
} catch (Exception exc) {
153197
throw new DataException("error while processing field " + field.name(), exc);
154198
}
155-
156199
}
157200

158-
private void handleMapField(final BsonDocument doc, final Struct struct, final Field field) {
159-
LOGGER.trace("handling complex type 'map'");
160-
BsonDocument bd = new BsonDocument();
161-
if (struct.get(field) == null) {
162-
LOGGER.trace("no field in struct -> adding null");
163-
doc.put(field.name(), BsonNull.VALUE);
164-
return;
165-
}
166-
Map m = (Map) struct.get(field);
167-
for (Object entry : m.keySet()) {
168-
String key = (String) entry;
169-
if (field.schema().valueSchema().type().isPrimitive()) {
170-
bd.put(key, getConverter(field.schema().valueSchema()).toBson(m.get(key), field.schema()));
171-
} else {
172-
bd.put(key, toBsonDoc(field.schema().valueSchema(), m.get(key)));
173-
}
174-
}
175-
doc.put(field.name(), bd);
176-
}
177-
178-
private void handleArrayField(final BsonDocument doc, final Struct struct, final Field field) {
179-
LOGGER.trace("handling complex type 'array'");
180-
BsonArray array = new BsonArray();
181-
if (struct.get(field) == null) {
182-
LOGGER.trace("no field in struct -> adding null");
183-
doc.put(field.name(), BsonNull.VALUE);
184-
return;
185-
}
186-
for (Object element : (List) struct.get(field)) {
187-
if (field.schema().valueSchema().type().isPrimitive()) {
188-
array.add(getConverter(field.schema().valueSchema()).toBson(element, field.schema()));
189-
} else {
190-
array.add(toBsonDoc(field.schema().valueSchema(), element));
191-
}
192-
}
193-
doc.put(field.name(), array);
194-
}
195-
196-
private void handleStructField(final BsonDocument doc, final Struct struct, final Field field) {
197-
LOGGER.trace("handling complex type 'struct'");
198-
if (struct.get(field) != null) {
199-
LOGGER.trace(struct.get(field).toString());
200-
doc.put(field.name(), toBsonDoc(field.schema(), struct.get(field)));
201-
} else {
202-
LOGGER.trace("no field in struct -> adding null");
203-
doc.put(field.name(), BsonNull.VALUE);
204-
}
205-
}
206-
207-
private void handlePrimitiveField(final BsonDocument doc, final Struct struct, final Field field) {
201+
private BsonValue handlePrimitiveField(final Struct struct, final Field field) {
208202
LOGGER.trace("handling primitive type '{}'", field.schema().type());
209-
doc.put(field.name(), getConverter(field.schema()).toBson(struct.get(field), field.schema()));
203+
return getConverter(field.schema()).toBson(struct.get(field), field.schema());
210204
}
211205

212206
private boolean isSupportedLogicalType(final Schema schema) {
@@ -228,7 +222,6 @@ private SinkFieldConverter getConverter(final Schema schema) {
228222
if (converter == null) {
229223
throw new ConnectException("error no registered converter found for " + schema.type().getName());
230224
}
231-
232225
return converter;
233226
}
234227
}

0 commit comments

Comments
 (0)