Skip to content

Commit aa12550

Browse files
Fix - Cassandra CDC Connector Fails with ClassCastException for Timestamp Values in Collection Columns (#200)
* feat(AbstractNativeConverter): add handling for Instant type in collection value marshaling (cherry picked from commit e84e9d7)
* refactor(NativeAvroConverter, NativeJsonConverter): streamline collection value marshaling with null safety and custom mapping (cherry picked from commit eb05e8a)
* feat(PulsarCassandraSourceTests): add test for timestamp handling in collection fields (cherry picked from commit d443414)
* ci: extend test job timeout to 360 minutes (cherry picked from commit 4a94cdb)
* refactor(ChaosNetworkContainer): update tc-image to use ghcr.io/alexei-led/pumba-debian-nettools (cherry picked from commit 354338a)
---------
Co-authored-by: Arkadip <[email protected]>
1 parent f9ad28d commit aa12550

File tree

6 files changed

+170
-34
lines changed

6 files changed

+170
-34
lines changed

.github/workflows/ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ jobs:
3333
needs: build
3434
name: Test
3535
runs-on: ubuntu-latest
36-
timeout-minutes: 120
36+
timeout-minutes: 360
3737
strategy:
3838
fail-fast: false
3939
matrix:

connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.apache.pulsar.common.schema.SchemaType;
3838

3939
import java.net.InetAddress;
40+
import java.time.Instant;
4041
import java.util.ArrayList;
4142
import java.util.HashMap;
4243
import java.util.List;
@@ -259,4 +260,35 @@ String stringify(DataType dataType, Object value) {
259260
throw new UnsupportedOperationException("Unsupported type="+dataType.getProtocolCode()+" as key in a map");
260261
}
261262
}
263+
264+
/**
265+
* Converts a collection value based on its type.
266+
* If the value is an {@link Instant}, it is converted to its epoch millisecond representation.
267+
* Otherwise, the value is returned as is.
268+
*
269+
* @param collectionValue the value to be marshaled; could be an {@link Instant} or any other object
270+
* @return the marshaled value; an epoch millisecond representation if the input is an {@link Instant}, or the original value otherwise
271+
*/
272+
Object marshalCollectionValue(Object collectionValue) {
273+
if(collectionValue instanceof Instant) {
274+
return ((Instant)collectionValue).toEpochMilli();
275+
}
276+
return collectionValue;
277+
}
278+
279+
/**
280+
* Converts a collection value based on its type.
281+
* If the value is an {@link Instant}, it is converted to its epoch millisecond representation.
282+
* Otherwise, the value is returned as is.
283+
*
284+
* @param entry the value to be marshaled;
285+
* @return the marshaled value; an epoch millisecond representation if the input is an {@link Instant}, or the original value otherwise
286+
*/
287+
Object marshalCollectionValue(Map.Entry<? super Object, ? super Object> entry) {
288+
Object collectionValue = entry.getValue();
289+
if(collectionValue instanceof Instant) {
290+
return ((Instant)collectionValue).toEpochMilli();
291+
}
292+
return collectionValue;
293+
}
262294
}

connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,7 @@
5757
import java.time.Instant;
5858
import java.time.LocalDate;
5959
import java.time.LocalTime;
60-
import java.util.ArrayList;
61-
import java.util.Collection;
62-
import java.util.List;
63-
import java.util.Map;
64-
import java.util.Set;
65-
import java.util.UUID;
60+
import java.util.*;
6661
import java.util.stream.Collectors;
6762

