Skip to content

Commit 0a5317e

Browse files
Added map case of tuples and corrected spacing
1 parent 9fe522d commit 0a5317e

File tree

2 files changed

+38
-15
lines changed

2 files changed

+38
-15
lines changed

connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,7 @@ Schema buildUDTSchema(KeyspaceMetadata ksm, String typeName, boolean optional) {
245245
Schema buildTupleSchema(KeyspaceMetadata ksm, String typeName, TupleType tupleType, boolean optional) {
246246
List<Schema.Field> fieldSchemas = new ArrayList<>();
247247
int i = 0;
248-
for(DataType componentType : tupleType.getComponentTypes()) {
248+
for (DataType componentType : tupleType.getComponentTypes()) {
249249
String fieldName = "index_" + i;
250250
Schema.Field fieldSchema = fieldSchema(ksm, fieldName, componentType, optional);
251251
if (fieldSchema != null) {
@@ -256,7 +256,7 @@ Schema buildTupleSchema(KeyspaceMetadata ksm, String typeName, TupleType tupleTy
256256
i++;
257257
}
258258
Schema tupleSchema = Schema.createRecord("Tuple_" + Integer.toHexString(
259-
tupleType.asCql(false, true).hashCode()
259+
tupleType.asCql(false, true).hashCode()
260260
), "CQL type " + typeName, ksm.getName().asInternal(), false, fieldSchemas);
261261
subSchemas.put(typeName, tupleSchema);
262262
return tupleSchema;
@@ -303,10 +303,10 @@ String stringify(DataType dataType, Object value) {
303303
* @return the marshaled value; an epoch millisecond representation if the input is an {@link Instant}, or the original value otherwise
304304
*/
305305
Object marshalCollectionValue(Object collectionValue) {
306-
if(collectionValue instanceof Instant) {
307-
return ((Instant)collectionValue).toEpochMilli();
306+
if (collectionValue instanceof Instant) {
307+
return ((Instant) collectionValue).toEpochMilli();
308308
}
309-
if(collectionValue instanceof TupleValue) {
309+
if (collectionValue instanceof TupleValue) {
310310
return buildTupleValue((TupleValue) collectionValue);
311311
}
312312
return collectionValue;
@@ -322,10 +322,10 @@ Object marshalCollectionValue(Object collectionValue) {
322322
*/
323323
Object marshalCollectionValue(Map.Entry<? super Object, ? super Object> entry) {
324324
Object collectionValue = entry.getValue();
325-
if(collectionValue instanceof Instant) {
326-
return ((Instant)collectionValue).toEpochMilli();
325+
if (collectionValue instanceof Instant) {
326+
return ((Instant) collectionValue).toEpochMilli();
327327
}
328-
if(collectionValue instanceof TupleValue) {
328+
if (collectionValue instanceof TupleValue) {
329329
return buildTupleValue((TupleValue) collectionValue);
330330
}
331331
return collectionValue;

connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -651,17 +651,19 @@ public void testSchema(String ksName) throws InterruptedException, IOException {
651651
// force udt values to be null by populating 1 item, using zudt.newValue() without explicitly setting
652652
// any field to non-null value will cause the udt column itself to be null in the C* table
653653
UdtValue zudtOptionalValues = zudt.newValue(dataSpecMap.get("text").cqlValue);
654-
TupleType tupleType = DataTypes.tupleOf(DataTypes.INT, DataTypes.TEXT);
655-
TupleValue tupleValue = tupleType.newValue(dataSpecMap.get("int").cqlValue, dataSpecMap.get("text").cqlValue);
654+
TupleType tupleType = DataTypes.tupleOf(DataTypes.TEXT, DataTypes.TEXT, DataTypes.BIGINT, DataTypes.DOUBLE, DataTypes.TEXT);
655+
TupleValue tupleValue = tupleType.newValue(dataSpecMap.get("text").cqlValue, dataSpecMap.get("text").cqlValue, dataSpecMap.get("bigint").cqlValue, dataSpecMap.get("double").cqlValue, dataSpecMap.get("text").cqlValue);
656+
Map<String, TupleValue> mapOfTuple = new HashMap<>();
657+
mapOfTuple.put("a", tupleValue);
656658
cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table3 (" +
657659
"xtext text, xascii ascii, xboolean boolean, xblob blob, xtimestamp timestamp, xtime time, xdate date, xuuid uuid, xtimeuuid timeuuid, xtinyint tinyint, xsmallint smallint, xint int, xbigint bigint, xvarint varint, xdecimal decimal, xdouble double, xfloat float, xinet4 inet, xinet6 inet, " +
658-
"ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, yudtoptional zudt, ylist list<text>, yset set<int>, ymap map<text, double>, ytuple frozen<tuple<int, text>>, ylistofmap list<frozen<map<text,double>>>, ysetofudt set<frozen<zudt>>," +
660+
"ytext text, yascii ascii, yboolean boolean, yblob blob, ytimestamp timestamp, ytime time, ydate date, yuuid uuid, ytimeuuid timeuuid, ytinyint tinyint, ysmallint smallint, yint int, ybigint bigint, yvarint varint, ydecimal decimal, ydouble double, yfloat float, yinet4 inet, yinet6 inet, yduration duration, yudt zudt, yudtoptional zudt, ylist list<text>, yset set<int>, ymap map<text, double>, ytuple frozen<tuple<text, text, bigint, double, text>>, ymapoftuple map<text, frozen<tuple<text, text, bigint, double, text>>>, ylistofmap list<frozen<map<text,double>>>, ysetofudt set<frozen<zudt>>," +
659661
"primary key (xtext, xascii, xboolean, xblob, xtimestamp, xtime, xdate, xuuid, xtimeuuid, xtinyint, xsmallint, xint, xbigint, xvarint, xdecimal, xdouble, xfloat, xinet4, xinet6)) " +
660662
"WITH CLUSTERING ORDER BY (xascii ASC, xboolean DESC, xblob ASC, xtimestamp DESC, xtime DESC, xdate ASC, xuuid DESC, xtimeuuid ASC, xtinyint DESC, xsmallint ASC, xint DESC, xbigint ASC, xvarint DESC, xdecimal ASC, xdouble DESC, xfloat ASC, xinet4 ASC, xinet6 DESC) AND cdc=true");
661663
cqlSession.execute("INSERT INTO " + ksName + ".table3 (" +
662664
"xtext, xascii, xboolean, xblob, xtimestamp, xtime, xdate, xuuid, xtimeuuid, xtinyint, xsmallint, xint, xbigint, xvarint, xdecimal, xdouble, xfloat, xinet4, xinet6, " +
663-
"ytext, yascii, yboolean, yblob, ytimestamp, ytime, ydate, yuuid, ytimeuuid, ytinyint, ysmallint, yint, ybigint, yvarint, ydecimal, ydouble, yfloat, yinet4, yinet6, yduration, yudt, yudtoptional, ylist, yset, ymap, ytuple, ylistofmap, ysetofudt" +
664-
") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?, ?,?,?,?,?,?)",
665+
"ytext, yascii, yboolean, yblob, ytimestamp, ytime, ydate, yuuid, ytimeuuid, ytinyint, ysmallint, yint, ybigint, yvarint, ydecimal, ydouble, yfloat, yinet4, yinet6, yduration, yudt, yudtoptional, ylist, yset, ymap, ytuple, ymapoftuple, ylistofmap, ysetofudt" +
666+
") VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?, ?,?,?, ?,?,?,?,?,?,?)",
665667
dataSpecMap.get("text").cqlValue,
666668
dataSpecMap.get("ascii").cqlValue,
667669
dataSpecMap.get("boolean").cqlValue,
@@ -710,6 +712,7 @@ public void testSchema(String ksName) throws InterruptedException, IOException {
710712
dataSpecMap.get("set").cqlValue,
711713
dataSpecMap.get("map").cqlValue,
712714
tupleValue,
715+
mapOfTuple,
713716
dataSpecMap.get("listofmap").cqlValue,
714717
ImmutableSet.of(zudtValue, zudtValue)
715718
);
@@ -766,6 +769,11 @@ void assertGenericMap(String field, Map<Utf8, Object> gm) {
766769
for(Map.Entry<String, Object> entry : expectedMap.entrySet())
767770
Assert.assertEquals(expectedMap.get(entry.getKey()), actualMap.get(entry.getKey()));
768771
return;
772+
case "mapoftuple":
773+
log.debug("field={} gm={}", field, gm);
774+
Assert.assertEquals("Incorrect size of map", gm.size(), 1);
775+
assertAvroTupleRecord((GenericData.Record) gm.get(new Utf8("a")));
776+
return;
769777
}
770778
Assert.assertTrue("Unexpected field="+field, false);
771779
}
@@ -817,14 +825,20 @@ void assertField(String fieldName, Object value) {
817825
if (fieldName.startsWith("index_")) {
818826
int idx = Integer.parseInt(fieldName.substring("index_".length()));
819827
if (idx == 0) {
820-
Assert.assertEquals(dataSpecMap.get("int").avroValue, value);
828+
Assert.assertEquals(dataSpecMap.get("text").avroValue, value);
821829
} else if (idx == 1) {
822830
Assert.assertEquals(dataSpecMap.get("text").avroValue, value);
831+
} else if (idx == 2) {
832+
Assert.assertEquals(dataSpecMap.get("bigint").avroValue, value);
833+
} else if (idx == 3) {
834+
Assert.assertEquals(dataSpecMap.get("double").avroValue, value);
835+
} else if (idx == 4) {
836+
Assert.assertEquals(dataSpecMap.get("text").avroValue, value);
823837
}
824838
return;
825839
}
826840
String vKey = fieldName.substring(1);
827-
if (!vKey.equals("udt") && !vKey.equals("udtoptional") && ! vKey.equals("setofudt") && !vKey.equals("tuple")) {
841+
if (!vKey.equals("udt") && !vKey.equals("udtoptional") && ! vKey.equals("setofudt") && !vKey.equals("tuple") && !vKey.equals("mapoftuple")) {
828842
Assert.assertTrue("Unknown field " + vKey, dataSpecMap.containsKey(vKey));
829843
}
830844
if (value instanceof GenericRecord) {
@@ -897,6 +911,15 @@ void assertGenericRecords(String field, GenericRecord gr) {
897911
Assert.assertTrue("Unexpected field="+field, false);
898912
}
899913

914+
// tuple<text, text, bigint, double, text>
915+
void assertAvroTupleRecord(GenericData.Record record) {
916+
Assert.assertEquals(dataSpecMap.get("text").avroValue, record.get(0).toString());
917+
Assert.assertEquals(dataSpecMap.get("text").avroValue, record.get(1).toString());
918+
Assert.assertEquals(dataSpecMap.get("bigint").avroValue, record.get(2));
919+
Assert.assertEquals(dataSpecMap.get("double").avroValue, record.get(3));
920+
Assert.assertEquals(dataSpecMap.get("text").avroValue, record.get(4).toString());
921+
}
922+
900923
@SneakyThrows
901924
void assertJsonNode(String field, JsonNode node) {
902925
switch (field) {

0 commit comments

Comments
 (0)