Skip to content

Commit 3fd76e0

Browse files
authored
Add VectorType support to CDC connector (#170)
* Add VectorType support to CDC connector * Optimize imports
1 parent d974508 commit 3fd76e0

File tree

3 files changed

+29
-1
lines changed

3 files changed

+29
-1
lines changed

connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
2323
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
2424
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
25+
import com.datastax.oss.driver.api.core.type.CqlVectorType;
2526
import com.datastax.oss.driver.api.core.type.DataType;
2627
import com.datastax.oss.driver.api.core.type.ListType;
2728
import com.datastax.oss.driver.api.core.type.MapType;
@@ -74,6 +75,13 @@ public AbstractNativeConverter(KeyspaceMetadata ksm, TableMetadata tm, List<Colu
7475
subSchemas.put(field.name(), collectionSchema);
7576
log.info("Add collection schema {}={}", field.name(), collectionSchema);
7677
break;
78+
case ProtocolConstants.DataType.CUSTOM:
79+
if (cm.getType() instanceof CqlVectorType) {
80+
Schema vectorSchema = dataTypeSchema(ksm, cm.getType());
81+
subSchemas.put(field.name(), vectorSchema);
82+
log.info("Add vector schema {}={}", field.name(), vectorSchema);
83+
}
84+
break;
7785
}
7886
}
7987
}
@@ -121,6 +129,8 @@ boolean isSupportedCqlType(DataType dataType) {
121129
case ProtocolConstants.DataType.SET:
122130
case ProtocolConstants.DataType.MAP:
123131
return true;
132+
case ProtocolConstants.DataType.CUSTOM:
133+
return dataType instanceof CqlVectorType;
124134
}
125135
return false;
126136
}
@@ -189,6 +199,11 @@ Schema dataTypeSchema(KeyspaceMetadata ksm, DataType dataType) {
189199
case ProtocolConstants.DataType.MAP:
190200
MapType mapType = (MapType) dataType;
191201
return org.apache.avro.Schema.createMap(dataTypeSchema(ksm, mapType.getValueType()));
202+
case ProtocolConstants.DataType.CUSTOM:
203+
if (dataType instanceof CqlVectorType) {
204+
CqlVectorType vectorType = (CqlVectorType) dataType;
205+
return org.apache.avro.Schema.createArray(dataTypeSchema(ksm, vectorType.getSubtype()));
206+
}
192207
default:
193208
throw new UnsupportedOperationException("Ignoring unsupported type=" + dataType.asCql(false, true));
194209
}

connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,12 @@
2020
import com.datastax.oss.driver.api.core.cql.ColumnDefinition;
2121
import com.datastax.oss.driver.api.core.cql.Row;
2222
import com.datastax.oss.driver.api.core.data.CqlDuration;
23+
import com.datastax.oss.driver.api.core.data.CqlVector;
2324
import com.datastax.oss.driver.api.core.data.UdtValue;
2425
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
2526
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
2627
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
28+
import com.datastax.oss.driver.api.core.type.CqlVectorType;
2729
import com.datastax.oss.driver.api.core.type.DataType;
2830
import com.datastax.oss.driver.api.core.type.ListType;
2931
import com.datastax.oss.driver.api.core.type.MapType;
@@ -168,6 +170,17 @@ public byte[] toConnectData(Row row) {
168170
genericRecordBuilder.put(fieldName, mapValue);
169171
}
170172
break;
173+
case ProtocolConstants.DataType.CUSTOM: {
174+
if (cm.getType() instanceof CqlVectorType) {
175+
Schema vectorSchema = subSchemas.get(fieldName);
176+
CqlVector<?> vector = row.getCqlVector(fieldName);
177+
log.debug("field={} listSchema={} listValue={}", fieldName, vectorSchema, vector);
178+
List<Object> vectorValue = new ArrayList<>();
179+
vector.getValues().forEach(vectorValue::add);
180+
genericRecordBuilder.put(fieldName, buildArrayValue(vectorSchema, vectorValue));
181+
}
182+
}
183+
break;
171184
default:
172185
log.debug("Ignoring unsupported column name={} type={}", cm.getName(), cm.getType().asCql(false, true));
173186
}

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ releasesRepoUrl=https://repo.datastax.com/artifactory/datastax-public-releases-l
99
# deps version
1010
avroVersion=1.10.2
1111
lombokVersion=1.18.20
12-
ossDriverVersion=4.15.0
12+
ossDriverVersion=4.16.0
1313
cassandra3Version=3.11.10
1414
cassandra4Version=4.0.4
1515
dse4Version=6.8.23

0 commit comments

Comments
 (0)