6863
/**
@@ -146,26 +141,28 @@ public byte[] toConnectData(Row row) {
146141
case ProtocolConstants.DataType.LIST: {
147142
ListType listType = (ListType) cm.getType();
148143
Schema listSchema = subSchemas.get(fieldName);
149-
List listValue = row.getList(fieldName, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType());
144+
List<Object> listValue = Objects.requireNonNull(row.getList(fieldName, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType()))
145+
.stream().map(this::marshalCollectionValue).collect(Collectors.toList());
150146
log.debug("field={} listSchema={} listValue={}", fieldName, listSchema, listValue);
151147
genericRecordBuilder.put(fieldName, buildArrayValue(listSchema, listValue));
152148
}
153149
break;
154150
case ProtocolConstants.DataType.SET: {
155151
SetType setType = (SetType) cm.getType();
156152
Schema setSchema = subSchemas.get(fieldName);
157-
Set setValue = row.getSet(fieldName, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType());
153+
Set<Object> setValue = Objects.requireNonNull(row.getSet(fieldName, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType()))
154+
.stream().map(this::marshalCollectionValue).collect(Collectors.toSet());
158155
log.debug("field={} setSchema={} setValue={}", fieldName, setSchema, setValue);
159156
genericRecordBuilder.put(fieldName, buildArrayValue(setSchema, setValue));
160157
}
161158
break;
162159
case ProtocolConstants.DataType.MAP: {
163160
MapType mapType = (MapType) cm.getType();
164161
Schema mapSchema = subSchemas.get(fieldName);
165-
Map<String, Object> mapValue = row.getMap(fieldName,
162+
Map<String, Object> mapValue = Objects.requireNonNull(row.getMap(fieldName,
166163
CodecRegistry.DEFAULT.codecFor(mapType.getKeyType()).getJavaType().getRawType(),
167-
CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType())
168-
.entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), Map.Entry::getValue));
164+
CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType()))
165+
.entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), this::marshalCollectionValue));
169166
log.debug("field={} mapSchema={} mapValue={}", fieldName, mapSchema, mapValue);
170167
genericRecordBuilder.put(fieldName, mapValue);
171168
}
@@ -174,7 +171,7 @@ public byte[] toConnectData(Row row) {
174171
if (cm.getType() instanceof CqlVectorType) {
175172
Schema vectorSchema = subSchemas.get(fieldName);
176173
CqlVector<?> vector = row.getCqlVector(fieldName);
177-
log.debug("field={} listSchema={} listValue={}", fieldName, vectorSchema, vector);
174+
log.debug("field={} vectorSchema={} vectorValue={}", fieldName, vectorSchema, vector);
178175
List<Object> vectorValue = new ArrayList<>();
179176
vector.getValues().forEach(vectorValue::add);
180177
genericRecordBuilder.put(fieldName, buildArrayValue(vectorSchema, vectorValue));

connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeJsonConverter.java

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
2727
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
2828
import com.datastax.oss.driver.api.core.type.DataType;
29-
import com.datastax.oss.driver.api.core.type.DataTypes;
3029
import com.datastax.oss.driver.api.core.type.ListType;
3130
import com.datastax.oss.driver.api.core.type.MapType;
3231
import com.datastax.oss.driver.api.core.type.SetType;
@@ -52,11 +51,7 @@
5251
import java.math.BigDecimal;
5352
import java.math.BigInteger;
5453
import java.nio.ByteBuffer;
55-
import java.util.Collection;
56-
import java.util.HashMap;
57-
import java.util.List;
58-
import java.util.Map;
59-
import java.util.Set;
54+
import java.util.*;
6055
import java.util.stream.Collectors;
6156

6257
@Slf4j
@@ -133,29 +128,35 @@ JsonNode toJson(GettableById record, CqlIdentifier identifier, DataType dataType
133128
ListType listType = (ListType) dataType;
134129
String path = getSubSchemaPath(record, identifier);
135130
org.apache.avro.Schema listSchema = subSchemas.get(path);
136-
List listValue = record.getList(identifier, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType());
131+
List<? super Object> listValue = Objects.requireNonNull(record.getList(identifier, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType()))
132+
.stream().map(this::marshalCollectionValue).collect(Collectors.toList());
137133
log.debug("field={} listSchema={} listValue={}", identifier, listSchema, listValue);
138134
return createArrayNode(listSchema, listValue);
139135
}
140136
case ProtocolConstants.DataType.SET: {
141137
SetType setType = (SetType) dataType;
142138
String path = getSubSchemaPath(record, identifier);
143139
org.apache.avro.Schema setSchema = subSchemas.get(path);
144-
Set setValue = record.getSet(identifier, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType());
140+
Set<Object> setValue = Objects.requireNonNull(record.getSet(identifier, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType()))
141+
.stream().map(this::marshalCollectionValue).collect(Collectors.toSet());
145142
log.debug("field={} setSchema={} setValue={}", identifier, setSchema, setValue);
146143
return createArrayNode(setSchema, setValue);
147144
}
148145
case ProtocolConstants.DataType.MAP: {
149146
MapType mapType = (MapType) dataType;
150147
String path = getSubSchemaPath(record, identifier);
151148
org.apache.avro.Schema mapSchema = subSchemas.get(path);
152-
Map<String, JsonNode> map = record.getMap(identifier,
149+
Map<String, JsonNode> map = Objects.requireNonNull(record.getMap(identifier,
153150
CodecRegistry.DEFAULT.codecFor(mapType.getKeyType()).getJavaType().getRawType(),
154-
CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType())
155-
.entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), e -> toJson(mapSchema.getValueType(), e.getValue())));
151+
CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType()))
152+
.entrySet().stream().collect(
153+
Collectors.toMap(
154+
e -> stringify(mapType.getKeyType(), e.getKey()),
155+
e -> toJson(mapSchema.getValueType(), marshalCollectionValue(e.getValue())))
156+
);
156157
log.debug("field={} mapSchema={} mapValue={}", identifier, mapSchema, map);
157158
ObjectNode objectNode = jsonNodeFactory.objectNode();
158-
map.forEach((k,v)->objectNode.set(k, v));
159+
map.forEach(objectNode::set);
159160
return objectNode;
160161
}
161162
default:

connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java

Lines changed: 114 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.apache.pulsar.client.api.SubscriptionType;
4242
import org.apache.pulsar.client.api.schema.Field;
4343
import org.apache.pulsar.client.api.schema.GenericRecord;
44+
import org.apache.pulsar.client.impl.schema.generic.GenericJsonReader;
45+
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
4446
import org.apache.pulsar.common.schema.KeyValue;
4547
import org.apache.pulsar.common.schema.SchemaType;
4648
import org.apache.pulsar.shade.com.fasterxml.jackson.databind.JsonNode;
@@ -69,14 +71,7 @@
6971
import java.nio.charset.StandardCharsets;
7072
import java.time.Duration;
7173
import java.time.LocalDate;
72-
import java.util.Collection;
73-
import java.util.HashMap;
74-
import java.util.Iterator;
75-
import java.util.List;
76-
import java.util.Locale;
77-
import java.util.Map;
78-
import java.util.Optional;
79-
import java.util.Set;
74+
import java.util.*;
8075
import java.util.concurrent.Executors;
8176
import java.util.concurrent.TimeUnit;
8277
import java.util.concurrent.atomic.AtomicInteger;
@@ -196,6 +191,11 @@ public void testBatchInsert() throws InterruptedException, IOException {
196191
testBatchInsert("batchinsert");
197192
}
198193

194+
@Test
195+
public void testTimestampInCollection() throws InterruptedException, IOException {
196+
testTimestampInCollection("ks1");
197+
}
198+
199199
void deployConnector(String ksName, String tableName) throws IOException, InterruptedException {
200200
String config = String.format(Locale.ROOT, "{\"%s\":\"%s\", \"%s\":\"%s\", \"%s\":\"%s\", \"%s\":\"%s\", \"%s\": \"%s\", \"%s\":\"%s\", \"%s\":\"%s\"}",
201201
CassandraSourceConnectorConfig.CONTACT_POINTS_OPT, "cassandra-1",
@@ -989,6 +989,73 @@ public void testBatchInsert(String ksName) throws InterruptedException, IOExcept
989989
}
990990
}
991991

992+
@SuppressWarnings("unchecked")
993+
public void testTimestampInCollection(String ksName) throws InterruptedException, IOException {
994+
try {
995+
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
996+
cqlSession.execute("CREATE KEYSPACE IF NOT EXISTS " + ksName +
997+
" WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};");
998+
cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table7 (a text, b timestamp, c list<timestamp>, d map<text, timestamp>, e set<timestamp>, PRIMARY KEY(a)) WITH cdc=true");
999+
cqlSession.execute("INSERT INTO " + ksName + ".table7 (a,b,c,d,e) VALUES('1', '1990-04-04 08:52:01.581', ['1990-04-04 08:52:01.581', '1990-04-04 08:52:01.581'], {'key113606': '1990-04-04 08:52:01.581'}, {'1990-04-04 08:52:01.581', '1990-04-04 08:52:01.581'})");
1000+
cqlSession.execute("INSERT INTO " + ksName + ".table7 (a,b,c,d,e) VALUES('2', '1999-11-07 05:30:00.780', ['1999-11-07 05:30:00.780', '1999-11-07 05:30:00.780'], {'key113606': '1999-11-07 05:30:00.780'}, {'1999-11-07 05:30:00.780'})");
1001+
cqlSession.execute("INSERT INTO " + ksName + ".table7 (a,b,c,d,e) VALUES('3', '2025-07-17 18:02:01.871', ['2025-07-17 18:02:01.871', '2025-07-17 18:02:01.871'], {'key113606': '2025-07-17 18:02:01.871'}, {'2025-07-17 18:02:01.871', '2025-07-17 18:02:01.871', '2025-07-17 18:02:01.871'})");
1002+
}
1003+
deployConnector(ksName, "table7");
1004+
try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build()) {
1005+
try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(org.apache.pulsar.client.api.Schema.AUTO_CONSUME())
1006+
.topic(String.format(Locale.ROOT, "data-%s.table7", ksName))
1007+
.subscriptionName("sub1")
1008+
.subscriptionType(SubscriptionType.Key_Shared)
1009+
.subscriptionMode(SubscriptionMode.Durable)
1010+
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
1011+
.subscribe()) {
1012+
Message<GenericRecord> msg;
1013+
int receivedCount = 0;
1014+
List<Long> data = Arrays.asList(639219121581L, 941952600780L, 1752775321871L);
1015+
while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null &&
1016+
receivedCount < 4) {
1017+
GenericRecord record = msg.getValue();
1018+
assertEquals(this.schemaType, record.getSchemaType());
1019+
Object key = getKey(msg);
1020+
GenericRecord value = getValue(record);
1021+
// assert key fields
1022+
assertEquals(Integer.toString(receivedCount+1) , getAndAssertKeyFieldAsString(key, "a"));
1023+
// assert value fields
1024+
assertEquals(data.get(receivedCount), value.getField("b"));
1025+
if (value.getField("c") instanceof GenericJsonRecord){
1026+
JsonNode arrayNode = ((GenericJsonRecord) value.getField("c")).getJsonNode();
1027+
assertTrue(arrayNode.isArray());
1028+
assertEquals(2, arrayNode.size());
1029+
assertEquals(data.get(receivedCount), arrayNode.get(0).asLong());
1030+
assertEquals(data.get(receivedCount), arrayNode.get(1).asLong());
1031+
}
1032+
else {
1033+
assertEquals(Arrays.asList(data.get(receivedCount), data.get(receivedCount)), value.getField("c"));
1034+
}
1035+
if (value instanceof GenericJsonRecord){
1036+
JsonNode arrayNode = ((GenericJsonRecord) value.getField("e")).getJsonNode();;
1037+
assertTrue(arrayNode.isArray());
1038+
assertEquals(1, arrayNode.size());
1039+
assertEquals(data.get(receivedCount), arrayNode.get(0).asLong());
1040+
}
1041+
else {
1042+
assertEquals(Collections.singletonList(data.get(receivedCount)), value.getField("e"));
1043+
}
1044+
1045+
Map<String, Object> expectedMap = new HashMap<>();
1046+
expectedMap.put("key113606", data.get(receivedCount));
1047+
assertMapsEqual(expectedMap, value.getField("d"));
1048+
consumer.acknowledge(msg);
1049+
receivedCount++;
1050+
}
1051+
}
1052+
}
1053+
} finally {
1054+
dumpFunctionLogs("cassandra-source-" + ksName + "-table7");
1055+
undeployConnector(ksName, "table7");
1056+
}
1057+
}
1058+
9921059
@Test
9931060
public void testReadTimeout() throws InterruptedException, IOException {
9941061
final String ksName = "ksx";
@@ -1217,6 +1284,45 @@ private String getAndAssertKeyFieldAsString(Object key, String fieldName) {
12171284
throw new RuntimeException("unknown key type " + key.getClass().getName());
12181285
}
12191286

1287+
@SuppressWarnings("unchecked")
1288+
private void assertMapsEqual(Map<String, Object> expected, Object actual) {
1289+
if (actual instanceof GenericJsonRecord) {
1290+
JsonNode node = ((GenericJsonRecord) actual).getJsonNode();
1291+
assertEquals(expected.size(), node.size(), "Maps have different sizes");
1292+
for (Map.Entry<String, Object> entry : expected.entrySet()) {
1293+
assertTrue(node.has(entry.getKey()), "Missing key: " + entry.getKey());
1294+
assertEquals(
1295+
expected.get(entry.getKey()),
1296+
node.get(entry.getKey()).asLong(),
1297+
"Values differ for key: " + entry.getKey()
1298+
);
1299+
}
1300+
}
1301+
else if (actual instanceof Map){
1302+
Map<org.apache.pulsar.shade.org.apache.avro.util.Utf8, Object> actualMap = (Map<org.apache.pulsar.shade.org.apache.avro.util.Utf8, Object>) actual;
1303+
assertEquals(expected.size(), actualMap.size(), "Maps have different sizes");
1304+
for (Map.Entry<String, Object> entry : expected.entrySet()) {
1305+
String expectedKey = entry.getKey();
1306+
assertTrue(actualMap.keySet().stream()
1307+
.map(Utf8::toString)
1308+
.anyMatch(str -> str.equals(expectedKey)), "Missing key: " + expectedKey);
1309+
assertEquals(
1310+
expected.get(entry.getKey()),
1311+
actualMap.entrySet().stream()
1312+
.filter(e -> e.getKey().toString().equals(expectedKey))
1313+
.findFirst()
1314+
.map(Map.Entry::getValue)
1315+
.orElse(null),
1316+
"Values differ for key: " + entry.getKey()
1317+
);
1318+
}
1319+
}
1320+
else {
1321+
throw new RuntimeException("Unknown type of GenericRecord: " + actual.getClass().getName());
1322+
}
1323+
1324+
}
1325+
12201326
private void assertKeyFieldIsNull(Object key, String fieldName) {
12211327
if (key instanceof GenericRecord) {
12221328
assertNull(((GenericRecord) key).getField(fieldName));

testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ public class ChaosNetworkContainer<SELF extends ChaosNetworkContainer<SELF>> ext
3434

3535
public ChaosNetworkContainer(String targetContainer, String pause) {
3636
super(PUMBA_IMAGE);
37-
setCommand("--log-level debug netem --tc-image gaiadocker/iproute2 --duration " + pause + " loss --percent 100 " + targetContainer);
37+
setCommand("--log-level debug netem --tc-image ghcr.io/alexei-led/pumba-debian-nettools --duration " + pause + " loss --percent 100 " + targetContainer);
3838
addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE);
3939
setWaitStrategy(Wait.forLogMessage(".*tc container created.*", 1));
4040
withLogConsumer(o -> {

0 commit comments

Comments
 (0)