Skip to content
33 changes: 27 additions & 6 deletions redisbench_admin/environments/oss_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import redis

from redisbench_admin.run.cluster import split_primaries_per_db_nodes
from redisbench_admin.utils.utils import (
wait_for_conn,
redis_server_config_module_part,
Expand Down Expand Up @@ -68,21 +69,41 @@ def spin_up_local_redis_cluster(


def setup_redis_cluster_from_conns(redis_conns, shard_count, shard_host, start_port):
meet_cmds = []
logging.info("Setting up cluster. Total {} primaries.".format(len(redis_conns)))
meet_cmds = generate_meet_cmds(shard_count, shard_host, start_port)
meet_cmds = generate_meet_cmds(shard_count, shard_host, start_port, meet_cmds)
status = setup_oss_cluster_from_conns(meet_cmds, redis_conns, shard_count)
if status is True:
for conn in redis_conns:
conn.execute_command("CLUSTER SAVECONFIG")
return status


def generate_meet_cmds(shard_count, shard_host, start_port):
meet_cmds = []
def generate_host_port_pairs(server_private_ips, shard_count, cluster_start_port):
host_port_pairs = []
(
primaries_per_node,
db_private_ips,
_,
) = split_primaries_per_db_nodes(server_private_ips, None, shard_count)
shard_start = 1
for node_n, primaries_this_node in enumerate(primaries_per_node, start=0):
server_private_ip = db_private_ips[node_n]
for master_shard_id in range(
shard_start, shard_start + primaries_this_node + 1
):
shard_port = master_shard_id + cluster_start_port - 1
host_port_pairs.append([server_private_ip, shard_port])

for master_shard_id in range(1, shard_count + 1):
shard_port = master_shard_id + start_port - 1
meet_cmds.append("CLUSTER MEET {} {}".format(shard_host, shard_port))
return host_port_pairs


def generate_meet_cmds(
shard_count, server_private_ips, cluster_start_port, meet_cmds, shard_start=1
):
generate_host_port_pairs(server_private_ips, shard_count, cluster_start_port)
for pair in generate_host_port_pairs:
meet_cmds.append("CLUSTER MEET {} {}".format(pair[0], pair[1]))
return meet_cmds


Expand Down
97 changes: 65 additions & 32 deletions redisbench_admin/run/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ def generate_startup_nodes_array(redis_conns):

# noinspection PyBroadException
def spin_up_redis_cluster_remote_redis(
server_public_ip,
server_private_ip,
server_public_ips,
server_private_ips,
username,
private_key,
remote_module_files,
Expand All @@ -116,42 +116,75 @@ def spin_up_redis_cluster_remote_redis(
redis_7=True,
):
logging.info("Generating the remote redis-server command arguments")

redis_process_commands = []
logfiles = []
logname_prefix = logname[: len(logname) - 4] + "-"
for master_shard_id in range(1, shard_count + 1):
shard_port = master_shard_id + start_port - 1

command, logfile = generate_cluster_redis_server_args(
"redis-server",
dbdir_folder,
remote_module_files,
server_private_ip,
shard_port,
redis_configuration_parameters,
"yes",
modules_configuration_parameters_map,
logname_prefix,
"yes",
redis_7,
)
logging.error(
"Remote primary shard {} command: {}".format(
master_shard_id, " ".join(command)
(
primaries_per_node,
server_private_ips,
server_public_ips,
) = split_primaries_per_db_nodes(server_private_ips, server_public_ips, shard_count)
shard_start = 1
for node_n, primaries_this_node in enumerate(primaries_per_node, start=0):
server_private_ip = server_private_ips[node_n]
server_public_ip = server_public_ips[node_n]
for master_shard_id in range(
shard_start, shard_start + primaries_this_node + 1
):
shard_port = master_shard_id + start_port - 1

command, logfile = generate_cluster_redis_server_args(
"redis-server",
dbdir_folder,
remote_module_files,
server_private_ip,
shard_port,
redis_configuration_parameters,
"yes",
modules_configuration_parameters_map,
logname_prefix,
"yes",
redis_7,
)
)
logfiles.append(logfile)
redis_process_commands.append(" ".join(command))
res = execute_remote_commands(
server_public_ip, username, private_key, redis_process_commands, ssh_port
)
for pos, res_pos in enumerate(res):
[recv_exit_status, stdout, stderr] = res_pos
if recv_exit_status != 0:
logging.error(
"Remote primary shard {} command returned exit code {}. stdout {}. stderr {}".format(
pos, recv_exit_status, stdout, stderr
"Remote primary shard {} command: {}".format(
master_shard_id, " ".join(command)
)
)
logfiles.append(logfile)
redis_process_commands.append(" ".join(command))
res = execute_remote_commands(
server_public_ip, username, private_key, redis_process_commands, ssh_port
)
for pos, res_pos in enumerate(res):
[recv_exit_status, stdout, stderr] = res_pos
if recv_exit_status != 0:
logging.error(
"Remote primary shard {} command returned exit code {}. stdout {}. stderr {}".format(
pos, recv_exit_status, stdout, stderr
)
)
shard_start = shard_start + primaries_this_node

return logfiles


def split_primaries_per_db_nodes(server_private_ips, server_public_ips, shard_count):
if type(server_public_ips) is str:
server_public_ips = [server_public_ips]
if type(server_private_ips) is str:
server_private_ips = [server_private_ips]
db_node_count = len(server_private_ips)
primaries_per_db_node = db_node_count // shard_count
remainder_first_node = db_node_count % shard_count
first_node_primaries = primaries_per_db_node + remainder_first_node
logging.info("DB node {} will have {} primaries".format(1, first_node_primaries))
primaries_per_node = [first_node_primaries]
for node_n, node_id in enumerate(range(2, db_node_count + 1), start=2):
logging.info("Setting")
logging.info(
"DB node {} will have {} primaries".format(node_n, primaries_per_db_node)
)
primaries_per_node.append(primaries_per_db_node)
return primaries_per_node, server_private_ips, server_public_ips
4 changes: 3 additions & 1 deletion redisbench_admin/run_async/terraform.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ def setup_remote_environment(
)
_, _, _ = tf.refresh()
tf_output = tf.output()
logging.error("TF OUTPUT setup_remote_environment: {}".format(tf_output))
server_private_ip = tf_output_or_none(tf_output, "runner_private_ip")
server_public_ip = tf_output_or_none(tf_output, "runner_public_ip")
if server_private_ip is not None or server_public_ip is not None:
Expand Down Expand Up @@ -269,6 +270,7 @@ def terraform_spin_or_reuse_env(
else:
logging.info("Reusing remote setup {}".format(remote_id))
tf = remote_envs[remote_id]
tf_output = tf.output()
(
tf_return_code,
username,
Expand All @@ -277,7 +279,7 @@ def terraform_spin_or_reuse_env(
server_plaintext_port,
client_private_ip,
client_public_ip,
) = retrieve_tf_connection_vars(None, tf)
) = retrieve_tf_connection_vars(None, tf_output)
return (
client_public_ip,
deployment_type,
Expand Down
Loading