diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 48b95fca..af4fa504 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -33,7 +33,7 @@ jobs: needs: build name: Test runs-on: ubuntu-latest - timeout-minutes: 120 + timeout-minutes: 360 strategy: fail-fast: false matrix: diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java index c53ec5e8..ab948117 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/AbstractNativeConverter.java @@ -37,6 +37,7 @@ import org.apache.pulsar.common.schema.SchemaType; import java.net.InetAddress; +import java.time.Instant; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -259,4 +260,35 @@ String stringify(DataType dataType, Object value) { throw new UnsupportedOperationException("Unsupported type="+dataType.getProtocolCode()+" as key in a map"); } } + + /** + * Converts a collection value based on its type. + * If the value is an {@link Instant}, it is converted to its epoch millisecond representation. + * Otherwise, the value is returned as is. + * + * @param collectionValue the value to be marshaled; could be an {@link Instant} or any other object + * @return the marshaled value; an epoch millisecond representation if the input is an {@link Instant}, or the original value otherwise + */ + Object marshalCollectionValue(Object collectionValue) { + if(collectionValue instanceof Instant) { + return ((Instant)collectionValue).toEpochMilli(); + } + return collectionValue; + } + + /** + * Converts a collection value based on its type. + * If the value is an {@link Instant}, it is converted to its epoch millisecond representation. + * Otherwise, the value is returned as is. + * + * @param entry the value to be marshaled; + * @return the marshaled value; an epoch millisecond representation if the input is an {@link Instant}, or the original value otherwise + */ + Object marshalCollectionValue(Map.Entry entry) { + Object collectionValue = entry.getValue(); + if(collectionValue instanceof Instant) { + return ((Instant)collectionValue).toEpochMilli(); + } + return collectionValue; + } } diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java index 469dae44..4bf20fed 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeAvroConverter.java @@ -57,12 +57,7 @@ import java.time.Instant; import java.time.LocalDate; import java.time.LocalTime; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; +import java.util.*; import java.util.stream.Collectors; /** @@ -146,7 +141,8 @@ public byte[] toConnectData(Row row) { case ProtocolConstants.DataType.LIST: { ListType listType = (ListType) cm.getType(); Schema listSchema = subSchemas.get(fieldName); - List listValue = row.getList(fieldName, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType()); + List listValue = Objects.requireNonNull(row.getList(fieldName, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType())) + .stream().map(this::marshalCollectionValue).collect(Collectors.toList()); log.debug("field={} listSchema={} listValue={}", fieldName, listSchema, listValue); genericRecordBuilder.put(fieldName, buildArrayValue(listSchema, listValue)); } @@ -154,7 +150,8 @@ public byte[] toConnectData(Row row) { case ProtocolConstants.DataType.SET: { SetType setType = (SetType) cm.getType(); Schema setSchema = subSchemas.get(fieldName); - Set setValue = row.getSet(fieldName, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType()); + Set setValue = Objects.requireNonNull(row.getSet(fieldName, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType())) + .stream().map(this::marshalCollectionValue).collect(Collectors.toSet()); log.debug("field={} setSchema={} setValue={}", fieldName, setSchema, setValue); genericRecordBuilder.put(fieldName, buildArrayValue(setSchema, setValue)); } @@ -162,10 +159,10 @@ public byte[] toConnectData(Row row) { case ProtocolConstants.DataType.MAP: { MapType mapType = (MapType) cm.getType(); Schema mapSchema = subSchemas.get(fieldName); - Map mapValue = row.getMap(fieldName, + Map mapValue = Objects.requireNonNull(row.getMap(fieldName, CodecRegistry.DEFAULT.codecFor(mapType.getKeyType()).getJavaType().getRawType(), - CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType()) - .entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), Map.Entry::getValue)); + CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType())) + .entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), this::marshalCollectionValue)); log.debug("field={} mapSchema={} mapValue={}", fieldName, mapSchema, mapValue); genericRecordBuilder.put(fieldName, mapValue); } @@ -174,7 +171,7 @@ public byte[] toConnectData(Row row) { if (cm.getType() instanceof CqlVectorType) { Schema vectorSchema = subSchemas.get(fieldName); CqlVector vector = row.getCqlVector(fieldName); - log.debug("field={} listSchema={} listValue={}", fieldName, vectorSchema, vector); + log.debug("field={} vectorSchema={} vectorValue={}", fieldName, vectorSchema, vector); List vectorValue = new ArrayList<>(); vector.getValues().forEach(vectorValue::add); genericRecordBuilder.put(fieldName, buildArrayValue(vectorSchema, vectorValue)); diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeJsonConverter.java b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeJsonConverter.java index b9fa0803..b5f8359a 100644 --- a/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeJsonConverter.java +++ b/connector/src/main/java/com/datastax/oss/pulsar/source/converters/NativeJsonConverter.java @@ -26,7 +26,6 @@ import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata; import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata; import com.datastax.oss.driver.api.core.type.DataType; -import com.datastax.oss.driver.api.core.type.DataTypes; import com.datastax.oss.driver.api.core.type.ListType; import com.datastax.oss.driver.api.core.type.MapType; import com.datastax.oss.driver.api.core.type.SetType; @@ -52,11 +51,7 @@ import java.math.BigDecimal; import java.math.BigInteger; import java.nio.ByteBuffer; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; @Slf4j @@ -133,7 +128,8 @@ JsonNode toJson(GettableById record, CqlIdentifier identifier, DataType dataType ListType listType = (ListType) dataType; String path = getSubSchemaPath(record, identifier); org.apache.avro.Schema listSchema = subSchemas.get(path); - List listValue = record.getList(identifier, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType()); + List listValue = Objects.requireNonNull(record.getList(identifier, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType())) + .stream().map(this::marshalCollectionValue).collect(Collectors.toList()); log.debug("field={} listSchema={} listValue={}", identifier, listSchema, listValue); return createArrayNode(listSchema, listValue); } @@ -141,7 +137,8 @@ JsonNode toJson(GettableById record, CqlIdentifier identifier, DataType dataType SetType setType = (SetType) dataType; String path = getSubSchemaPath(record, identifier); org.apache.avro.Schema setSchema = subSchemas.get(path); - Set setValue = record.getSet(identifier, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType()); + Set setValue = Objects.requireNonNull(record.getSet(identifier, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType())) + .stream().map(this::marshalCollectionValue).collect(Collectors.toSet()); log.debug("field={} setSchema={} setValue={}", identifier, setSchema, setValue); return createArrayNode(setSchema, setValue); } @@ -149,13 +146,17 @@ JsonNode toJson(GettableById record, CqlIdentifier identifier, DataType dataType MapType mapType = (MapType) dataType; String path = getSubSchemaPath(record, identifier); org.apache.avro.Schema mapSchema = subSchemas.get(path); - Map map = record.getMap(identifier, + Map map = Objects.requireNonNull(record.getMap(identifier, CodecRegistry.DEFAULT.codecFor(mapType.getKeyType()).getJavaType().getRawType(), - CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType()) - .entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), e -> toJson(mapSchema.getValueType(), e.getValue()))); + CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType())) + .entrySet().stream().collect( + Collectors.toMap( + e -> stringify(mapType.getKeyType(), e.getKey()), + e -> toJson(mapSchema.getValueType(), marshalCollectionValue(e.getValue()))) + ); log.debug("field={} mapSchema={} mapValue={}", identifier, mapSchema, map); ObjectNode objectNode = jsonNodeFactory.objectNode(); - map.forEach((k,v)->objectNode.set(k, v)); + map.forEach(objectNode::set); return objectNode; } default: diff --git a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java index 286e0ce7..337accfc 100644 --- a/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java +++ b/connector/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java @@ -41,6 +41,8 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.client.api.schema.Field; import org.apache.pulsar.client.api.schema.GenericRecord; +import org.apache.pulsar.client.impl.schema.generic.GenericJsonReader; +import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord; import org.apache.pulsar.common.schema.KeyValue; import org.apache.pulsar.common.schema.SchemaType; import org.apache.pulsar.shade.com.fasterxml.jackson.databind.JsonNode; @@ -69,14 +71,7 @@ import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.LocalDate; -import java.util.Collection; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Locale; -import java.util.Map; -import java.util.Optional; -import java.util.Set; +import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -196,6 +191,11 @@ public void testBatchInsert() throws InterruptedException, IOException { testBatchInsert("batchinsert"); } + @Test + public void testTimestampInCollection() throws InterruptedException, IOException { + testTimestampInCollection("ks1"); + } + void deployConnector(String ksName, String tableName) throws IOException, InterruptedException { String config = String.format(Locale.ROOT, "{\"%s\":\"%s\", \"%s\":\"%s\", \"%s\":\"%s\", \"%s\":\"%s\", \"%s\": \"%s\", \"%s\":\"%s\", \"%s\":\"%s\"}", CassandraSourceConnectorConfig.CONTACT_POINTS_OPT, "cassandra-1", @@ -989,6 +989,73 @@ public void testBatchInsert(String ksName) throws InterruptedException, IOExcept } } + @SuppressWarnings("unchecked") + public void testTimestampInCollection(String ksName) throws InterruptedException, IOException { + try { + try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) { + cqlSession.execute("CREATE KEYSPACE IF NOT EXISTS " + ksName + + " WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};"); + cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table7 (a text, b timestamp, c list, d map, e set, PRIMARY KEY(a)) WITH cdc=true"); + cqlSession.execute("INSERT INTO " + ksName + ".table7 (a,b,c,d,e) VALUES('1', '1990-04-04 08:52:01.581', ['1990-04-04 08:52:01.581', '1990-04-04 08:52:01.581'], {'key113606': '1990-04-04 08:52:01.581'}, {'1990-04-04 08:52:01.581', '1990-04-04 08:52:01.581'})"); + cqlSession.execute("INSERT INTO " + ksName + ".table7 (a,b,c,d,e) VALUES('2', '1999-11-07 05:30:00.780', ['1999-11-07 05:30:00.780', '1999-11-07 05:30:00.780'], {'key113606': '1999-11-07 05:30:00.780'}, {'1999-11-07 05:30:00.780'})"); + cqlSession.execute("INSERT INTO " + ksName + ".table7 (a,b,c,d,e) VALUES('3', '2025-07-17 18:02:01.871', ['2025-07-17 18:02:01.871', '2025-07-17 18:02:01.871'], {'key113606': '2025-07-17 18:02:01.871'}, {'2025-07-17 18:02:01.871', '2025-07-17 18:02:01.871', '2025-07-17 18:02:01.871'})"); + } + deployConnector(ksName, "table7"); + try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build()) { + try (Consumer consumer = pulsarClient.newConsumer(org.apache.pulsar.client.api.Schema.AUTO_CONSUME()) + .topic(String.format(Locale.ROOT, "data-%s.table7", ksName)) + .subscriptionName("sub1") + .subscriptionType(SubscriptionType.Key_Shared) + .subscriptionMode(SubscriptionMode.Durable) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe()) { + Message msg; + int receivedCount = 0; + List data = Arrays.asList(639219121581L, 941952600780L, 1752775321871L); + while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null && + receivedCount < 4) { + GenericRecord record = msg.getValue(); + assertEquals(this.schemaType, record.getSchemaType()); + Object key = getKey(msg); + GenericRecord value = getValue(record); + // assert key fields + assertEquals(Integer.toString(receivedCount+1) , getAndAssertKeyFieldAsString(key, "a")); + // assert value fields + assertEquals(data.get(receivedCount), value.getField("b")); + if (value.getField("c") instanceof GenericJsonRecord){ + JsonNode arrayNode = ((GenericJsonRecord) value.getField("c")).getJsonNode(); + assertTrue(arrayNode.isArray()); + assertEquals(2, arrayNode.size()); + assertEquals(data.get(receivedCount), arrayNode.get(0).asLong()); + assertEquals(data.get(receivedCount), arrayNode.get(1).asLong()); + } + else { + assertEquals(Arrays.asList(data.get(receivedCount), data.get(receivedCount)), value.getField("c")); + } + if (value instanceof GenericJsonRecord){ + JsonNode arrayNode = ((GenericJsonRecord) value.getField("e")).getJsonNode();; + assertTrue(arrayNode.isArray()); + assertEquals(1, arrayNode.size()); + assertEquals(data.get(receivedCount), arrayNode.get(0).asLong()); + } + else { + assertEquals(Collections.singletonList(data.get(receivedCount)), value.getField("e")); + } + + Map expectedMap = new HashMap<>(); + expectedMap.put("key113606", data.get(receivedCount)); + assertMapsEqual(expectedMap, value.getField("d")); + consumer.acknowledge(msg); + receivedCount++; + } + } + } + } finally { + dumpFunctionLogs("cassandra-source-" + ksName + "-table7"); + undeployConnector(ksName, "table7"); + } + } + @Test public void testReadTimeout() throws InterruptedException, IOException { final String ksName = "ksx"; @@ -1217,6 +1284,45 @@ private String getAndAssertKeyFieldAsString(Object key, String fieldName) { throw new RuntimeException("unknown key type " + key.getClass().getName()); } + @SuppressWarnings("unchecked") + private void assertMapsEqual(Map expected, Object actual) { + if (actual instanceof GenericJsonRecord) { + JsonNode node = ((GenericJsonRecord) actual).getJsonNode(); + assertEquals(expected.size(), node.size(), "Maps have different sizes"); + for (Map.Entry entry : expected.entrySet()) { + assertTrue(node.has(entry.getKey()), "Missing key: " + entry.getKey()); + assertEquals( + expected.get(entry.getKey()), + node.get(entry.getKey()).asLong(), + "Values differ for key: " + entry.getKey() + ); + } + } + else if (actual instanceof Map){ + Map actualMap = (Map) actual; + assertEquals(expected.size(), actualMap.size(), "Maps have different sizes"); + for (Map.Entry entry : expected.entrySet()) { + String expectedKey = entry.getKey(); + assertTrue(actualMap.keySet().stream() + .map(Utf8::toString) + .anyMatch(str -> str.equals(expectedKey)), "Missing key: " + expectedKey); + assertEquals( + expected.get(entry.getKey()), + actualMap.entrySet().stream() + .filter(e -> e.getKey().toString().equals(expectedKey)) + .findFirst() + .map(Map.Entry::getValue) + .orElse(null), + "Values differ for key: " + entry.getKey() + ); + } + } + else { + throw new RuntimeException("Unknown type of GenericRecord: " + actual.getClass().getName()); + } + + } + private void assertKeyFieldIsNull(Object key, String fieldName) { if (key instanceof GenericRecord) { assertNull(((GenericRecord) key).getField(fieldName)); diff --git a/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java b/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java index fda04663..a08c7249 100644 --- a/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java +++ b/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java @@ -34,7 +34,7 @@ public class ChaosNetworkContainer> ext public ChaosNetworkContainer(String targetContainer, String pause) { super(PUMBA_IMAGE); - setCommand("--log-level debug netem --tc-image gaiadocker/iproute2 --duration " + pause + " loss --percent 100 " + targetContainer); + setCommand("--log-level debug netem --tc-image ghcr.io/alexei-led/pumba-debian-nettools --duration " + pause + " loss --percent 100 " + targetContainer); addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE); setWaitStrategy(Wait.forLogMessage(".*tc container created.*", 1)); withLogConsumer(o -> {