-
Notifications
You must be signed in to change notification settings - Fork 24
Persistent Cache Implementation using RocksDB #198
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
b61d340
10334bc
1f9dae5
e4563bb
758630b
ed979b8
01834cf
73d7510
1422bcd
177a363
aad0eb4
e86d5d0
e240df6
d0f550b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -66,6 +66,8 @@ public class CassandraSourceConnectorConfig { | |
public static final String CACHE_MAX_DIGESTS_CONFIG = "cache.max.digest"; | ||
public static final String CACHE_MAX_CAPACITY_CONFIG = "cache.max.capacity"; | ||
public static final String CACHE_EXPIRE_AFTER_MS_CONFIG = "cache.expire.after.ms"; | ||
public static final String CACHE_PERSISTENT_DIRECTORY_CONFIG = "cache.persistent.directory"; | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the extra line necessary for formatting checks? If not, please remove |
||
|
||
public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter"; | ||
public static final String VALUE_CONVERTER_CLASS_CONFIG = "value.converter"; | ||
|
@@ -195,6 +197,13 @@ public class CassandraSourceConnectorConfig { | |
ConfigDef.Importance.HIGH, | ||
"The maximum number of digest per mutation cache entry, with a default set to 3", | ||
"CQL Read cache", 1, ConfigDef.Width.NONE, "CacheMaxDigest") | ||
.define(CACHE_PERSISTENT_DIRECTORY_CONFIG, | ||
ConfigDef.Type.STRING, | ||
null, | ||
ConfigDef.Importance.HIGH, | ||
"The directory where the persistent mutation cache will be stored. Formated as `{cache.persistent.directory}/{source-name}-{instance-id}` " + | ||
"If set, the connector will use RocksDB to store digests, otherwise it will use an in-memory cache.", | ||
"CQL Read cache", 1, ConfigDef.Width.NONE, "CachePersistentDirectory") | ||
.define(CACHE_MAX_CAPACITY_CONFIG, | ||
ConfigDef.Type.LONG, | ||
"32767", | ||
|
@@ -661,6 +670,10 @@ public long getCacheMaxDigests() { | |
return globalConfig.getLong(CACHE_MAX_DIGESTS_CONFIG); | ||
} | ||
|
||
public String getCachePersistentDirectory() { | ||
return globalConfig.getString(CACHE_PERSISTENT_DIRECTORY_CONFIG); | ||
} | ||
|
||
public long getCacheMaxCapacity() { | ||
return globalConfig.getLong(CACHE_MAX_CAPACITY_CONFIG); | ||
} | ||
|
@@ -782,6 +795,7 @@ public String toString() { | |
+ " " + QUERY_BACKOFF_IN_MS_CONFIG + ": %d%n" | ||
+ " " + QUERY_MAX_BACKOFF_IN_SEC_CONFIG + ": %d%n" | ||
+ " " + CACHE_MAX_DIGESTS_CONFIG + ": %d%n" | ||
+ " " + CACHE_PERSISTENT_DIRECTORY_CONFIG + ": %s%n" | ||
+ " " + CACHE_MAX_CAPACITY_CONFIG + ": %d%n" | ||
+ " " + CACHE_EXPIRE_AFTER_MS_CONFIG + ": %d%n" | ||
+ " " + CACHE_ONLY_IF_COORDINATOR_MATCH + ": %s%n" | ||
|
@@ -805,6 +819,7 @@ public String toString() { | |
getQueryBackoffInMs(), | ||
getQueryMaxBackoffInSec(), | ||
getCacheMaxDigests(), | ||
getCachePersistentDirectory(), | ||
getCacheMaxCapacity(), | ||
getCacheExpireAfterMs(), | ||
getCacheOnlyIfCoordinatorMatch(), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,7 @@ | |
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.datastax.oss.cdc; | ||
package com.datastax.oss.cdc.cache; | ||
|
||
import com.github.benmanes.caffeine.cache.Cache; | ||
import com.github.benmanes.caffeine.cache.Caffeine; | ||
|
@@ -24,19 +24,15 @@ | |
import java.util.List; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
/** | ||
* Keep MD5 digests to deduplicate Cassandra mutations | ||
*/ | ||
public class MutationCache<K> { | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add |
||
public class InMemoryCache<K> implements MutationCache<K> { | ||
Cache<K, List<String>> mutationCache; | ||
|
||
/** | ||
* Max number of cached digest per cached entry. | ||
* Max number of cached digests per cached entry. | ||
*/ | ||
long maxDigests; | ||
|
||
public MutationCache(long maxDigests, long maxCapacity, Duration expireAfter) { | ||
public InMemoryCache(long maxDigests, long maxCapacity, Duration expireAfter) { | ||
this.maxDigests = maxDigests; | ||
mutationCache = Caffeine.newBuilder() | ||
.expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/** | ||
* Copyright DataStax, Inc 2021. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.datastax.oss.cdc.cache; | ||
|
||
import com.github.benmanes.caffeine.cache.stats.CacheStats; | ||
|
||
import java.util.List; | ||
|
||
public interface MutationCache<K> { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Interface level description on what this cache will be used for. (de-duplication) |
||
/** | ||
* Add a mutation MD5 digest to the cache for the given mutation key. | ||
* @param mutationKey the key for the mutation, typically a partition key or a unique identifier | ||
* @param md5Digest the MD5 digest of the mutation to be added | ||
* @return a list of MD5 digests for the given mutation key, which may include the newly added digest | ||
*/ | ||
List<String> addMutationMd5(K mutationKey, String md5Digest); | ||
|
||
/** | ||
* Retrieve the list of MD5 digests for the given mutation key. | ||
* @param mutationKey the key for the mutation | ||
* @return a list of MD5 digests associated with the mutation key, or an empty list if none exist | ||
*/ | ||
List<String> getMutationCRCs(K mutationKey); | ||
|
||
/** | ||
* Check if a mutation with the given key and MD5 digest has already been processed. | ||
* @param mutationKey the key for the mutation | ||
* @param md5Digest the MD5 digest of the mutation | ||
* @return true if the mutation has been processed, false otherwise | ||
*/ | ||
boolean isMutationProcessed(K mutationKey, String md5Digest); | ||
|
||
/** | ||
* Gives the current statistics of the cache, such as hit rate, miss rate, and size. | ||
* <a href="https://github.com/ben-manes/caffeine/wiki/Statistics">Caffeine Statistics wiki</a> | ||
*/ | ||
CacheStats stats(); | ||
|
||
/** | ||
* Gives the estimated size of the cache. | ||
*/ | ||
long estimatedSize(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
/** | ||
* Copyright DataStax, Inc 2021. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package com.datastax.oss.cdc.cache; | ||
|
||
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import org.rocksdb.Options;
import org.rocksdb.RocksDBException;
import org.rocksdb.TtlDB;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
|
||
public class PersistentCache<K> implements MutationCache<K> { | ||
/** | ||
* The mutation cache | ||
*/ | ||
private final Cache<K, List<String>> mutationCache; | ||
|
||
private final TtlDB rocksDB; | ||
/** | ||
* Max number of cached digests per cached entry. | ||
*/ | ||
long maxDigests; | ||
|
||
private final Function<K, byte[]> keySerializer; | ||
|
||
static { | ||
TtlDB.loadLibrary(); | ||
} | ||
|
||
public PersistentCache(long maxDigests, long maxCapacity, Duration expireAfter, String dbPath, Function<K, byte[]> keySerializer) throws RocksDBException { | ||
this.maxDigests = maxDigests; | ||
this.keySerializer = keySerializer; | ||
|
||
Options options = new Options().setCreateIfMissing(true); | ||
this.rocksDB = TtlDB.open(options, dbPath, (int) expireAfter.getSeconds(), false); | ||
|
||
this.mutationCache = Caffeine.newBuilder() | ||
.expireAfterWrite(expireAfter.getSeconds(), TimeUnit.SECONDS) | ||
.maximumSize(maxCapacity) | ||
.recordStats() | ||
.removalListener((K key, List<String> value, RemovalCause cause) -> { | ||
try { | ||
// If the removal cause is not SIZE, we delete the key from RocksDB | ||
// This is to avoid deleting the key when it is removed due to the size limit | ||
if (cause != RemovalCause.SIZE ) { | ||
rocksDB.delete(this.keySerializer.apply(key)); | ||
} | ||
} catch (RocksDBException e) { | ||
throw new RuntimeException(e); | ||
} | ||
}) | ||
.build(); | ||
} | ||
|
||
|
||
private List<String> valueDeserializer(byte[] data){ | ||
return data == null || data.length == 0 ? null : List.of(new String(data).split(",")); | ||
} | ||
|
||
private byte[] valueSerializer(List<String> data){ | ||
return data.stream() | ||
.reduce((s1, s2) -> s1 + "," + s2) | ||
.orElse("") | ||
.getBytes(); | ||
} | ||
|
||
public List<String> getMutationCRCs(K mutationKey) { | ||
return mutationCache.get(mutationKey, k -> { | ||
try { | ||
return valueDeserializer(rocksDB.get(this.keySerializer.apply(k))); | ||
} catch (RocksDBException e) { | ||
throw new RuntimeException(e); | ||
} | ||
}); | ||
} | ||
|
||
public void putMutationCRCs(K key, List<String> value) { | ||
mutationCache.asMap().compute(key, (k, v) -> { | ||
try { | ||
rocksDB.put(keySerializer.apply(k), valueSerializer(value)); | ||
} catch (RocksDBException e) { | ||
throw new RuntimeException(e); | ||
} | ||
return value; | ||
}); | ||
} | ||
|
||
public List<String> addMutationMd5(K mutationKey, String md5Digest) { | ||
List<String> crcs = getMutationCRCs(mutationKey); | ||
if(crcs == null) { | ||
crcs = new ArrayList<>(1); | ||
crcs.add(md5Digest); | ||
} else { | ||
if (!crcs.contains(md5Digest)) { | ||
if (crcs.size() >= maxDigests) { | ||
// remove the oldest digest | ||
crcs.remove(0); | ||
} | ||
crcs.add(md5Digest); | ||
} | ||
} | ||
putMutationCRCs(mutationKey, crcs); | ||
return crcs; | ||
} | ||
|
||
public boolean isMutationProcessed(K mutationKey, String md5Digest) { | ||
List<String> digests = getMutationCRCs(mutationKey); | ||
return digests != null && digests.contains(md5Digest); | ||
} | ||
|
||
public CacheStats stats() { | ||
return mutationCache.stats(); | ||
} | ||
|
||
public long estimatedSize() { | ||
return mutationCache.estimatedSize(); | ||
} | ||
|
||
public void close() { | ||
rocksDB.close(); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
360 minutes is too high. Why don't we make it 180.
cc: @srinath-ctds