Skip to content

Commit 8e2d17b

Browse files
Merge pull request #217 from datastax/stream770
STREAM-770: Added support for tuple datatype
2 parents 00b5260 + 40d446b commit 8e2d17b

File tree

4 files changed

+155
-9
lines changed

4 files changed

+155
-9
lines changed

connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractGenericConverter.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import com.datastax.oss.driver.api.core.CqlIdentifier;
1919
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
2020
import com.datastax.oss.driver.api.core.cql.Row;
21+
import com.datastax.oss.driver.api.core.data.TupleValue;
2122
import com.datastax.oss.driver.api.core.data.UdtValue;
2223
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
2324
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
2425
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
2526
import com.datastax.oss.driver.api.core.type.DataType;
27+
import com.datastax.oss.driver.api.core.type.TupleType;
2628
import com.datastax.oss.driver.api.core.type.UserDefinedType;
2729
import com.datastax.oss.protocol.internal.ProtocolConstants;
2830
import com.datastax.oss.pulsar.source.Converter;
@@ -98,6 +100,7 @@ public boolean isSupportedCqlType(DataType dataType) {
98100
case ProtocolConstants.DataType.FLOAT:
99101
case ProtocolConstants.DataType.INET:
100102
case ProtocolConstants.DataType.UDT:
103+
case ProtocolConstants.DataType.TUPLE:
101104
return true;
102105
}
103106
return false;
@@ -184,6 +187,15 @@ RecordSchemaBuilder addFieldSchema(RecordSchemaBuilder recordSchemaBuilder,
184187
fieldSchemaBuilder.optional().defaultValue(null);
185188
}
186189
break;
190+
case ProtocolConstants.DataType.TUPLE: {
191+
TupleType tupleType = (TupleType) dataType;
192+
FieldSchemaBuilder fieldSchemaBuilder = recordSchemaBuilder
193+
.field(fieldName, buildTupleSchema(ksm, dataType.asCql(false, true), schemaType, tupleType, optional))
194+
.type(schemaType);
195+
if (optional)
196+
fieldSchemaBuilder.optional().defaultValue(null);
197+
}
198+
break;
187199
default:
188200
log.debug("Ignoring unsupported type fields name={} type={}", fieldName, dataType.asCql(false, true));
189201
}
@@ -204,6 +216,19 @@ GenericSchema<GenericRecord> buildUDTSchema(KeyspaceMetadata ksm, String typeNam
204216
return genericSchema;
205217
}
206218

