Skip to content

Commit 30a2999

Browse files
msmygit and Madhavan authored
STREAM-770 Add support for Tuple CQL data type (#69)
* dependencies update
* STREAM-770: Add support for Tuple cql data type
* Remove provided and test scopes for org.apache.pulsar:pulsar-client-api to avoid warnings
* Revert dsbulk upgrade to 1.11.0 to see if it would run tests okay
* Redo dsbulk version upgrade
* Attempt to add more unit tests
* Tweak the nested tuple type unit tests
* Add support for nested tuple types scenario

---------

Co-authored-by: Madhavan <cxo@ibm.com>
1 parent 7345b26 commit 30a2999

File tree

14 files changed

+617
-60
lines changed

14 files changed

+617
-60
lines changed

.github/dependabot.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
version: 2
2+
updates:
3+
- package-ecosystem: "github-actions"
4+
directory: "/"
5+
schedule:
6+
interval: "weekly"
7+
groups:
8+
github-actions:
9+
patterns:
10+
- "*"

.github/workflows/ci-integration.yml

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,33 @@ jobs:
1818
- "datastax/lunastreaming:2.10_2.3"
1919
- "apachepulsar/pulsar:2.10.2"
2020
cassandra:
21-
- cassandra_version: 3.0.27
22-
ccm_version: 3.0.27
21+
- cassandra_version: 3.0.32
22+
ccm_version: 3.0.32
2323
is_dse: false
24-
- cassandra_version: 3.11.13
25-
ccm_version: 3.11.13
24+
- cassandra_version: 3.11.19
25+
ccm_version: 3.11.19
2626
is_dse: false
27-
- cassandra_version: 4.0.4
28-
ccm_version: 4.0.4
27+
- cassandra_version: 4.0.19
28+
ccm_version: 4.0.19
2929
is_dse: false
3030
- cassandra_version: dse-5.1
31-
ccm_version: 5.1.31
32-
is_dse: true
33-
- cassandra_version: dse-6.0
34-
ccm_version: 6.0.18
35-
is_dse: true
36-
- cassandra_version: dse-6.7
37-
ccm_version: 6.7.17
31+
ccm_version: 5.1.48
3832
is_dse: true
33+
# DSE 6.0 and 6.7 are commented out because they are EOL and no longer supported by DataStax/IBM
34+
# - cassandra_version: dse-6.0
35+
# ccm_version: 6.0.18
36+
# is_dse: true
37+
# - cassandra_version: dse-6.7
38+
# ccm_version: 6.7.17
39+
# is_dse: true
3940
- cassandra_version: dse-6.8
40-
ccm_version: 6.8.24
41+
ccm_version: 6.8.61
4142
is_dse: true
4243

4344
steps:
44-
- uses: actions/checkout@v2
45+
- uses: actions/checkout@v6
4546
- name: Set up JDK 8
46-
uses: actions/setup-java@v2
47+
uses: actions/setup-java@v5
4748
with:
4849
java-version: 8
4950
distribution: 'temurin'
@@ -52,7 +53,7 @@ jobs:
5253
export JDK8_PATH=$JAVA_HOME
5354
echo "JDK8_PATH=$JDK8_PATH" >> $GITHUB_ENV
5455
- name: Set up JDK 11
55-
uses: actions/setup-java@v2
56+
uses: actions/setup-java@v5
5657
with:
5758
java-version: 11
5859
distribution: 'temurin'
@@ -67,7 +68,7 @@ jobs:
6768
psutil
6869
""" > requirements.txt
6970
- name: Set up Python
70-
uses: MatteoH2O1999/setup-python@v1
71+
uses: MatteoH2O1999/setup-python@v6
7172
with:
7273
python-version: '2.7'
7374
cache: 'pip'
@@ -91,7 +92,7 @@ jobs:
9192
#!/bin/bash
9293
pip install ccm
9394
- name: Cache local Maven repository
94-
uses: actions/cache@v3
95+
uses: actions/cache@v5
9596
with:
9697
path: ~/.m2/repository
9798
key: ${{ runner.os }}-maven-${{ hashFiles('**/pom.xml') }}

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
<java.release.version>8</java.release.version>
4545
<caffeine.version>2.6.2</caffeine.version>
4646
<oss.driver.version>4.16.0</oss.driver.version>
47-
<dsbulk.version>1.10.0</dsbulk.version>
47+
<dsbulk.version>1.11.0</dsbulk.version>
4848
<reactive-streams.version>1.0.3</reactive-streams.version>
4949
<pulsar.version>2.8.2</pulsar.version>
5050
<slf4j.version>1.7.25</slf4j.version>

pulsar-impl/pom.xml

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,6 @@
121121
<artifactId>pulsar-functions-api</artifactId>
122122
<scope>provided</scope>
123123
</dependency>
124-
<dependency>
125-
<groupId>org.apache.pulsar</groupId>
126-
<artifactId>pulsar-client-api</artifactId>
127-
<scope>provided</scope>
128-
</dependency>
129-
<dependency>
130-
<groupId>org.apache.pulsar</groupId>
131-
<artifactId>pulsar-client-api</artifactId>
132-
<scope>test</scope>
133-
</dependency>
134124
<dependency>
135125
<groupId>org.apache.pulsar</groupId>
136126
<artifactId>pulsar-common</artifactId>
@@ -206,7 +196,7 @@
206196
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
207197
<resource>reference.conf</resource>
208198
</transformer>
209-
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
199+
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
210200
</transformers>
211201
<relocations>
212202
<relocation>

pulsar-impl/src/main/java/com/datastax/oss/pulsar/sink/codecs/PulsarCodecProvider.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import com.datastax.oss.common.sink.AbstractStruct;
1919
import com.datastax.oss.driver.api.core.type.DataType;
20+
import com.datastax.oss.driver.api.core.type.TupleType;
2021
import com.datastax.oss.driver.api.core.type.UserDefinedType;
2122
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
2223
import com.datastax.oss.dsbulk.codecs.api.ConvertingCodec;
@@ -36,6 +37,14 @@ public class PulsarCodecProvider implements ConvertingCodecProvider {
3637
@NonNull GenericType<?> externalJavaType,
3738
@NonNull ConvertingCodecFactory codecFactory,
3839
boolean rootCodec) {
40+
// Handle Tuple types
41+
if (cqlType instanceof TupleType
42+
&& (externalJavaType.equals(GenericType.of(PulsarStruct.class))
43+
|| externalJavaType.equals(GenericType.of(AbstractStruct.class)))) {
44+
return Optional.of(new StructToTupleCodec(codecFactory, (TupleType) cqlType));
45+
}
46+
47+
// Handle UDT types
3948
if (cqlType instanceof UserDefinedType
4049
&& (externalJavaType.equals(GenericType.of(PulsarStruct.class))
4150
|| externalJavaType.equals(GenericType.of(AbstractStruct.class)))) {
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/*
2+
* Copyright DataStax, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.datastax.oss.pulsar.sink.codecs;
17+
18+
import com.datastax.oss.common.sink.AbstractField;
19+
import com.datastax.oss.common.sink.AbstractSchema;
20+
import com.datastax.oss.common.sink.record.StructDataMetadata;
21+
import com.datastax.oss.driver.api.core.data.TupleValue;
22+
import com.datastax.oss.driver.api.core.type.DataType;
23+
import com.datastax.oss.driver.api.core.type.TupleType;
24+
import com.datastax.oss.driver.api.core.type.reflect.GenericType;
25+
import com.datastax.oss.dsbulk.codecs.api.ConvertingCodec;
26+
import com.datastax.oss.dsbulk.codecs.api.ConvertingCodecFactory;
27+
import com.datastax.oss.sink.pulsar.PulsarStruct;
28+
import java.util.List;
29+
import java.util.Set;
30+
import java.util.stream.Collectors;
31+
32+
/** Codec to convert a Pulsar Struct to a CQL Tuple. */
33+
public class StructToTupleCodec extends ConvertingCodec<PulsarStruct, TupleValue> {
34+
35+
private final ConvertingCodecFactory codecFactory;
36+
private final TupleType tupleType;
37+
38+
StructToTupleCodec(ConvertingCodecFactory codecFactory, TupleType cqlType) {
39+
super(codecFactory.getCodecRegistry().codecFor(cqlType), PulsarStruct.class);
40+
this.codecFactory = codecFactory;
41+
this.tupleType = cqlType;
42+
}
43+
44+
@Override
45+
public TupleValue externalToInternal(PulsarStruct external) {
46+
if (external == null) {
47+
return null;
48+
}
49+
50+
List<DataType> componentTypes = tupleType.getComponentTypes();
51+
int size = componentTypes.size();
52+
AbstractSchema schema = external.schema();
53+
StructDataMetadata structMetadata = new StructDataMetadata(schema);
54+
55+
Set<String> structFieldNames =
56+
schema.fields().stream().map(AbstractField::name).collect(Collectors.toSet());
57+
58+
if (structFieldNames.size() != size) {
59+
throw new IllegalArgumentException(
60+
String.format("Expecting %d tuple fields, got %d", size, structFieldNames.size()));
61+
}
62+
63+
TupleValue tupleValue = tupleType.newValue();
64+
65+
// Tuple fields are named index_0, index_1, etc. following CDC convention
66+
for (int i = 0; i < size; i++) {
67+
String fieldName = "index_" + i;
68+
DataType componentType = componentTypes.get(i);
69+
70+
// Check if field exists in the struct
71+
if (!schema.fields().stream().anyMatch(f -> f.name().equals(fieldName))) {
72+
throw new IllegalArgumentException(
73+
String.format(
74+
"Field %s not found in input struct for tuple component %d", fieldName, i));
75+
}
76+
77+
@SuppressWarnings("unchecked")
78+
GenericType<Object> fieldType =
79+
(GenericType<Object>) structMetadata.getFieldType(fieldName, componentType);
80+
ConvertingCodec<Object, Object> fieldCodec =
81+
codecFactory.createConvertingCodec(componentType, fieldType, false);
82+
Object fieldValue = external.get(fieldName);
83+
Object convertedValue = fieldCodec.externalToInternal(fieldValue);
84+
tupleValue = tupleValue.set(i, convertedValue, fieldCodec.getInternalJavaType());
85+
}
86+
87+
return tupleValue;
88+
}
89+
90+
@Override
91+
public PulsarStruct internalToExternal(TupleValue internal) {
92+
if (internal == null) {
93+
return null;
94+
}
95+
throw new UnsupportedOperationException(
96+
"This codec does not support converting from TupleValue to Struct");
97+
}
98+
}

pulsar-impl/src/main/java/com/datastax/oss/sink/pulsar/AvroTypeUtil.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.CQL_DECIMAL;
1919
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.CQL_DURATION;
20+
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.CQL_TUPLE;
2021
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.CQL_VARINT;
2122
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.DATE;
2223
import static com.datastax.oss.sink.pulsar.CqlLogicalTypes.TIMESTAMP_MILLIS;
@@ -220,11 +221,13 @@ public static void enableDecodeCDCDataTypes(boolean decodeCDCDataTypes) {
220221
LogicalTypes.register(CQL_DECIMAL, schema -> new CqlLogicalTypes.CqlDecimalLogicalType());
221222
LogicalTypes.register(CQL_DURATION, schema -> new CqlLogicalTypes.CqlDurationLogicalType());
222223
LogicalTypes.register(CQL_VARINT, schema -> new CqlLogicalTypes.CqlVarintLogicalType());
224+
LogicalTypes.register(CQL_TUPLE, schema -> new CqlLogicalTypes.CqlTupleLogicalType());
223225

224226
// Register logical type converters
225227
logicalTypeConverters.put(CQL_DECIMAL, new CqlLogicalTypes.CqlDecimalConversion());
226228
logicalTypeConverters.put(CQL_DURATION, new CqlLogicalTypes.CqlDurationConversion());
227229
logicalTypeConverters.put(CQL_VARINT, new CqlLogicalTypes.CqlVarintConversion());
230+
logicalTypeConverters.put(CQL_TUPLE, new CqlLogicalTypes.CqlTupleConversion());
228231
logicalTypeConverters.put(DATE, new CqlLogicalTypes.DateConversion());
229232
logicalTypeConverters.put(TIME_MICROS, new CqlLogicalTypes.TimeConversion());
230233
logicalTypeConverters.put(TIMESTAMP_MILLIS, new CqlLogicalTypes.TimestampConversion());
@@ -235,11 +238,68 @@ public static void enableDecodeCDCDataTypes(boolean decodeCDCDataTypes) {
235238
logicalTypeConverters.remove(CQL_DECIMAL);
236239
logicalTypeConverters.remove(CQL_DURATION);
237240
logicalTypeConverters.remove(CQL_VARINT);
241+
logicalTypeConverters.remove(CQL_TUPLE);
238242
logicalTypeConverters.remove(DATE);
239243
logicalTypeConverters.remove(TIME_MICROS);
240244
logicalTypeConverters.remove(TIMESTAMP_MILLIS);
241245
}
242246

243247
AvroTypeUtil.decodeCDCDataTypes = decodeCDCDataTypes;
244248
}
249+
250+
/**
251+
* Check if a GenericRecord represents a tuple by examining its schema name. Tuples follow the CDC
252+
* naming convention: Tuple_<hashcode>
253+
*/
254+
public static boolean isTupleRecord(GenericRecord record) {
255+
if (record == null) {
256+
return false;
257+
}
258+
Object nativeObject = record.getNativeObject();
259+
if (nativeObject instanceof org.apache.avro.generic.GenericRecord) {
260+
org.apache.avro.generic.GenericRecord avroRecord =
261+
(org.apache.avro.generic.GenericRecord) nativeObject;
262+
return avroRecord.getSchema().getName().startsWith("Tuple_");
263+
}
264+
return false;
265+
}
266+
267+
/** Check if an Avro schema represents a tuple type. */
268+
public static boolean isTupleSchema(Schema schema) {
269+
if (schema == null) {
270+
return false;
271+
}
272+
// Handle union types
273+
if (schema.isUnion()) {
274+
return schema
275+
.getTypes()
276+
.stream()
277+
.anyMatch(s -> s.getType() == Schema.Type.RECORD && s.getName().startsWith("Tuple_"));
278+
}
279+
// Handle direct record types
280+
return schema.getType() == Schema.Type.RECORD && schema.getName().startsWith("Tuple_");
281+
}
282+
283+
/**
284+
* Check if a schema contains tuple types in its structure. This checks maps, arrays, and nested
285+
* structures.
286+
*/
287+
public static boolean containsTuples(Schema schema) {
288+
if (schema == null) {
289+
return false;
290+
}
291+
292+
switch (schema.getType()) {
293+
case RECORD:
294+
return schema.getName().startsWith("Tuple_");
295+
case ARRAY:
296+
return containsTuples(schema.getElementType());
297+
case MAP:
298+
return containsTuples(schema.getValueType());
299+
case UNION:
300+
return schema.getTypes().stream().anyMatch(AvroTypeUtil::containsTuples);
301+
default:
302+
return false;
303+
}
304+
}
245305
}

pulsar-impl/src/main/java/com/datastax/oss/sink/pulsar/CqlLogicalTypes.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,31 @@ public class CqlLogicalTypes {
3232
public static final String CQL_VARINT = "cql_varint";
3333
public static final String CQL_DECIMAL = "cql_decimal";
3434
public static final String CQL_DURATION = "cql_duration";
35+
public static final String CQL_TUPLE = "cql_tuple";
3536

3637
public static final String DATE = LogicalTypes.date().getName();
3738
public static final String TIME_MICROS = LogicalTypes.timeMicros().getName();
3839
public static final String TIMESTAMP_MILLIS = LogicalTypes.timestampMillis().getName();
3940

41+
public static class CqlTupleConversion extends Conversion<IndexedRecord> {
42+
@Override
43+
public Class<IndexedRecord> getConvertedType() {
44+
return IndexedRecord.class;
45+
}
46+
47+
@Override
48+
public String getLogicalTypeName() {
49+
return CQL_TUPLE;
50+
}
51+
52+
@Override
53+
public IndexedRecord fromRecord(IndexedRecord value, Schema schema, LogicalType type) {
54+
// For tuple support, we return the IndexedRecord as-is
55+
// The actual conversion to TupleValue will be handled by StructToTupleCodec
56+
return value;
57+
}
58+
}
59+
4060
public static class CqlDurationConversion extends Conversion<CqlDuration> {
4161
@Override
4262
public Class<CqlDuration> getConvertedType() {
@@ -165,4 +185,10 @@ public CqlVarintLogicalType() {
165185
super(CQL_VARINT);
166186
}
167187
}
188+
189+
static class CqlTupleLogicalType extends LogicalType {
190+
public CqlTupleLogicalType() {
191+
super(CQL_TUPLE);
192+
}
193+
}
168194
}

0 commit comments

Comments
 (0)