Skip to content

Initial shard aware driver #1

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions tests/integration/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
try:
from ccmlib.dse_cluster import DseCluster
from ccmlib.cluster import Cluster as CCMCluster
from ccmlib.scylla_cluster import ScyllaCluster as CCMSCyllaCluster
from ccmlib.cluster_factory import ClusterFactory as CCMClusterFactory
from ccmlib import common
except ImportError as e:
Expand Down Expand Up @@ -174,8 +175,12 @@ def _get_dse_version_from_cass(cass_version):
try:
cassandra_version = Version(cv_string) # env var is set to test-dse for DDAC
except:
# fallback to MAPPED_CASSANDRA_VERSION
cassandra_version = Version(mcv_string)
try:
# fallback to MAPPED_CASSANDRA_VERSION
cassandra_version = Version(mcv_string)
except:
cassandra_version = Version('3.11.4')
cv_string = '3.11.4'

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cv_string is not initialized outside this branch - looks wrong.


CASSANDRA_VERSION = Version(mcv_string) if mcv_string else cassandra_version
CCM_VERSION = mcv_string if mcv_string else cv_string
Expand All @@ -184,20 +189,27 @@ def _get_dse_version_from_cass(cass_version):
CASSANDRA_DIR = os.getenv('CASSANDRA_DIR', None)

CCM_KWARGS = {}
IS_SCYLLA = False
if DSE_VERSION:
log.info('Using DSE version: %s', DSE_VERSION)
if not CASSANDRA_DIR:
CCM_KWARGS['version'] = DSE_VERSION
if DSE_CRED:
log.info("Using DSE credentials file located at {0}".format(DSE_CRED))
CCM_KWARGS['dse_credentials_file'] = DSE_CRED

elif CASSANDRA_DIR:
log.info("Using Cassandra dir: %s", CASSANDRA_DIR)
CCM_KWARGS['install_dir'] = CASSANDRA_DIR
else:
elif os.environ.get('CASSANDRA_VERSION'):
log.info('Using Cassandra version: %s', CCM_VERSION)
CCM_KWARGS['version'] = CCM_VERSION

elif os.getenv('INSTALL_DIRECTORY'):
CCM_KWARGS['install_dir'] = os.path.join(os.getenv('INSTALL_DIRECTORY'))
IS_SCYLLA = True
elif os.getenv('SCYLLA_VERSION'):
CCM_KWARGS['cassandra_version'] = os.path.join(os.getenv('SCYLLA_VERSION'))
IS_SCYLLA = True

#This changes the default contact_point parameter in Cluster
def set_default_cass_ip():
Expand Down Expand Up @@ -447,10 +459,10 @@ def is_current_cluster(cluster_name, node_counts, workloads):
if CCM_CLUSTER and CCM_CLUSTER.name == cluster_name:
if [len(list(nodes)) for dc, nodes in
groupby(CCM_CLUSTER.nodelist(), lambda n: n.data_center)] == node_counts:
for node in CCM_CLUSTER.nodelist():
if set(node.workloads) != set(workloads):
print("node workloads don't match creating new cluster")
return False
#for node in CCM_CLUSTER.nodelist():
# if set(node.workloads) != set(workloads):
# print("node workloads don't match creating new cluster")
# return False
return True
return False

Expand Down Expand Up @@ -559,8 +571,14 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None,

CCM_CLUSTER.set_dse_configuration_options(dse_options)
else:
CCM_CLUSTER = CCMCluster(path, cluster_name, **ccm_options)
if IS_SCYLLA:
CCM_CLUSTER = CCMSCyllaCluster(path, cluster_name, **ccm_options)
else:
CCM_CLUSTER = CCMCluster(path, cluster_name, **ccm_options)
CCM_CLUSTER.set_configuration_options({'start_native_transport': True})
if IS_SCYLLA:
CCM_CLUSTER.set_configuration_options({'experimental': True})

if Version(cassandra_version) >= Version('2.2'):
CCM_CLUSTER.set_configuration_options({'enable_user_defined_functions': True})
if Version(cassandra_version) >= Version('3.0'):
Expand All @@ -574,9 +592,9 @@ def use_cluster(cluster_name, nodes, ipformat=None, start=True, workloads=None,

# This will enable the Mirroring query handler which will echo our custom payload k,v pairs back

if 'graph' not in workloads:
if PROTOCOL_VERSION >= 4:
jvm_args = [" -Dcassandra.custom_query_handler_class=org.apache.cassandra.cql3.CustomPayloadMirroringQueryHandler"]
#if 'graph' not in workloads:
# if PROTOCOL_VERSION >= 4:
# jvm_args = [" -Dcassandra.custom_query_handler_class=org.apache.cassandra.cql3.CustomPayloadMirroringQueryHandler"]
if len(workloads) > 0:
for node in CCM_CLUSTER.nodes.values():
node.set_workloads(workloads)
Expand Down