219+
GenericSchema<GenericRecord> buildTupleSchema(KeyspaceMetadata ksm, String typeName, SchemaType schemaType, TupleType tupleType, boolean optional) {
220+
String recordName = "Tuple_" + Integer.toHexString(typeName.hashCode());
221+
RecordSchemaBuilder tupleSchemaBuilder = SchemaBuilder.record(recordName);
222+
int i = 0;
223+
for (DataType componentType : tupleType.getComponentTypes()) {
224+
String fieldName = "index_" + i++;
225+
addFieldSchema(tupleSchemaBuilder, ksm, fieldName, componentType, schemaType, optional);
226+
}
227+
SchemaInfo schemaInfo = tupleSchemaBuilder.build(schemaType);
228+
GenericSchema<GenericRecord> genericSchema = Schema.generic(schemaInfo);
229+
udtSchemas.put(typeName, genericSchema);
230+
return genericSchema;
231+
}
207232
@Override
208233
public Schema<GenericRecord> getSchema() {
209234
return this.schema;
@@ -262,6 +287,9 @@ public GenericRecord toConnectData(Row row) {
262287
case ProtocolConstants.DataType.UDT:
263288
genericRecordBuilder.set(cm.getName().toString(), buildUDTValue(row.getUdtValue(cm.getName())));
264289
break;
290+
case ProtocolConstants.DataType.TUPLE:
291+
genericRecordBuilder.set(cm.getName().toString(), buildTupleValue(row.getTupleValue(cm.getName())));
292+
break;
265293
default:
266294
log.debug("Ignoring unsupported column name={} type={}", cm.getName(), cm.getType().asCql(false, true));
267295
}
@@ -335,6 +363,17 @@ GenericRecord buildUDTValue(UdtValue udtValue) {
335363
return genericRecordBuilder.build();
336364
}
337365

366+
GenericRecord buildTupleValue(TupleValue tupleValue) {
367+
String typeName = tupleValue.getType().asCql(false, true);
368+
GenericSchema<?> genericSchema = udtSchemas.get(typeName);
369+
assert genericSchema != null : "Generic schema not found for Tuple=" + typeName;
370+
GenericRecordBuilder genericRecordBuilder = genericSchema.newRecordBuilder();
371+
for (int i = 0; i < tupleValue.getType().getComponentTypes().size(); i++) {
372+
genericRecordBuilder.set("index_" + i, tupleValue.getObject(i));
373+
}
374+
return genericRecordBuilder.build();
375+
}
376+
338377
/**
339378
* Convert GenericRecord to primary key column values.
340379
*

connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java

Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.datastax.oss.cdc.NativeSchemaWrapper;
2020
import com.datastax.oss.driver.api.core.CqlIdentifier;
2121
import com.datastax.oss.driver.api.core.cql.Row;
22+
import com.datastax.oss.driver.api.core.data.TupleValue;
2223
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
2324
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
2425
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
@@ -27,12 +28,14 @@
2728
import com.datastax.oss.driver.api.core.type.ListType;
2829
import com.datastax.oss.driver.api.core.type.MapType;
2930
import com.datastax.oss.driver.api.core.type.SetType;
31+
import com.datastax.oss.driver.api.core.type.TupleType;
3032
import com.datastax.oss.driver.api.core.type.UserDefinedType;
3133
import com.datastax.oss.protocol.internal.ProtocolConstants;
3234
import com.datastax.oss.pulsar.source.Converter;
3335
import lombok.extern.slf4j.Slf4j;
3436
import org.apache.avro.Schema;
3537
import org.apache.avro.SchemaBuilder;
38+
import org.apache.avro.generic.GenericData;
3639
import org.apache.avro.generic.GenericRecord;
3740
import org.apache.pulsar.common.schema.SchemaType;
3841

@@ -83,6 +86,11 @@ public AbstractNativeConverter(KeyspaceMetadata ksm, TableMetadata tm, List<Colu
8386
log.info("Add vector schema {}={}", field.name(), vectorSchema);
8487
}
8588
break;
89+
case ProtocolConstants.DataType.TUPLE:
90+
Schema tupleSchema = dataTypeSchema(ksm, cm.getType());
91+
subSchemas.put(field.name(), tupleSchema);
92+
log.info("Add tuple schema {}={}", field.name(), tupleSchema);
93+
break;
8694
}
8795
}
8896
}
@@ -132,6 +140,8 @@ boolean isSupportedCqlType(DataType dataType) {
132140
return true;
133141
case ProtocolConstants.DataType.CUSTOM:
134142
return dataType instanceof CqlVectorType;
143+
case ProtocolConstants.DataType.TUPLE:
144+
return true;
135145
}
136146
return false;
137147
}
@@ -205,6 +215,9 @@ Schema dataTypeSchema(KeyspaceMetadata ksm, DataType dataType) {
205215
CqlVectorType vectorType = (CqlVectorType) dataType;
206216
return org.apache.avro.Schema.createArray(dataTypeSchema(ksm, vectorType.getSubtype()));
207217
}
218+
case ProtocolConstants.DataType.TUPLE:
219+
TupleType tupleType = (TupleType) dataType;
220+
return buildTupleSchema(ksm, dataType.asCql(false, true), tupleType, true);
208221
default:
209222
throw new UnsupportedOperationException("Ignoring unsupported type=" + dataType.asCql(false, true));
210223
}
@@ -229,6 +242,26 @@ Schema buildUDTSchema(KeyspaceMetadata ksm, String typeName, boolean optional) {
229242
return udtSchema;
230243
}
231244

245+
Schema buildTupleSchema(KeyspaceMetadata ksm, String typeName, TupleType tupleType, boolean optional) {
246+
List<Schema.Field> fieldSchemas = new ArrayList<>();
247+
int i = 0;
248+
for (DataType componentType : tupleType.getComponentTypes()) {
249+
String fieldName = "index_" + i;
250+
Schema.Field fieldSchema = fieldSchema(ksm, fieldName, componentType, optional);
251+
if (fieldSchema != null) {
252+
fieldSchemas.add(fieldSchema);
253+
String path = typeName + "." + fieldName;
254+
subSchemas.put(path, dataTypeSchema(ksm, componentType));
255+
}
256+
i++;
257+
}
258+
Schema tupleSchema = Schema.createRecord("Tuple_" + Integer.toHexString(
259+
tupleType.asCql(false, true).hashCode()
260+
), "CQL type " + typeName, ksm.getName().asInternal(), false, fieldSchemas);
261+
subSchemas.put(typeName, tupleSchema);
262+
return tupleSchema;
263+
}
264+
232265
String stringify(DataType dataType, Object value) {
233266
switch (dataType.getProtocolCode()) {
234267
case ProtocolConstants.DataType.ASCII:
@@ -270,8 +303,11 @@ String stringify(DataType dataType, Object value) {
270303
* @return the marshaled value; an epoch millisecond representation if the input is an {@link Instant}, or the original value otherwise
271304
*/
272305
Object marshalCollectionValue(Object collectionValue) {
273-
if(collectionValue instanceof Instant) {
274-
return ((Instant)collectionValue).toEpochMilli();
306+
if (collectionValue instanceof Instant) {
307+
return ((Instant) collectionValue).toEpochMilli();
308+
}
309+
if (collectionValue instanceof TupleValue) {
310+
return buildTupleValue((TupleValue) collectionValue);
275311
}
276312
return collectionValue;
277313
}
@@ -286,9 +322,25 @@ Object marshalCollectionValue(Object collectionValue) {
286322
*/
287323
Object marshalCollectionValue(Map.Entry<? super Object, ? super Object> entry) {
288324
Object collectionValue = entry.getValue();
289-
if(collectionValue instanceof Instant) {
290-
return ((Instant)collectionValue).toEpochMilli();
325+
if (collectionValue instanceof Instant) {
326+
return ((Instant) collectionValue).toEpochMilli();
327+
}
328+
if (collectionValue instanceof TupleValue) {
329+
return buildTupleValue((TupleValue) collectionValue);
291330
}
292331
return collectionValue;
293332
}
333+
334+
GenericRecord buildTupleValue(TupleValue tupleValue) {
335+
String typeName = tupleValue.getType().asCql(false, true);
336+
Schema tupleSchema = subSchemas.get(typeName);
337+
if (tupleSchema == null) {
338+
throw new IllegalStateException("Missing tuple schema for " + typeName);
339+
}
340+
GenericRecord record = new GenericData.Record(tupleSchema);
341+
for (int i = 0; i < tupleValue.getType().getComponentTypes().size(); i++) {
342+
record.put("index_" + i, marshalCollectionValue(tupleValue.getObject(i)));
343+
}
344+
return record;
345+
}
294346
}

connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.datastax.oss.driver.api.core.cql.Row;
2222
import com.datastax.oss.driver.api.core.data.CqlDuration;
2323
import com.datastax.oss.driver.api.core.data.CqlVector;
24+
import com.datastax.oss.driver.api.core.data.TupleValue;
2425
import com.datastax.oss.driver.api.core.data.UdtValue;
2526
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
2627
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
@@ -178,6 +179,11 @@ public byte[] toConnectData(Row row) {
178179
}
179180
}
180181
break;
182+
case ProtocolConstants.DataType.TUPLE: {
183+
TupleValue tupleValue = row.getTupleValue(cm.getName());
184+
genericRecordBuilder.put(fieldName, buildTupleValue(tupleValue));
185+
}
186+
break;
181187
default:
182188
log.debug("Ignoring unsupported column name={} type={}", cm.getName(), cm.getType().asCql(false, true));
183189
}

0 commit comments

Comments
 (0)