82 changes: 75 additions & 7 deletions redisbench_admin/environments/oss_cluster.py
@@ -67,22 +67,53 @@ def spin_up_local_redis_cluster(
return redis_processes, redis_conns


-def setup_redis_cluster_from_conns(redis_conns, shard_count, shard_host, start_port):
+def setup_redis_cluster_from_conns(
+    redis_conns, shard_count, shard_host, start_port, db_nodes=1
+):
+    meet_cmds = []
     logging.info("Setting up cluster. Total {} primaries.".format(len(redis_conns)))
-    meet_cmds = generate_meet_cmds(shard_count, shard_host, start_port)
+    meet_cmds = generate_meet_cmds(
+        shard_count, shard_host, start_port, meet_cmds, 1, db_nodes
+    )
status = setup_oss_cluster_from_conns(meet_cmds, redis_conns, shard_count)
if status is True:
for conn in redis_conns:
conn.execute_command("CLUSTER SAVECONFIG")
return status


-def generate_meet_cmds(shard_count, shard_host, start_port):
-    meet_cmds = []
-    for master_shard_id in range(1, shard_count + 1):
-        shard_port = master_shard_id + start_port - 1
-        meet_cmds.append("CLUSTER MEET {} {}".format(shard_host, shard_port))
-    return meet_cmds
+def generate_host_port_pairs(
+    server_private_ips, shard_count, cluster_start_port, required_node_count=1
+):
+    host_port_pairs = []
+    (primaries_per_node, db_private_ips, _,) = split_primaries_per_db_nodes(
+        server_private_ips, None, shard_count, required_node_count
+    )
+    shard_start = 1
+    for node_n, primaries_this_node in enumerate(primaries_per_node, start=0):
+        server_private_ip = db_private_ips[node_n]
+        for master_shard_id in range(shard_start, shard_start + primaries_this_node):
+            shard_port = master_shard_id + cluster_start_port - 1
+            host_port_pairs.append([server_private_ip, shard_port])
+        shard_start = shard_start + primaries_this_node
+    return host_port_pairs
+
+
+def generate_meet_cmds(
+    shard_count,
+    server_private_ips,
+    cluster_start_port,
+    meet_cmds,
+    shard_start=1,
+    db_nodes=1,
+):
+    host_port_pairs = generate_host_port_pairs(
+        server_private_ips, shard_count, cluster_start_port, db_nodes
+    )
+    for pair in host_port_pairs:
+        meet_cmds.append("CLUSTER MEET {} {}".format(pair[0], pair[1]))
+    return meet_cmds
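
For illustration, the following self-contained sketch reproduces the combined effect of generate_host_port_pairs and generate_meet_cmds: primaries are split evenly across DB nodes with the remainder assigned to the first node, and each primary gets a consecutive port. The IPs, shard count, and start port below are hypothetical values, not part of this change.

def sketch_meet_cmds(private_ips, shard_count, start_port):
    # Even split with the remainder assigned to the first node.
    base, remainder = divmod(shard_count, len(private_ips))
    primaries_per_node = [base + remainder] + [base] * (len(private_ips) - 1)
    cmds = []
    shard_id = 1
    for ip, primaries in zip(private_ips, primaries_per_node):
        for _ in range(primaries):
            # Ports are derived from the global shard id, so they stay unique.
            cmds.append("CLUSTER MEET {} {}".format(ip, shard_id + start_port - 1))
            shard_id += 1
    return cmds

print(sketch_meet_cmds(["10.0.0.1", "10.0.0.2"], 5, 20000))
# ['CLUSTER MEET 10.0.0.1 20000', 'CLUSTER MEET 10.0.0.1 20001',
#  'CLUSTER MEET 10.0.0.1 20002', 'CLUSTER MEET 10.0.0.2 20003',
#  'CLUSTER MEET 10.0.0.2 20004']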


@@ -199,3 +230,40 @@ def generate_cluster_redis_server_args(

def get_cluster_dbfilename(port):
return "cluster-node-port-{}.rdb".format(port)


+def split_primaries_per_db_nodes(
+    server_private_ips, server_public_ips, shard_count, required_node_count=None
+):
+    if type(server_public_ips) is str:
+        server_public_ips = [server_public_ips]
+    if type(server_private_ips) is str:
+        server_private_ips = [server_private_ips]
+    db_node_count = len(server_private_ips)
+    if required_node_count is None:
+        required_node_count = db_node_count
+    if db_node_count < required_node_count:
+        error_msg = "There are fewer servers ({}) than required for this setup ({}). Failing test...".format(
+            db_node_count, required_node_count
+        )
+        logging.error(error_msg)
+        raise Exception(error_msg)
+    if server_public_ips is not None:
+        server_public_ips = server_public_ips[0:required_node_count]
+    if server_private_ips is not None:
+        server_private_ips = server_private_ips[0:required_node_count]
+    primaries_per_db_node = shard_count // required_node_count
+    remainder_first_node = shard_count % required_node_count
+    first_node_primaries = primaries_per_db_node + remainder_first_node
+    logging.info("DB node {} will have {} primaries".format(1, first_node_primaries))
+    primaries_per_node = [first_node_primaries]
+    for node_n in range(2, required_node_count + 1):
+        logging.info(
+            "DB node {} will have {} primaries".format(node_n, primaries_per_db_node)
+        )
+        primaries_per_node.append(primaries_per_db_node)
+    logging.info("Final primaries per node {}".format(primaries_per_node))
+    logging.info("Final server_private_ips {}".format(server_private_ips))
+    logging.info("Final server_public_ips {}".format(server_public_ips))
+    return primaries_per_node, server_private_ips, server_public_ips
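
A worked example of the split logic above, with illustrative numbers:

# 10 primaries over 3 DB nodes: 10 // 3 == 3 per node, and the
# remainder (10 % 3 == 1) goes to the first node.
shard_count, required_node_count = 10, 3
base = shard_count // required_node_count
remainder = shard_count % required_node_count
primaries_per_node = [base + remainder] + [base] * (required_node_count - 1)
assert primaries_per_node == [4, 3, 3]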
3 changes: 3 additions & 0 deletions redisbench_admin/export/export.py
@@ -154,6 +154,7 @@ def export_command_logic(args, project_name, project_version):
github_org,
github_repo,
triggering_env,
+        1,
)
logging.info("Parsed a total of {} metrics".format(len(timeseries_dict.keys())))
logging.info(
@@ -213,6 +214,7 @@ def export_json_to_timeseries_dict(
tf_github_org,
tf_github_repo,
triggering_env,
+    n_db_nodes=1,
):
results_dict = {}
for test_name, d in benchmark_file.items():
@@ -237,6 +239,7 @@
tf_github_repo,
triggering_env,
False,
+                n_db_nodes,
)
results_dict[ts_name] = {
"labels": timeserie_tags.copy(),
83 changes: 51 additions & 32 deletions redisbench_admin/run/cluster.py
@@ -7,7 +7,10 @@

from redisbench_admin.utils.remote import execute_remote_commands

-from redisbench_admin.environments.oss_cluster import generate_cluster_redis_server_args
+from redisbench_admin.environments.oss_cluster import (
+    generate_cluster_redis_server_args,
+    split_primaries_per_db_nodes,
+)
from redisbench_admin.utils.utils import wait_for_conn


@@ -101,8 +104,8 @@ def generate_startup_nodes_array(redis_conns):

# noinspection PyBroadException
def spin_up_redis_cluster_remote_redis(
-    server_public_ip,
-    server_private_ip,
+    server_public_ips,
+    server_private_ips,
username,
private_key,
remote_module_files,
@@ -114,44 +117,60 @@
modules_configuration_parameters_map,
logname,
redis_7=True,
+    required_db_nodes=1,
):
logging.info("Generating the remote redis-server command arguments")

redis_process_commands = []
logfiles = []
logname_prefix = logname[: len(logname) - 4] + "-"
for master_shard_id in range(1, shard_count + 1):
shard_port = master_shard_id + start_port - 1
(
primaries_per_node,
server_private_ips,
server_public_ips,
) = split_primaries_per_db_nodes(
server_private_ips, server_public_ips, shard_count, required_db_nodes
)
shard_start = 1
for node_n, primaries_this_node in enumerate(primaries_per_node, start=0):
server_private_ip = server_private_ips[node_n]
server_public_ip = server_public_ips[node_n]
for master_shard_id in range(
shard_start, shard_start + primaries_this_node + 1
):
shard_port = master_shard_id + start_port - 1

command, logfile = generate_cluster_redis_server_args(
"redis-server",
dbdir_folder,
remote_module_files,
server_private_ip,
shard_port,
redis_configuration_parameters,
"yes",
modules_configuration_parameters_map,
logname_prefix,
"yes",
redis_7,
)
logging.error(
"Remote primary shard {} command: {}".format(
master_shard_id, " ".join(command)
command, logfile = generate_cluster_redis_server_args(
"redis-server",
dbdir_folder,
remote_module_files,
server_private_ip,
shard_port,
redis_configuration_parameters,
"yes",
modules_configuration_parameters_map,
logname_prefix,
"yes",
redis_7,
)
)
logfiles.append(logfile)
redis_process_commands.append(" ".join(command))
res = execute_remote_commands(
server_public_ip, username, private_key, redis_process_commands, ssh_port
)
for pos, res_pos in enumerate(res):
[recv_exit_status, stdout, stderr] = res_pos
if recv_exit_status != 0:
logging.error(
"Remote primary shard {} command returned exit code {}. stdout {}. stderr {}".format(
pos, recv_exit_status, stdout, stderr
"Remote primary shard {} command: {}".format(
master_shard_id, " ".join(command)
)
)
logfiles.append(logfile)
redis_process_commands.append(" ".join(command))
res = execute_remote_commands(
server_public_ip, username, private_key, redis_process_commands, ssh_port
)
for pos, res_pos in enumerate(res):
[recv_exit_status, stdout, stderr] = res_pos
if recv_exit_status != 0:
logging.error(
"Remote primary shard {} command returned exit code {}. stdout {}. stderr {}".format(
pos, recv_exit_status, stdout, stderr
)
)
shard_start = shard_start + primaries_this_node

return logfiles
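
The loop above places shards densely: each DB node receives a contiguous block of shard ids, and every shard's port is derived from its global shard id, so the port ranges of different nodes never overlap. A small sketch of the resulting layout, assuming 6 shards, 2 DB nodes, and start port 12000 (hypothetical values):

start_port, primaries_per_node = 12000, [3, 3]
shard_start = 1
for node_n, primaries_this_node in enumerate(primaries_per_node, start=1):
    ports = [
        shard_id + start_port - 1
        for shard_id in range(shard_start, shard_start + primaries_this_node)
    ]
    print("DB node {} -> ports {}".format(node_n, ports))
    shard_start += primaries_this_node
# DB node 1 -> ports [12000, 12001, 12002]
# DB node 2 -> ports [12003, 12004, 12005]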
32 changes: 26 additions & 6 deletions redisbench_admin/run/common.py
@@ -68,7 +68,7 @@ def prepare_benchmark_parameters(
benchmark_config,
benchmark_tool,
server_plaintext_port,
-    server_private_ip,
+    server_private_ips,
remote_results_file,
isremote=False,
current_workdir=None,
@@ -96,7 +96,7 @@
isremote,
remote_results_file,
server_plaintext_port,
-        server_private_ip,
+        server_private_ips,
client_public_ip,
username,
private_key,
@@ -116,7 +116,7 @@
isremote,
remote_results_file,
server_plaintext_port,
-        server_private_ip,
+        server_private_ips,
client_public_ip,
username,
private_key,
@@ -146,13 +146,17 @@ def prepare_benchmark_parameters_specif_tooling(
isremote,
remote_results_file,
server_plaintext_port,
-    server_private_ip,
+    server_private_ips,
client_public_ip,
username,
private_key,
client_ssh_port,
redis_password=None,
):
+    if type(server_private_ips) == list:
+        server_private_ip = server_private_ips[0]
+    else:
+        server_private_ip = server_private_ips
if "redis-benchmark" in benchmark_tool:
command_arr, command_str = prepare_redis_benchmark_command(
benchmark_tool,
@@ -296,6 +300,7 @@ def common_exporter_logic(
build_variant_name=None,
running_platform=None,
datapoints_timestamp=None,
+    n_db_nodes=1,
):
per_version_time_series_dict = {}
per_branch_time_series_dict = {}
@@ -341,6 +346,7 @@
build_variant_name,
running_platform,
testcase_metric_context_paths,
+            n_db_nodes,
)
if tf_github_branch is not None and tf_github_branch != "":
# extract per branch datapoints
@@ -363,6 +369,7 @@
build_variant_name,
running_platform,
testcase_metric_context_paths,
+            n_db_nodes,
)
else:
logging.error(
@@ -493,7 +500,11 @@ def extract_test_feasible_setups(
"name": "oss-standalone",
"type": "oss-standalone",
"redis_topology": {"primaries": 1, "replicas": 0},
"resources": {"requests": {"cpu": "1000m"}, "limits": {"cpu": "2000m"}},
"resources": {
"nodes": 1,
"requests": {"cpu": "1000m"},
"limits": {"cpu": "2000m"},
},
}
logging.info(
"Using a backwards compatible 'oss-standalone' setup, with settings: {}".format(
@@ -508,7 +519,16 @@ def get_setup_type_and_primaries_count(setup_settings):
setup_type = setup_settings["type"]
setup_name = setup_settings["name"]
shard_count = setup_settings["redis_topology"]["primaries"]
-    return setup_name, setup_type, shard_count
+
+    node_count = 1
+    shard_placement = "dense"
+    if "redis_topology" in setup_settings:
+        if "placement" in setup_settings["redis_topology"]:
+            shard_placement = setup_settings["redis_topology"]["placement"]
+    if "resources" in setup_settings:
+        if "nodes" in setup_settings["resources"]:
+            node_count = setup_settings["resources"]["nodes"]
+    return setup_name, setup_type, shard_count, shard_placement, node_count
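
A hypothetical setup entry exercising the new fields read above; the setup name and the "sparse" placement value are invented for the example, and only "placement" under redis_topology and "nodes" under resources are new (they default to "dense" and 1 when absent):

from redisbench_admin.run.common import get_setup_type_and_primaries_count

setup_settings = {
    "name": "oss-cluster-6-primaries",  # hypothetical setup name
    "type": "oss-cluster",
    "redis_topology": {"primaries": 6, "replicas": 0, "placement": "sparse"},
    "resources": {"nodes": 2, "requests": {"cpu": "1000m"}},
}
(name, setup_type, shards, placement, nodes) = get_setup_type_and_primaries_count(
    setup_settings
)
assert (shards, placement, nodes) == (6, "sparse", 2)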


def merge_default_and_config_metrics(
7 changes: 7 additions & 0 deletions redisbench_admin/run/redistimeseries.py
@@ -39,6 +39,7 @@ def prepare_timeseries_dict(
build_variant_name=None,
running_platform=None,
datapoints_timestamp=None,
+    n_db_nodes=1,
):
time_series_dict = {}
# check which metrics to extract
@@ -67,6 +68,7 @@
build_variant_name,
running_platform,
datapoints_timestamp,
+        n_db_nodes,
)
time_series_dict.update(per_version_time_series_dict)
time_series_dict.update(per_branch_time_series_dict)
@@ -93,6 +95,7 @@ def add_standardized_metric_bybranch(
metadata_tags={},
build_variant_name=None,
running_platform=None,
+    n_db_nodes=1,
):
if metric_value is not None:
tsname_use_case_duration = get_ts_metric_name(
@@ -109,6 +112,7 @@
False,
build_variant_name,
running_platform,
+            n_db_nodes,
)
labels = get_project_ts_tags(
tf_github_org,
@@ -119,6 +123,7 @@
metadata_tags,
build_variant_name,
running_platform,
+            n_db_nodes,
)
labels["branch"] = tf_github_branch
labels["deployment_name+branch"] = "{} {}".format(
@@ -238,6 +243,7 @@ def timeseries_test_sucess_flow(
build_variant_name=None,
running_platform=None,
timeseries_dict=None,
+    n_db_nodes=1,
):
testcase_metric_context_paths = []
version_target_tables = None
@@ -266,6 +272,7 @@
build_variant_name,
running_platform,
start_time_ms,
+            n_db_nodes,
)
if push_results_redistimeseries:
logging.info(