Skip to content

Commit 9392aae

Browse files
committed
Support CQL Vector type (upgrade Core Driver to 4.16.0 and DSBulk to 1.11.0-SNAPSHOT)
1 parent 4e73b72 commit 9392aae

File tree

4 files changed

+33
-8
lines changed

4 files changed

+33
-8
lines changed

common/src/main/java/com/datastax/oss/common/sink/config/TopicConfig.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.datastax.oss.common.sink.config;
1717

1818
import com.datastax.oss.common.sink.ConfigException;
19+
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;
1920
import com.datastax.oss.driver.shaded.guava.common.base.Splitter;
2021
import com.datastax.oss.dsbulk.codecs.api.ConversionContext;
2122
import com.datastax.oss.dsbulk.codecs.api.ConvertingCodecFactory;
@@ -139,7 +140,7 @@ public String toString() {
139140
}
140141

141142
@NonNull
142-
public ConvertingCodecFactory createCodecFactory() {
143+
public ConvertingCodecFactory createCodecFactory(DefaultCodecRegistry defaultCodecRegistry) {
143144
ConversionContext context =
144145
new TextConversionContext()
145146
.setLocale(
@@ -150,7 +151,7 @@ public ConvertingCodecFactory createCodecFactory() {
150151
.setTimeZone(ZoneId.of(getString(getTopicSettingPath(topicName, TIMEZONE_OPT))))
151152
.setTimeUnit(
152153
TimeUnit.valueOf(getString(getTopicSettingPath(topicName, TIME_UNIT_OPT))));
153-
return new ConvertingCodecFactory(context);
154+
return new ConvertingCodecFactory(defaultCodecRegistry, context);
154155
}
155156

156157
/**

common/src/main/java/com/datastax/oss/common/sink/state/LifeCycleManager.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,23 @@
5353
import com.datastax.oss.driver.api.core.metadata.schema.ColumnMetadata;
5454
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
5555
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
56+
import com.datastax.oss.driver.api.core.type.CqlVectorType;
57+
import com.datastax.oss.driver.api.core.type.DataType;
5658
import com.datastax.oss.driver.api.core.type.DataTypes;
59+
import com.datastax.oss.driver.api.core.type.codec.TypeCodec;
60+
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
5761
import com.datastax.oss.driver.internal.core.auth.PlainTextAuthProvider;
5862
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultDriverConfigLoader;
5963
import com.datastax.oss.driver.internal.core.config.typesafe.DefaultProgrammaticDriverConfigLoaderBuilder;
64+
import com.datastax.oss.driver.internal.core.type.codec.CqlVectorCodec;
65+
import com.datastax.oss.driver.internal.core.type.codec.registry.DefaultCodecRegistry;
6066
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
6167
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
6268
import com.datastax.oss.dsbulk.codecs.api.ConvertingCodecFactory;
6369
import com.typesafe.config.Config;
6470
import com.typesafe.config.ConfigFactory;
6571
import edu.umd.cs.findbugs.annotations.NonNull;
72+
import edu.umd.cs.findbugs.annotations.Nullable;
6673
import java.net.InetSocketAddress;
6774
import java.nio.file.Path;
6875
import java.util.Collection;
@@ -89,6 +96,21 @@ public class LifeCycleManager {
8996
private static final ConcurrentMap<String, InstanceState> INSTANCE_STATES =
9097
new ConcurrentHashMap<>();
9198
private static MetricRegistry metricRegistry = new MetricRegistry();
99+
private static final DefaultCodecRegistry CODEC_REGISTRY =
100+
new DefaultCodecRegistry("default-registry") {
101+
102+
protected TypeCodec<?> createCodec(
103+
@Nullable DataType cqlType,
104+
@Nullable GenericType<?> javaType,
105+
boolean isJavaCovariant) {
106+
if (cqlType instanceof CqlVectorType) {
107+
log.info("Automatically Registering codec for CqlVectorType {}", cqlType);
108+
CqlVectorType vectorType = (CqlVectorType) cqlType;
109+
return new CqlVectorCodec<>(vectorType, codecFor(vectorType.getSubtype()));
110+
}
111+
return super.createCodec(cqlType, javaType, isJavaCovariant);
112+
}
113+
};
92114

93115
/** This is a utility class that no one should instantiate. */
94116
private LifeCycleManager() {}
@@ -420,7 +442,8 @@ private static InstanceState buildInstanceState(CqlSession session, CassandraSin
420442
.stream()
421443
.map(
422444
topicConfig -> {
423-
ConvertingCodecFactory codecFactory = topicConfig.createCodecFactory();
445+
ConvertingCodecFactory codecFactory =
446+
topicConfig.createCodecFactory(CODEC_REGISTRY);
424447
TopicState topicState = new TopicState(codecFactory);
425448
topicStates.put(topicConfig.getTopicName(), topicState);
426449

@@ -485,6 +508,7 @@ public static CqlSession buildCqlSession(
485508
SslConfig sslConfig = config.getSslConfig();
486509
CqlSessionBuilder builder =
487510
new SessionBuilder(sslConfig)
511+
.withCodecRegistry(CODEC_REGISTRY)
488512
.withApplicationVersion(version)
489513
.withApplicationName(applicationName)
490514
.withClientId(generateClientId(config.getInstanceName()));

common/src/test/java/com/datastax/oss/common/sink/RecordMapperTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -59,11 +59,11 @@
5959
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
6060
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap;
6161
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableSet;
62+
import com.datastax.oss.dsbulk.codecs.api.format.temporal.CqlTemporalFormat;
63+
import com.datastax.oss.dsbulk.codecs.api.format.temporal.TemporalFormat;
64+
import com.datastax.oss.dsbulk.codecs.api.format.temporal.ZonedTemporalFormat;
6265
import com.datastax.oss.dsbulk.codecs.api.util.CodecUtils;
63-
import com.datastax.oss.dsbulk.codecs.api.util.CqlTemporalFormat;
6466
import com.datastax.oss.dsbulk.codecs.api.util.OverflowStrategy;
65-
import com.datastax.oss.dsbulk.codecs.api.util.TemporalFormat;
66-
import com.datastax.oss.dsbulk.codecs.api.util.ZonedTemporalFormat;
6767
import com.datastax.oss.dsbulk.codecs.text.string.StringToIntegerCodec;
6868
import com.datastax.oss.dsbulk.codecs.text.string.StringToLongCodec;
6969
import com.datastax.oss.protocol.internal.response.result.ColumnSpec;

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@
4040
<java.release.version>8</java.release.version>
4141
<kafka.connect.version>2.4.0</kafka.connect.version>
4242
<caffeine.version>2.6.2</caffeine.version>
43-
<oss.driver.version>4.6.0</oss.driver.version>
44-
<dsbulk.version>1.6.0</dsbulk.version>
43+
<oss.driver.version>4.16.0</oss.driver.version>
44+
<dsbulk.version>1.11.0-SNAPSHOT</dsbulk.version>
4545
<reactive-streams.version>1.0.3</reactive-streams.version>
4646
<guava.version>25.1-jre</guava.version>
4747
<slf4j.version>1.7.25</slf4j.version>

0 commit comments

Comments
 (0)