Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ jobs:
needs: build
name: Test
runs-on: ubuntu-latest
timeout-minutes: 120
timeout-minutes: 360
strategy:
fail-fast: false
matrix:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.pulsar.common.schema.SchemaType;

import java.net.InetAddress;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -259,4 +260,35 @@ String stringify(DataType dataType, Object value) {
throw new UnsupportedOperationException("Unsupported type="+dataType.getProtocolCode()+" as key in a map");
}
}

/**
* Converts a collection value based on its type.
* If the value is an {@link Instant}, it is converted to its epoch millisecond representation.
* Otherwise, the value is returned as is.
*
* @param collectionValue the value to be marshaled; could be an {@link Instant} or any other object
* @return the marshaled value; an epoch millisecond representation if the input is an {@link Instant}, or the original value otherwise
*/
Object marshalCollectionValue(Object collectionValue) {
if(collectionValue instanceof Instant) {
return ((Instant)collectionValue).toEpochMilli();
}
return collectionValue;
}

/**
* Converts a collection value based on its type.
* If the value is an {@link Instant}, it is converted to its epoch millisecond representation.
* Otherwise, the value is returned as is.
*
* @param entry the value to be marshaled;
* @return the marshaled value; an epoch millisecond representation if the input is an {@link Instant}, or the original value otherwise
*/
Object marshalCollectionValue(Map.Entry<? super Object, ? super Object> entry) {
Object collectionValue = entry.getValue();
if(collectionValue instanceof Instant) {
return ((Instant)collectionValue).toEpochMilli();
}
return collectionValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.*;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -146,26 +141,28 @@ public byte[] toConnectData(Row row) {
case ProtocolConstants.DataType.LIST: {
ListType listType = (ListType) cm.getType();
Schema listSchema = subSchemas.get(fieldName);
List listValue = row.getList(fieldName, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType());
List<Object> listValue = Objects.requireNonNull(row.getList(fieldName, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType()))
.stream().map(this::marshalCollectionValue).collect(Collectors.toList());
log.debug("field={} listSchema={} listValue={}", fieldName, listSchema, listValue);
genericRecordBuilder.put(fieldName, buildArrayValue(listSchema, listValue));
}
break;
case ProtocolConstants.DataType.SET: {
SetType setType = (SetType) cm.getType();
Schema setSchema = subSchemas.get(fieldName);
Set setValue = row.getSet(fieldName, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType());
Set<Object> setValue = Objects.requireNonNull(row.getSet(fieldName, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType()))
.stream().map(this::marshalCollectionValue).collect(Collectors.toSet());
log.debug("field={} setSchema={} setValue={}", fieldName, setSchema, setValue);
genericRecordBuilder.put(fieldName, buildArrayValue(setSchema, setValue));
}
break;
case ProtocolConstants.DataType.MAP: {
MapType mapType = (MapType) cm.getType();
Schema mapSchema = subSchemas.get(fieldName);
Map<String, Object> mapValue = row.getMap(fieldName,
Map<String, Object> mapValue = Objects.requireNonNull(row.getMap(fieldName,
CodecRegistry.DEFAULT.codecFor(mapType.getKeyType()).getJavaType().getRawType(),
CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType())
.entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), Map.Entry::getValue));
CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType()))
.entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), this::marshalCollectionValue));
log.debug("field={} mapSchema={} mapValue={}", fieldName, mapSchema, mapValue);
genericRecordBuilder.put(fieldName, mapValue);
}
Expand All @@ -174,7 +171,7 @@ public byte[] toConnectData(Row row) {
if (cm.getType() instanceof CqlVectorType) {
Schema vectorSchema = subSchemas.get(fieldName);
CqlVector<?> vector = row.getCqlVector(fieldName);
log.debug("field={} listSchema={} listValue={}", fieldName, vectorSchema, vector);
log.debug("field={} vectorSchema={} vectorValue={}", fieldName, vectorSchema, vector);
List<Object> vectorValue = new ArrayList<>();
vector.getValues().forEach(vectorValue::add);
genericRecordBuilder.put(fieldName, buildArrayValue(vectorSchema, vectorValue));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import com.datastax.oss.driver.api.core.metadata.schema.TableMetadata;
import com.datastax.oss.driver.api.core.type.DataType;
import com.datastax.oss.driver.api.core.type.DataTypes;
import com.datastax.oss.driver.api.core.type.ListType;
import com.datastax.oss.driver.api.core.type.MapType;
import com.datastax.oss.driver.api.core.type.SetType;
Expand All @@ -52,11 +51,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;

@Slf4j
Expand Down Expand Up @@ -133,29 +128,35 @@ JsonNode toJson(GettableById record, CqlIdentifier identifier, DataType dataType
ListType listType = (ListType) dataType;
String path = getSubSchemaPath(record, identifier);
org.apache.avro.Schema listSchema = subSchemas.get(path);
List listValue = record.getList(identifier, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType());
List<? super Object> listValue = Objects.requireNonNull(record.getList(identifier, CodecRegistry.DEFAULT.codecFor(listType.getElementType()).getJavaType().getRawType()))
.stream().map(this::marshalCollectionValue).collect(Collectors.toList());
log.debug("field={} listSchema={} listValue={}", identifier, listSchema, listValue);
return createArrayNode(listSchema, listValue);
}
case ProtocolConstants.DataType.SET: {
SetType setType = (SetType) dataType;
String path = getSubSchemaPath(record, identifier);
org.apache.avro.Schema setSchema = subSchemas.get(path);
Set setValue = record.getSet(identifier, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType());
Set<Object> setValue = Objects.requireNonNull(record.getSet(identifier, CodecRegistry.DEFAULT.codecFor(setType.getElementType()).getJavaType().getRawType()))
.stream().map(this::marshalCollectionValue).collect(Collectors.toSet());
log.debug("field={} setSchema={} setValue={}", identifier, setSchema, setValue);
return createArrayNode(setSchema, setValue);
}
case ProtocolConstants.DataType.MAP: {
MapType mapType = (MapType) dataType;
String path = getSubSchemaPath(record, identifier);
org.apache.avro.Schema mapSchema = subSchemas.get(path);
Map<String, JsonNode> map = record.getMap(identifier,
Map<String, JsonNode> map = Objects.requireNonNull(record.getMap(identifier,
CodecRegistry.DEFAULT.codecFor(mapType.getKeyType()).getJavaType().getRawType(),
CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType())
.entrySet().stream().collect(Collectors.toMap(e -> stringify(mapType.getKeyType(), e.getKey()), e -> toJson(mapSchema.getValueType(), e.getValue())));
CodecRegistry.DEFAULT.codecFor(mapType.getValueType()).getJavaType().getRawType()))
.entrySet().stream().collect(
Collectors.toMap(
e -> stringify(mapType.getKeyType(), e.getKey()),
e -> toJson(mapSchema.getValueType(), marshalCollectionValue(e.getValue())))
);
log.debug("field={} mapSchema={} mapValue={}", identifier, mapSchema, map);
ObjectNode objectNode = jsonNodeFactory.objectNode();
map.forEach((k,v)->objectNode.set(k, v));
map.forEach(objectNode::set);
return objectNode;
}
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonReader;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.com.fasterxml.jackson.databind.JsonNode;
Expand Down Expand Up @@ -69,14 +71,7 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDate;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.*;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -196,6 +191,11 @@ public void testBatchInsert() throws InterruptedException, IOException {
testBatchInsert("batchinsert");
}

@Test
public void testTimestampInCollection() throws InterruptedException, IOException {
testTimestampInCollection("ks1");
}

void deployConnector(String ksName, String tableName) throws IOException, InterruptedException {
String config = String.format(Locale.ROOT, "{\"%s\":\"%s\", \"%s\":\"%s\", \"%s\":\"%s\", \"%s\":\"%s\", \"%s\": \"%s\", \"%s\":\"%s\", \"%s\":\"%s\"}",
CassandraSourceConnectorConfig.CONTACT_POINTS_OPT, "cassandra-1",
Expand Down Expand Up @@ -989,6 +989,73 @@ public void testBatchInsert(String ksName) throws InterruptedException, IOExcept
}
}

@SuppressWarnings("unchecked")
public void testTimestampInCollection(String ksName) throws InterruptedException, IOException {
try {
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
cqlSession.execute("CREATE KEYSPACE IF NOT EXISTS " + ksName +
" WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};");
cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table7 (a text, b timestamp, c list<timestamp>, d map<text, timestamp>, e set<timestamp>, PRIMARY KEY(a)) WITH cdc=true");
cqlSession.execute("INSERT INTO " + ksName + ".table7 (a,b,c,d,e) VALUES('1', '1990-04-04 08:52:01.581', ['1990-04-04 08:52:01.581', '1990-04-04 08:52:01.581'], {'key113606': '1990-04-04 08:52:01.581'}, {'1990-04-04 08:52:01.581', '1990-04-04 08:52:01.581'})");
cqlSession.execute("INSERT INTO " + ksName + ".table7 (a,b,c,d,e) VALUES('2', '1999-11-07 05:30:00.780', ['1999-11-07 05:30:00.780', '1999-11-07 05:30:00.780'], {'key113606': '1999-11-07 05:30:00.780'}, {'1999-11-07 05:30:00.780'})");
cqlSession.execute("INSERT INTO " + ksName + ".table7 (a,b,c,d,e) VALUES('3', '2025-07-17 18:02:01.871', ['2025-07-17 18:02:01.871', '2025-07-17 18:02:01.871'], {'key113606': '2025-07-17 18:02:01.871'}, {'2025-07-17 18:02:01.871', '2025-07-17 18:02:01.871', '2025-07-17 18:02:01.871'})");
}
deployConnector(ksName, "table7");
try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build()) {
try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(org.apache.pulsar.client.api.Schema.AUTO_CONSUME())
.topic(String.format(Locale.ROOT, "data-%s.table7", ksName))
.subscriptionName("sub1")
.subscriptionType(SubscriptionType.Key_Shared)
.subscriptionMode(SubscriptionMode.Durable)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()) {
Message<GenericRecord> msg;
int receivedCount = 0;
List<Long> data = Arrays.asList(639219121581L, 941952600780L, 1752775321871L);
while ((msg = consumer.receive(60, TimeUnit.SECONDS)) != null &&
receivedCount < 4) {
GenericRecord record = msg.getValue();
assertEquals(this.schemaType, record.getSchemaType());
Object key = getKey(msg);
GenericRecord value = getValue(record);
// assert key fields
assertEquals(Integer.toString(receivedCount+1) , getAndAssertKeyFieldAsString(key, "a"));
// assert value fields
assertEquals(data.get(receivedCount), value.getField("b"));
if (value.getField("c") instanceof GenericJsonRecord){
JsonNode arrayNode = ((GenericJsonRecord) value.getField("c")).getJsonNode();
assertTrue(arrayNode.isArray());
assertEquals(2, arrayNode.size());
assertEquals(data.get(receivedCount), arrayNode.get(0).asLong());
assertEquals(data.get(receivedCount), arrayNode.get(1).asLong());
}
else {
assertEquals(Arrays.asList(data.get(receivedCount), data.get(receivedCount)), value.getField("c"));
}
if (value instanceof GenericJsonRecord){
JsonNode arrayNode = ((GenericJsonRecord) value.getField("e")).getJsonNode();;
assertTrue(arrayNode.isArray());
assertEquals(1, arrayNode.size());
assertEquals(data.get(receivedCount), arrayNode.get(0).asLong());
}
else {
assertEquals(Collections.singletonList(data.get(receivedCount)), value.getField("e"));
}

Map<String, Object> expectedMap = new HashMap<>();
expectedMap.put("key113606", data.get(receivedCount));
assertMapsEqual(expectedMap, value.getField("d"));
consumer.acknowledge(msg);
receivedCount++;
}
}
}
} finally {
dumpFunctionLogs("cassandra-source-" + ksName + "-table7");
undeployConnector(ksName, "table7");
}
}

