Skip to content

Persistent Cache Implementation using RocksDB #198

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 14 commits into
base: master
Choose a base branch
from
Open

Conversation

priyanshu-ctds
Copy link
Collaborator

Types of changes

  • Bug fix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Breaking change (fix or feature that would cause existing functionality to change)
  • I have read the CONTRIBUTING document.
  • My code follows the code style of this project.
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I have added tests to cover my changes.
  • All new and existing tests passed.

Description

The Pull request implements Persistent Cache with RocksDB.

The implementation uses an in-memory cache mechanism using Caffeine and a Write-through persistent cache based on RocksDB.

Fixes

  1. When there are too many mutation events (more than cache.max.capacity) being "processed" in a very short period of time, this can occur on high loads on the in-memory cache, causing eviction and duplication in the data topic.
  2. When the connector restarts (manual restart operations or crashes) in between processing events arising from a single mutation, the data is wiped out from the in-memory cache but stays in the persistent cache.

What's added/changed

  • A new class named PersistentCache.java which implements the MutationCache.java interface, having the persistent cache implementation.
  • A new configuration named cache.persistent.directory to enable the persistent cache and set the cache directory at the same time.
  • Renamed the old MutationCache.java to InMemory.java, which has a non-persistent in-memory cache implementation.

Checklist:

  • My code follows the style guidelines of this project.
  • I have performed a self-review of my own code.
  • I have commented my code, particularly in hard-to-understand areas.
  • I have made corresponding changes to the documentation.
  • My changes generate no new warnings.
  • Any dependent changes have been merged and published in downstream modules.

@@ -33,7 +33,7 @@ jobs:
needs: build
name: Test
runs-on: ubuntu-latest
timeout-minutes: 120
timeout-minutes: 360

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

360 minutes is too high. Why don't we make it 180.
cc: @srinath-ctds

@@ -24,6 +24,7 @@ kafkaVersion=3.4.0
vavrVersion=0.10.3
testContainersVersion=1.19.1
caffeineVersion=2.8.8
rocksdbVersion=9.4.0

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for Info, Why specifically 9.4.0?

@@ -34,7 +34,7 @@ public class ChaosNetworkContainer<SELF extends ChaosNetworkContainer<SELF>> ext

public ChaosNetworkContainer(String targetContainer, String pause) {
super(PUMBA_IMAGE);
setCommand("--log-level debug netem --tc-image gaiadocker/iproute2 --duration " + pause + " loss --percent 100 " + targetContainer);
setCommand("--log-level debug netem --tc-image ghcr.io/alexei-led/pumba-debian-nettools --duration " + pause + " loss --percent 100 " + targetContainer);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add why this change is made in the PR description.

@@ -66,6 +66,8 @@ public class CassandraSourceConnectorConfig {
public static final String CACHE_MAX_DIGESTS_CONFIG = "cache.max.digest";
public static final String CACHE_MAX_CAPACITY_CONFIG = "cache.max.capacity";
public static final String CACHE_EXPIRE_AFTER_MS_CONFIG = "cache.expire.after.ms";
public static final String CACHE_PERSISTENT_DIRECTORY_CONFIG = "cache.persistent.directory";

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the extra line necessary for formatting checks? If not, please remove


import java.util.List;

public interface MutationCache<K> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interface level description on what this cache will be used for. (de-duplication)

* Keep MD5 digests to deduplicate Cassandra mutations
*/
public class MutationCache<K> {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add @Override annotation to the methods getting implemented from the interface in both the classes.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants