Skip to content

Commit 4b31117

Browse files
Added assertion of json type
1 parent 0a5317e commit 4b31117

File tree

2 files changed

+60
-11
lines changed

2 files changed

+60
-11
lines changed

connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractGenericConverter.java

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,13 @@
1818
import com.datastax.oss.driver.api.core.CqlIdentifier;
1919
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
2020
import com.datastax.oss.driver.api.core.cql.Row;
21+
import com.datastax.oss.driver.api.core.data.TupleValue;
2122
import com.datastax.oss.driver.api.core.data.UdtValue;
2223
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
2324
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
2425
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
2526
import com.datastax.oss.driver.api.core.type.DataType;
27+
import com.datastax.oss.driver.api.core.type.TupleType;
2628
import com.datastax.oss.driver.api.core.type.UserDefinedType;
2729
import com.datastax.oss.protocol.internal.ProtocolConstants;
2830
import com.datastax.oss.pulsar.source.Converter;
@@ -184,6 +186,15 @@ RecordSchemaBuilder addFieldSchema(RecordSchemaBuilder recordSchemaBuilder,
184186
fieldSchemaBuilder.optional().defaultValue(null);
185187
}
186188
break;
189+
case ProtocolConstants.DataType.TUPLE: {
190+
TupleType tupleType = (TupleType) dataType;
191+
FieldSchemaBuilder fieldSchemaBuilder = recordSchemaBuilder
192+
.field(fieldName, buildTupleSchema(ksm, dataType.asCql(false, true), schemaType, tupleType, optional))
193+
.type(schemaType);
194+
if (optional)
195+
fieldSchemaBuilder.optional().defaultValue(null);
196+
}
197+
break;
187198
default:
188199
log.debug("Ignoring unsupported type fields name={} type={}", fieldName, dataType.asCql(false, true));
189200
}
@@ -204,6 +215,19 @@ GenericSchema<GenericRecord> buildUDTSchema(KeyspaceMetadata ksm, String typeNam
204215
return genericSchema;
205216
}
206217