@Test
public void testReadTimeout() throws InterruptedException, IOException {
final String ksName = "ksx";
Expand Down Expand Up @@ -1217,6 +1284,45 @@ private String getAndAssertKeyFieldAsString(Object key, String fieldName) {
throw new RuntimeException("unknown key type " + key.getClass().getName());
}

@SuppressWarnings("unchecked")
private void assertMapsEqual(Map<String, Object> expected, Object actual) {
if (actual instanceof GenericJsonRecord) {
JsonNode node = ((GenericJsonRecord) actual).getJsonNode();
assertEquals(expected.size(), node.size(), "Maps have different sizes");
for (Map.Entry<String, Object> entry : expected.entrySet()) {
assertTrue(node.has(entry.getKey()), "Missing key: " + entry.getKey());
assertEquals(
expected.get(entry.getKey()),
node.get(entry.getKey()).asLong(),
"Values differ for key: " + entry.getKey()
);
}
}
else if (actual instanceof Map){
Map<org.apache.pulsar.shade.org.apache.avro.util.Utf8, Object> actualMap = (Map<org.apache.pulsar.shade.org.apache.avro.util.Utf8, Object>) actual;
assertEquals(expected.size(), actualMap.size(), "Maps have different sizes");
for (Map.Entry<String, Object> entry : expected.entrySet()) {
String expectedKey = entry.getKey();
assertTrue(actualMap.keySet().stream()
.map(Utf8::toString)
.anyMatch(str -> str.equals(expectedKey)), "Missing key: " + expectedKey);
assertEquals(
expected.get(entry.getKey()),
actualMap.entrySet().stream()
.filter(e -> e.getKey().toString().equals(expectedKey))
.findFirst()
.map(Map.Entry::getValue)
.orElse(null),
"Values differ for key: " + entry.getKey()
);
}
}
else {
throw new RuntimeException("Unknown type of GenericRecord: " + actual.getClass().getName());
}

}

private void assertKeyFieldIsNull(Object key, String fieldName) {
if (key instanceof GenericRecord) {
assertNull(((GenericRecord) key).getField(fieldName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class ChaosNetworkContainer<SELF extends ChaosNetworkContainer<SELF>> ext

public ChaosNetworkContainer(String targetContainer, String pause) {
super(PUMBA_IMAGE);
setCommand("--log-level debug netem --tc-image gaiadocker/iproute2 --duration " + pause + " loss --percent 100 " + targetContainer);
setCommand("--log-level debug netem --tc-image ghcr.io/alexei-led/pumba-debian-nettools --duration " + pause + " loss --percent 100 " + targetContainer);
addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE);
setWaitStrategy(Wait.forLogMessage(".*tc container created.*", 1));
withLogConsumer(o -> {
Expand Down