diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 48b95fca..af4fa504 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -33,7 +33,7 @@ jobs: needs: build name: Test runs-on: ubuntu-latest - timeout-minutes: 120 + timeout-minutes: 360 strategy: fail-fast: false matrix: diff --git a/connector/build.gradle b/connector/build.gradle index 3b57fe8a..85e60689 100644 --- a/connector/build.gradle +++ b/connector/build.gradle @@ -32,6 +32,7 @@ sourceSets { dependencies { implementation project(':commons') implementation("com.github.ben-manes.caffeine:caffeine:${caffeineVersion}") + implementation("org.rocksdb:rocksdbjni:${rocksdbVersion}") implementation("io.vavr:vavr:${vavrVersion}") implementation "com.datastax.oss:java-driver-core:${ossDriverVersion}" implementation "com.datastax.oss:java-driver-query-builder:${ossDriverVersion}" diff --git a/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java b/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java index f04d5ffe..6dad70e5 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java +++ b/connector/src/main/java/com/datastax/oss/cdc/CassandraSourceConnectorConfig.java @@ -66,6 +66,8 @@ public class CassandraSourceConnectorConfig { public static final String CACHE_MAX_DIGESTS_CONFIG = "cache.max.digest"; public static final String CACHE_MAX_CAPACITY_CONFIG = "cache.max.capacity"; public static final String CACHE_EXPIRE_AFTER_MS_CONFIG = "cache.expire.after.ms"; + public static final String CACHE_PERSISTENT_DIRECTORY_CONFIG = "cache.persistent.directory"; + public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter"; public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter"; @@ -195,6 +197,13 @@ public class CassandraSourceConnectorConfig { ConfigDef.Importance.HIGH, "The maximum number of digest per mutation cache entry, with a default set to 3", "CQL Read 
cache", 1, ConfigDef.Width.NONE, "CacheMaxDigest") + .define(CACHE_PERSISTENT_DIRECTORY_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.HIGH, + "The directory where the persistent mutation cache will be stored. Formatted as `{cache.persistent.directory}/{source-name}-{instance-id}`. " + + "If set, the connector will use RocksDB to store digests, otherwise it will use an in-memory cache.", + "CQL Read cache", 1, ConfigDef.Width.NONE, "CachePersistentDirectory") .define(CACHE_MAX_CAPACITY_CONFIG, ConfigDef.Type.LONG, "32767", @@ -661,6 +670,10 @@ public long getCacheMaxDigests() { return globalConfig.getLong(CACHE_MAX_DIGESTS_CONFIG); } + public String getCachePersistentDirectory() { + return globalConfig.getString(CACHE_PERSISTENT_DIRECTORY_CONFIG); + } + public long getCacheMaxCapacity() { return globalConfig.getLong(CACHE_MAX_CAPACITY_CONFIG); } @@ -782,6 +795,7 @@ public String toString() { + " " + QUERY_BACKOFF_IN_MS_CONFIG + ": %d%n" + " " + QUERY_MAX_BACKOFF_IN_SEC_CONFIG + ": %d%n" + " " + CACHE_MAX_DIGESTS_CONFIG + ": %d%n" + + " " + CACHE_PERSISTENT_DIRECTORY_CONFIG + ": %s%n" + " " + CACHE_MAX_CAPACITY_CONFIG + ": %d%n" + " " + CACHE_EXPIRE_AFTER_MS_CONFIG + ": %d%n" + " " + CACHE_ONLY_IF_COORDINATOR_MATCH + ": %s%n" @@ -805,6 +819,7 @@ public String toString() { getQueryBackoffInMs(), getQueryMaxBackoffInSec(), getCacheMaxDigests(), + getCachePersistentDirectory(), getCacheMaxCapacity(), getCacheExpireAfterMs(), getCacheOnlyIfCoordinatorMatch(), diff --git a/connector/src/main/java/com/datastax/oss/cdc/MutationCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java similarity index 90% rename from connector/src/main/java/com/datastax/oss/cdc/MutationCache.java rename to connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java index ff5d4a8c..b7c74006 100644 --- a/connector/src/main/java/com/datastax/oss/cdc/MutationCache.java +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/InMemoryCache.java
@@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.datastax.oss.cdc; +package com.datastax.oss.cdc.cache; import com.github.benmanes.caffeine.cache.Cache; import com.github.benmanes.caffeine.cache.Caffeine; @@ -24,19 +24,15 @@ import java.util.List; import java.util.concurrent.TimeUnit; -/** - * Keep MD5 digests to deduplicate Cassandra mutations - */ -public class MutationCache { - +public class InMemoryCache implements MutationCache { Cache> mutationCache; /** - * Max number of cached digest per cached entry. + * Max number of cached digests per cached entry. */ long maxDigests; - public MutationCache(long maxDigests, long maxCapacity, Duration expireAfter) { + public InMemoryCache(long maxDigests, long maxCapacity, Duration expireAfter) { this.maxDigests = maxDigests; mutationCache = Caffeine.newBuilder() .expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS) diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java new file mode 100644 index 00000000..0d83079f --- /dev/null +++ b/connector/src/main/java/com/datastax/oss/cdc/cache/MutationCache.java @@ -0,0 +1,56 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package com.datastax.oss.cdc.cache;
+
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+
+import java.util.List;
+
+public interface MutationCache<K> {
+    /**
+     * Add a mutation MD5 digest to the cache for the given mutation key.
+     * @param mutationKey the key for the mutation, typically a partition key or a unique identifier
+     * @param md5Digest the MD5 digest of the mutation to be added
+     * @return a list of MD5 digests for the given mutation key, which may include the newly added digest
+     */
+    List<String> addMutationMd5(K mutationKey, String md5Digest);
+
+    /**
+     * Retrieve the list of MD5 digests for the given mutation key.
+     * @param mutationKey the key for the mutation
+     * @return a list of MD5 digests associated with the mutation key, or {@code null} if none exist
+     */
+    List<String> getMutationCRCs(K mutationKey);
+
+    /**
+     * Check if a mutation with the given key and MD5 digest has already been processed.
+     * @param mutationKey the key for the mutation
+     * @param md5Digest the MD5 digest of the mutation
+     * @return true if the mutation has been processed, false otherwise
+     */
+    boolean isMutationProcessed(K mutationKey, String md5Digest);
+
+    /**
+     * Gives the current statistics of the cache, such as hit rate, miss rate, and size.
+     * @see <a href="https://github.com/ben-manes/caffeine/wiki/Statistics">Caffeine Statistics wiki</a>
+     */
+    CacheStats stats();
+
+    /**
+     * Gives the estimated size of the cache.
+     */
+    long estimatedSize();
+}
diff --git a/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java
new file mode 100644
index 00000000..544b6845
--- /dev/null
+++ b/connector/src/main/java/com/datastax/oss/cdc/cache/PersistentCache.java
@@ -0,0 +1,196 @@
+/**
+ * Copyright DataStax, Inc 2021.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.cdc.cache;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalCause;
+import com.github.benmanes.caffeine.cache.stats.CacheStats;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.TtlDB;
+
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+
+/**
+ * Mutation digest cache persisted in a RocksDB TTL database and fronted by a Caffeine cache.
+ * Reads go through Caffeine and fall back to RocksDB; writes update both.
+ *
+ * @param <K> the mutation key type
+ */
+public class PersistentCache<K> implements MutationCache<K>, AutoCloseable {
+    /** Separator used to flatten a list of digests into a single RocksDB value. */
+    private static final String DIGEST_SEPARATOR = ",";
+
+    /**
+     * The mutation cache
+     */
+    private final Cache<K, List<String>> mutationCache;
+
+    /** Kept so that the native options handle can be released in {@link #close()}. */
+    private final Options options;
+
+    private final TtlDB rocksDB;
+    /**
+     * Max number of cached digests per cached entry.
+     */
+    long maxDigests;
+
+    private final Function<K, byte[]> keySerializer;
+
+    static {
+        TtlDB.loadLibrary();
+    }
+
+    /**
+     * Open (or create) the RocksDB database and build the in-memory layer on top of it.
+     *
+     * @param maxDigests    max number of digests kept per mutation key
+     * @param maxCapacity   max number of entries held in memory
+     * @param expireAfter   time-to-live applied to the in-memory entries and to the RocksDB records
+     * @param dbPath        directory of the RocksDB database, created if missing
+     * @param keySerializer converts a mutation key to the bytes used as RocksDB key
+     * @throws RocksDBException if the database cannot be opened
+     */
+    public PersistentCache(long maxDigests, long maxCapacity, Duration expireAfter, String dbPath, Function<K, byte[]> keySerializer) throws RocksDBException {
+        this.maxDigests = maxDigests;
+        this.keySerializer = keySerializer;
+
+        // TtlDB takes the TTL as an int number of seconds: clamp instead of overflowing on long durations.
+        int ttlSeconds = (int) Math.min(expireAfter.getSeconds(), Integer.MAX_VALUE);
+        this.options = new Options().setCreateIfMissing(true);
+        try {
+            this.rocksDB = TtlDB.open(options, dbPath, ttlSeconds, false);
+        } catch (RocksDBException e) {
+            options.close();
+            throw e;
+        }
+
+        this.mutationCache = Caffeine.newBuilder()
+                .expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS)
+                .maximumSize(maxCapacity)
+                .recordStats()
+                .removalListener((K key, List<String> value, RemovalCause cause) -> {
+                    // SIZE: the entry only leaves memory and must stay in RocksDB.
+                    // REPLACED: a newer value has just been written to RocksDB, deleting it would lose that write.
+                    if (cause == RemovalCause.SIZE || cause == RemovalCause.REPLACED) {
+                        return;
+                    }
+                    try {
+                        rocksDB.delete(this.keySerializer.apply(key));
+                    } catch (RocksDBException e) {
+                        throw new RuntimeException(e);
+                    }
+                })
+                .build();
+    }
+
+    /**
+     * Decode a RocksDB value into a mutable list of digests.
+     *
+     * @return the digests, or {@code null} when there is no stored value
+     */
+    private List<String> valueDeserializer(byte[] data) {
+        if (data == null || data.length == 0) {
+            return null;
+        }
+        // The list must be mutable: addMutationMd5 adds to and removes from the returned instance.
+        return new ArrayList<>(Arrays.asList(new String(data, StandardCharsets.UTF_8).split(DIGEST_SEPARATOR)));
+    }
+
+    /** Encode a list of digests as a separator-joined UTF-8 string. */
+    private byte[] valueSerializer(List<String> data) {
+        return String.join(DIGEST_SEPARATOR, data).getBytes(StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Look the digests up in memory first, then in RocksDB.
+     *
+     * @return the digests recorded for the key, or {@code null} if none exist
+     */
+    @Override
+    public List<String> getMutationCRCs(K mutationKey) {
+        return mutationCache.get(mutationKey, k -> {
+            try {
+                return valueDeserializer(rocksDB.get(this.keySerializer.apply(k)));
+            } catch (RocksDBException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    /**
+     * Store the digests of a key in RocksDB and in the in-memory cache.
+     */
+    public void putMutationCRCs(K key, List<String> value) {
+        mutationCache.asMap().compute(key, (k, v) -> {
+            try {
+                rocksDB.put(keySerializer.apply(k), valueSerializer(value));
+            } catch (RocksDBException e) {
+                throw new RuntimeException(e);
+            }
+            return value;
+        });
+    }
+
+    @Override
+    public List<String> addMutationMd5(K mutationKey, String md5Digest) {
+        List<String> crcs = getMutationCRCs(mutationKey);
+        if (crcs == null) {
+            crcs = new ArrayList<>(1);
+            crcs.add(md5Digest);
+        } else if (!crcs.contains(md5Digest)) {
+            if (crcs.size() >= maxDigests) {
+                // remove the oldest digest
+                crcs.remove(0);
+            }
+            crcs.add(md5Digest);
+        }
+        putMutationCRCs(mutationKey, crcs);
+        return crcs;
+    }
+
+    @Override
+    public boolean isMutationProcessed(K mutationKey, String md5Digest) {
+        List<String> digests = getMutationCRCs(mutationKey);
+        return digests != null && digests.contains(md5Digest);
+    }
+
+    @Override
+    public CacheStats stats() {
+        return mutationCache.stats();
+    }
+
+    @Override
+    public long estimatedSize() {
+        return mutationCache.estimatedSize();
+    }
+
+    /**
+     * Release the RocksDB database and its native options handle.
+     */
+    @Override
+    public void close() {
+        rocksDB.close();
+        options.close();
+    }
+}
diff --git a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java
index 21716a59..d32c49cd 100644
--- a/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java
+++ b/connector/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java
@@ -20,9 +20,11 @@
 import com.datastax.oss.cdc.ConfigUtil;
 import com.datastax.oss.cdc.Constants;
 import
com.datastax.oss.cdc.CqlLogicalTypes;
-import com.datastax.oss.cdc.MutationCache;
 import com.datastax.oss.cdc.MutationValue;
 import com.datastax.oss.cdc.Version;
+import com.datastax.oss.cdc.cache.MutationCache;
+import com.datastax.oss.cdc.cache.InMemoryCache;
+import com.datastax.oss.cdc.cache.PersistentCache;
 import com.datastax.oss.driver.api.core.ConsistencyLevel;
 import com.datastax.oss.driver.api.core.cql.PreparedStatement;
 import com.datastax.oss.driver.api.core.cql.Row;
@@ -66,6 +68,7 @@
 import org.apache.pulsar.io.core.SourceContext;
 import org.apache.pulsar.io.core.annotations.Connector;
 import org.apache.pulsar.io.core.annotations.IOType;
+import org.rocksdb.RocksDBException;
 
 import java.lang.reflect.InvocationTargetException;
 import java.nio.charset.StandardCharsets;
@@ -312,10 +315,8 @@ public void open(Map config, SourceContext sourceContext) {
                 consumerBuilder.keySharedPolicy(KeySharedPolicy.autoSplitHashRange());
             }
             this.consumer = consumerBuilder.subscribe();
-            this.mutationCache = new MutationCache<>(
-                    this.config.getCacheMaxDigests(),
-                    this.config.getCacheMaxCapacity(),
-                    Duration.ofMillis(this.config.getCacheExpireAfterMs()));
+            this.mutationCache = this.getMutationCache();
+
             log.info("Starting source connector topic={} subscription={} query.executors={}",
                     dirtyTopicName,
                     this.config.getEventsSubscriptionName(),
@@ -326,6 +327,33 @@ public void open(Map config, SourceContext sourceContext) {
         }
     }
 
+    /**
+     * Get the mutation cache implementation: a RocksDB-backed {@link PersistentCache} stored under
+     * {@code {cache.persistent.directory}/{source-name}-{instance-id}} when a non-blank persistent
+     * directory is configured, otherwise an {@link InMemoryCache}.
+     * @return the mutation cache implementation
+     * @throws RocksDBException if the mutation cache cannot be created due to RocksDB issues
+     */
+    private MutationCache<String> getMutationCache() throws RocksDBException {
+        String cacheDirectory = this.config.getCachePersistentDirectory();
+        if (cacheDirectory != null && !cacheDirectory.trim().isEmpty()) {
+            return new PersistentCache<>(
+                    this.config.getCacheMaxDigests(),
+                    this.config.getCacheMaxCapacity(),
+                    Duration.ofMillis(this.config.getCacheExpireAfterMs()),
+                    cacheDirectory + "/" +
+                            this.sourceContext.getSourceName() + "-" + this.sourceContext.getInstanceId(),
+                    // Explicit charset: the serialized keys are persisted and must not depend on the JVM default encoding.
+                    key -> key.getBytes(StandardCharsets.UTF_8)
+            );
+        } else {
+            return new InMemoryCache<>(
+                    this.config.getCacheMaxDigests(),
+                    this.config.getCacheMaxCapacity(),
+                    Duration.ofMillis(this.config.getCacheExpireAfterMs()));
+        }
+    }
+
     void maybeInitCassandraClient() throws InvocationTargetException, NoSuchMethodException, IllegalAccessException, InstantiationException {
         if (this.cassandraClient == null) {
             synchronized (this) {
diff --git a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java
deleted file mode 100644
index e473875f..00000000
--- a/connector/src/test/java/com/datastax/oss/cdc/MutationCacheTests.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Copyright DataStax, Inc 2021.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */ -package com.datastax.oss.cdc; - -import org.junit.jupiter.api.Test; - -import java.time.Duration; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -public class MutationCacheTests { - - @Test - public final void testMaxDigests() throws Exception { - MutationCache mutationCache = new MutationCache(3, 10, Duration.ofHours(1)); - mutationCache.addMutationMd5("mutation1","digest1"); - mutationCache.addMutationMd5("mutation1","digest2"); - mutationCache.addMutationMd5("mutation1","digest3"); - mutationCache.addMutationMd5("mutation1","digest4"); - assertEquals(3L, mutationCache.getMutationCRCs("mutation1").size()); - } - - @Test - public final void testIsProcessed() throws Exception { - MutationCache mutationCache = new MutationCache(3, 10, Duration.ofHours(1)); - assertEquals(false,mutationCache.isMutationProcessed("mutation1","digest1")); - mutationCache.addMutationMd5("mutation1","digest1"); - assertEquals(true, mutationCache.isMutationProcessed("mutation1","digest1")); - } - - @Test - public final void testExpireAfter() throws Exception { - MutationCache mutationCache = new MutationCache(3, 10, Duration.ofSeconds(1)); - assertEquals(false, mutationCache.isMutationProcessed("mutation1","digest1")); - mutationCache.addMutationMd5("mutation1","digest1"); - assertEquals(true, mutationCache.isMutationProcessed("mutation1","digest1")); - Thread.sleep(2000); - assertEquals(false, mutationCache.isMutationProcessed("mutation1","digest1")); - } - -} diff --git a/connector/src/test/java/com/datastax/oss/cdc/cache/MutationCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/cache/MutationCacheTests.java new file mode 100644 index 00000000..9fa1918c --- /dev/null +++ b/connector/src/test/java/com/datastax/oss/cdc/cache/MutationCacheTests.java @@ -0,0 +1,85 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.cdc.cache; + +import com.github.benmanes.caffeine.cache.Caffeine; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.*; + +public class MutationCacheTests { + + @Test + public final void testMaxDigests() throws Exception { + MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofHours(1)); + mutationCache.addMutationMd5("mutation1","digest1"); + mutationCache.addMutationMd5("mutation1","digest2"); + mutationCache.addMutationMd5("mutation1","digest3"); + mutationCache.addMutationMd5("mutation1","digest4"); + assertEquals(3L, mutationCache.getMutationCRCs("mutation1").size()); + } + + @Test + public final void testIsProcessed() throws Exception { + MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofHours(1)); + assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); + mutationCache.addMutationMd5("mutation1","digest1"); + assertTrue(mutationCache.isMutationProcessed("mutation1", "digest1")); + } + + @Test + public final void testExpireAfter() throws Exception { + MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofSeconds(1)); + assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); + mutationCache.addMutationMd5("mutation1","digest1"); + assertTrue(mutationCache.isMutationProcessed("mutation1", "digest1")); + Thread.sleep(2000); + assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1")); + } + 
+    @Test
+    public final void testMaxCapacity() throws Exception {
+        MutationCache mutationCache = new InMemoryCache<>(3, 10, Duration.ofHours(1));
+
+        // Access and modify the private field using reflection
+        Field field = InMemoryCache.class.getDeclaredField("mutationCache");
+        field.setAccessible(true);
+        field.set(mutationCache, Caffeine.newBuilder()
+                .expireAfterWrite(Duration.ofHours(1).getSeconds(), TimeUnit.SECONDS)
+                .maximumSize(10)
+                .recordStats()
+                .executor(Runnable::run) // https://github.com/ben-manes/caffeine/wiki/Testing
+                .build()
+        );
+
+        for (int i = 0; i <20; i++) {
+            mutationCache.addMutationMd5("mutation" + i, "digest" + i);
+        }
+
+        int count = 0;
+        for (int i = 0; i < 20; i++) {
+            if(mutationCache.getMutationCRCs("mutation" + i) != null) {
+                count++;
+            }
+        }
+        assertEquals(10, count);
+    }
+
+}
diff --git a/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java b/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java
new file mode 100644
index 00000000..fbb7f1c5
--- /dev/null
+++ b/connector/src/test/java/com/datastax/oss/cdc/cache/PersistentCacheTests.java
@@ -0,0 +1,102 @@
+/**
+ * Copyright DataStax, Inc 2021.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.datastax.oss.cdc.cache;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.List;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+public class PersistentCacheTests {
+    private String path;
+
+    /** Cache under test; closed in {@link #tearDown()} so the RocksDB handle is released after every test. */
+    private PersistentCache<String> mutationCache;
+
+    @BeforeEach
+    public void setUp() {
+        path = "./rocksdb_mutation_cache_" + (new Random()).nextInt();
+    }
+
+    @AfterEach
+    public void tearDown() {
+        // Close the database before deleting its files.
+        if (mutationCache != null) {
+            mutationCache.close();
+            mutationCache = null;
+        }
+        try {
+            FileUtils.deleteDirectory(new File(path));
+        } catch (Exception e) {
+            // Ignore any exceptions during cleanup
+        }
+    }
+
+    /** Open a cache on the per-test directory with the settings shared by all tests. */
+    private PersistentCache<String> openCache() throws Exception {
+        return new PersistentCache<>(3, 10, Duration.ofHours(1), path, String::getBytes);
+    }
+
+    @Test
+    public final void testMaxDigests() throws Exception {
+        mutationCache = openCache();
+        mutationCache.addMutationMd5("mutation1","digest1");
+        mutationCache.addMutationMd5("mutation1","digest2");
+        mutationCache.addMutationMd5("mutation1","digest3");
+        mutationCache.addMutationMd5("mutation1","digest4");
+        assertEquals(3L, mutationCache.getMutationCRCs("mutation1").size());
+    }
+
+    @Test
+    public final void testIsProcessed() throws Exception {
+        mutationCache = openCache();
+        assertFalse(mutationCache.isMutationProcessed("mutation1", "digest1"));
+        mutationCache.addMutationMd5("mutation1","digest1");
+        assertTrue(mutationCache.isMutationProcessed("mutation1", "digest1"));
+    }
+
+    @Test
+    public final void testPersistence() throws Exception {
+        mutationCache = openCache();
+        mutationCache.addMutationMd5("mutation1","digest1");
+        mutationCache.addMutationMd5("mutation1","digest2");
+        mutationCache.addMutationMd5("mutation1","digest3");
+
+        mutationCache.close();
+        mutationCache = openCache();
+        assertEquals(3L, mutationCache.getMutationCRCs("mutation1").size());
+    }
+
+    @Test
+    public final void testMaxCapacity() throws Exception {
+        mutationCache = openCache();
+
+        for (int i = 0; i < 20; i++) {
+            mutationCache.addMutationMd5("mutation" + i, "digest" + i);
+        }
+
+        for (int i = 0; i < 20; i++) {
+            assertEquals(List.of("digest" + i), mutationCache.getMutationCRCs("mutation" + i));
+        }
+    }
+}
diff --git a/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc b/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc
index 2a29bee3..b9645390 100644
--- a/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc
+++ b/docs/docs-src/core/modules/ROOT/partials/cfgCassandraSource.adoc
@@ -187,4 +187,10 @@
 |
 | true
 
+| *cache.persistent.directory*
+| The base directory of the persistent mutation cache; each connector instance stores its cache under `{cache.persistent.directory}/{source-name}-{instance-id}`. If set, the connector will use RocksDB to store digests, otherwise it will use an in-memory cache.
+| string +| +| /data/source-cache + |=== diff --git a/gradle.properties b/gradle.properties index 55098650..b9cae411 100644 --- a/gradle.properties +++ b/gradle.properties @@ -24,6 +24,7 @@ kafkaVersion=3.4.0 vavrVersion=0.10.3 testContainersVersion=1.19.1 caffeineVersion=2.8.8 +rocksdbVersion=9.4.0 guavaVersion=30.1-jre messagingConnectorsCommonsVersion=1.0.14 slf4jVersion=1.7.30 diff --git a/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java b/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java index fda04663..a08c7249 100644 --- a/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java +++ b/testcontainers/src/main/java/com/datastax/testcontainers/ChaosNetworkContainer.java @@ -34,7 +34,7 @@ public class ChaosNetworkContainer> ext public ChaosNetworkContainer(String targetContainer, String pause) { super(PUMBA_IMAGE); - setCommand("--log-level debug netem --tc-image gaiadocker/iproute2 --duration " + pause + " loss --percent 100 " + targetContainer); + setCommand("--log-level debug netem --tc-image ghcr.io/alexei-led/pumba-debian-nettools --duration " + pause + " loss --percent 100 " + targetContainer); addFileSystemBind("/var/run/docker.sock", "/var/run/docker.sock", BindMode.READ_WRITE); setWaitStrategy(Wait.forLogMessage(".*tc container created.*", 1)); withLogConsumer(o -> {