218+
GenericSchema<GenericRecord> buildTupleSchema(KeyspaceMetadata ksm, String typeName, SchemaType schemaType, TupleType tupleType, boolean optional) {
219+
String recordName = "Tuple_" + Integer.toHexString(typeName.hashCode());
220+
RecordSchemaBuilder tupleSchemaBuilder = SchemaBuilder.record(recordName);
221+
int i = 0;
222+
for (DataType componentType : tupleType.getComponentTypes()) {
223+
String fieldName = "index_" + i++;
224+
addFieldSchema(tupleSchemaBuilder, ksm, fieldName, componentType, schemaType, optional);
225+
}
226+
SchemaInfo schemaInfo = tupleSchemaBuilder.build(schemaType);
227+
GenericSchema<GenericRecord> genericSchema = Schema.generic(schemaInfo);
228+
udtSchemas.put(typeName, genericSchema);
229+
return genericSchema;
230+
}
207231
@Override
208232
public Schema<GenericRecord> getSchema() {
209233
return this.schema;
@@ -262,6 +286,9 @@ public GenericRecord toConnectData(Row row) {
262286
case ProtocolConstants.DataType.UDT:
263287
genericRecordBuilder.set(cm.getName().toString(), buildUDTValue(row.getUdtValue(cm.getName())));
264288
break;
289+
case ProtocolConstants.DataType.TUPLE:
290+
genericRecordBuilder.set(cm.getName().toString(), buildTupleValue(row.getTupleValue(cm.getName())));
291+
break;
265292
default:
266293
log.debug("Ignoring unsupported column name={} type={}", cm.getName(), cm.getType().asCql(false, true));
267294
}
@@ -335,6 +362,17 @@ GenericRecord buildUDTValue(UdtValue udtValue) {
335362
return genericRecordBuilder.build();
336363
}
337364

365+
GenericRecord buildTupleValue(TupleValue tupleValue) {
366+
String typeName = tupleValue.getType().asCql(false, true);
367+
GenericSchema<?> genericSchema = udtSchemas.get(typeName);
368+
assert genericSchema != null : "Generic schema not found for Tuple=" + typeName;
369+
GenericRecordBuilder genericRecordBuilder = genericSchema.newRecordBuilder();
370+
for (int i = 0; i < tupleValue.getType().getComponentTypes().size(); i++) {
371+
genericRecordBuilder.set("index_" + i, tupleValue.getObject(i));
372+
}
373+
return genericRecordBuilder.build();
374+
}
375+
338376
/**
339377
* Convert GenericRecord to primary key column values.
340378
*

connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -651,13 +651,15 @@ public void testSchema(String ksName) throws InterruptedException, IOException {
651651
// force udt values to be null by populating 1 item, using zudt.newValue() without explicitly setting
652652
// any field to non-null value will cause the udt column itself to be null in the C* table
653653
UdtValue zudtOptionalValues = zudt.newValue(dataSpecMap.get("text").cqlValue);
654-
TupleType tupleType = DataTypes.tupleOf(DataTypes.TEXT, DataTypes.TEXT, DataTypes.BIGINT, DataTypes.DOUBLE, DataTypes.TEXT);
655-
TupleValue tupleValue = tupleType.newValue(dataSpecMap.get("text").cqlValue, dataSpecMap.get("text").cqlValue, dataSpecMap.get("bigint").cqlValue, dataSpecMap.get("double").cqlValue, dataSpecMap.get("text").cqlValue);
654+
TupleType tupleTypeForMap = DataTypes.tupleOf(DataTypes.TEXT, DataTypes.TEXT, DataTypes.BIGINT, DataTypes.DOUBLE, DataTypes.TEXT);
655+
TupleValue tupleValueForMap = tupleTypeForMap.newValue(dataSpecMap.get("text").cqlValue, dataSpecMap.get("text").cqlValue, dataSpecMap.get("bigint").cqlValue, dataSpecMap.get("double").cqlValue, dataSpecMap.get("text").cqlValue);
656656
Map<String, TupleValue> mapOfTuple = new HashMap<>();
657-
mapOfTuple.put("a", tupleValue);
657+
mapOfTuple.put("a", tupleValueForMap);
658+
TupleType tupleType = DataTypes.tupleOf(DataTypes.INT, DataTypes.TEXT);
659+
TupleValue tupleValue = tupleType.newValue(dataSpecMap.get("int").cqlValue, dataSpecMap.get("text").cqlValue);
658660
cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table3 (" +
659661
"xtext text, xascii ascii, xboolean boolean, xblob blob, xtimestamp timestamp, xtime time, xdate date, xuuid uuid, xtimeuuid timeuuid, xtinyint tinyint, xsmallint smallint, xint int, xbigint bigint, xvarint varint, xdecimal decimal, xdouble double, xfloat float, xinet4 inet, xinet6 inet, " +
660-
"ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, yudtoptional zudt, ylist list<text>, yset set<int>, ymap map<text, double>, ytuple frozen<tuple<text, text, bigint, double, text>>, ymapoftuple map<text, frozen<tuple<text, text, bigint, double, text>>>, ylistofmap list<frozen<map<text,double>>>, ysetofudt set<frozen<zudt>>," +
662+
"ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, yudtoptional zudt, ylist list<text>, yset set<int>, ymap map<text, double>, ytuple frozen<tuple<int, text>>, ymapoftuple map<text, frozen<tuple<text, text, bigint, double, text>>>, ylistofmap list<frozen<map<text,double>>>, ysetofudt set<frozen<zudt>>," +
661663
"primary key (xtext, xascii, xboolean, xblob, xtimestamp, xtime, xdate, xuuid, xtimeuuid, xtinyint, xsmallint, xint, xbigint, xvarint, xdecimal, xdouble, xfloat, xinet4, xinet6)) " +
662664
"WITH CLUSTERING ORDER BY (xascii ASC, xboolean DESC, xblob ASC, xtimestamp DESC, xtime DESC, xdate ASC, xuuid DESC, xtimeuuid ASC, xtinyint DESC, xsmallint ASC, xint DESC, xbigint ASC, xvarint DESC, xdecimal ASC, xdouble DESC, xfloat ASC, xinet4 ASC, xinet6 DESC) AND cdc=true");
663665
cqlSession.execute("INSERT INTO " + ksName + ".table3 (" +
@@ -825,15 +827,9 @@ void assertField(String fieldName, Object value) {
825827
if (fieldName.startsWith("index_")) {
826828
int idx = Integer.parseInt(fieldName.substring("index_".length()));
827829
if (idx == 0) {
828-
Assert.assertEquals(dataSpecMap.get("text").avroValue, value);
830+
Assert.assertEquals(dataSpecMap.get("int").avroValue, value);
829831
} else if (idx == 1) {
830832
Assert.assertEquals(dataSpecMap.get("text").avroValue, value);
831-
} else if (idx == 2) {
832-
Assert.assertEquals(dataSpecMap.get("bigint").avroValue, value);
833-
} else if (idx == 3) {
834-
Assert.assertEquals(dataSpecMap.get("double").avroValue, value);
835-
} else if (idx == 4) {
836-
Assert.assertEquals(dataSpecMap.get("text").avroValue, value);
837833
}
838834
return;
839835
}
@@ -1036,10 +1032,25 @@ void assertJsonNode(String field, JsonNode node) {
10361032
}
10371033
}
10381034
return;
1035+
case "mapoftuple": {
1036+
for (Iterator<Map.Entry<String, JsonNode>> it = node.fields(); it.hasNext(); ) {
1037+
Map.Entry<String, JsonNode> f = it.next();
1038+
assertJsonTupleRecord(f.getValue());
1039+
}
1040+
}
1041+
return;
10391042
}
10401043
Assert.assertTrue("Unexpected field="+field, false);
10411044
}
10421045

1046+
void assertJsonTupleRecord(JsonNode value) {
1047+
Assert.assertEquals(dataSpecMap.get("text").jsonValue(), value.get("index_0").asText());
1048+
Assert.assertEquals(dataSpecMap.get("text").jsonValue(), value.get("index_1").asText());
1049+
Assert.assertEquals(dataSpecMap.get("bigint").jsonValue(), value.get("index_2").asLong());
1050+
Assert.assertEquals(dataSpecMap.get("double").jsonValue(), value.get("index_3").asDouble());
1051+
Assert.assertEquals(dataSpecMap.get("text").jsonValue(), value.get("index_4").asText());
1052+
}
1053+
10431054
public void testBatchInsert(String ksName) throws InterruptedException, IOException {
10441055
try {
10451056
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {

0 commit comments

Comments
 (0)