diff --git a/build.gradle b/build.gradle index 266db12355f60..d51489ab29f96 100644 --- a/build.gradle +++ b/build.gradle @@ -329,7 +329,11 @@ if (repo != null) { 'docker/docker_official_images/.gitkeep', 'sonar-project.properties', 'vagrant/cloudwatch-agent-setup.sh', - 'vagrant/cloudwatch-agent-configuration.json' + 'vagrant/cloudwatch-agent-configuration.json', + 'vagrant/aws-packer.json', + 'vagrant/worker-ami.json', + 'ssh_checkers/aws_checker.py', + 'terraform/*' ]) } } else { diff --git a/ssh_checkers/aws_checker.py b/ssh_checkers/aws_checker.py new file mode 100644 index 0000000000000..7486a27e43ac9 --- /dev/null +++ b/ssh_checkers/aws_checker.py @@ -0,0 +1,36 @@ +import subprocess +import logging +from ducktape.utils.util import wait_until + +DEFAULT_AWS_COMMAND_TIMEOUT_SECOND = 360 + + +def aws_ssh_checker(error, remote_account): + """This function use to check if node is still and running or termination. + Also semaphore agent able to do ssh + (this should be just one node if the node is still up, and zero + if its terminated) + Args: + error (Exception): the ssh exception passed from ducktapes remote account + remote_account (RemoteAccount): ducktapes remote account object experiencing the + ssh failure + Raises: + Exception: when the aws node is missing + """ + remote_account.logger.log(logging.INFO, 'running aws_checker:') + + cmd = ['aws','ec2','describe-instances','--filters', + '"Name=private-ip-address,Values={}"'.format(remote_account.externally_routable_ip), + '--query', '"Reservations[].Instances[].Tags[?Key==\'Name\']|[0][0].Value"', + '--region', 'us-west-2'] + remote_account.logger.log(logging.INFO, 'running command {}'.format(cmd)) + result = subprocess.Popen(" ".join(cmd), + stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, executable='/bin/bash') + wait_until(result.poll, DEFAULT_AWS_COMMAND_TIMEOUT_SECOND) + output = [line.decode() for line in iter(result.stdout.readline, b'') if line.strip() and line.strip() != b'null'] + if result.returncode != 0: + remote_account.logger.log(logging.ERROR, 'aws command "{}" failed:'.format(" ".join(cmd))) + remote_account.logger.log(logging.ERROR, result.stdout.read().decode()) + remote_account.logger.log(logging.ERROR, result.stderr.read().decode()) + if not output: + raise Exception("AWS host no longer exists {}".format(remote_account.externally_routable_ip)) \ No newline at end of file diff --git a/terraform/cloudinit.yml b/terraform/cloudinit.yml new file mode 100644 index 0000000000000..a62cdf659a02c --- /dev/null +++ b/terraform/cloudinit.yml @@ -0,0 +1,20 @@ +# Copyright 2024 Confluent Inc. +groups: + - ce-kafka + +users: + - default + - name: terraform + gecos: terraform + shell: /bin/bash + primary_group: muckrake + sudo: ALL=(ALL) NOPASSWD:ALL + groups: users, admin, sudo, ubuntu, adm, dialout, cdrom, floppy, audio, dip, video, plugdev, netdev, lxd + lock_passwd: false + ssh_authorized_keys: + - ${ public_key } + +runcmd: + - bash /vagrant/vagrant/base.sh + - python3.11 -m pip install -U pip + - python3.11 -m pip install -r /vagrant/resources/requirements.txt \ No newline at end of file diff --git a/terraform/kafka_runner/package_ami.py b/terraform/kafka_runner/package_ami.py new file mode 100644 index 0000000000000..8e0ef20377b2f --- /dev/null +++ b/terraform/kafka_runner/package_ami.py @@ -0,0 +1,197 @@ +# Copyright 2024 Confluent Inc. +import glob +import hashlib +import logging +import os +import re +import time +from datetime import datetime, timedelta +import boto3 + +from terraform.kafka_runner.util import HASH_ALGORITHM, AWS_REGION, AWS_ACCOUNT_ID, AMI_NAME_MAX_LENGTH, \ + BASE_KAFKA_DIR, run, WORKER_AMI_JSON, WORKER_AMI_NAME, INSTANCE_TYPE, IPV6_SUBNET_ID,IPV4_SUBNET_ID, AMI, AWS_PACKER_JSON, VPC_ID, KAFKA_BRANCH, ALLOW_ALL_SECURITY_GROUP_ID + +GET_VERSION_FROM_PACKAGES_RE = re.compile('.*confluent-packages-(?P\d+)\.(?P\d+)\.(?P\d+).*') + +def hash_files(file_list, **kwargs): + """ + Creates a hash based on the contents of the files. + Arguments: + file_list: a list of file paths + **kwargs: additional key-value pairs to include in the hash + """ + hasher = hashlib.new(HASH_ALGORITHM) + sorted_files = sorted(file_list) + + hasher.update(str(sorted(kwargs.items())).encode()) + + for f in sorted_files: + with open(f, "rb") as fd: + hasher.update(fd.read()) + return hasher.hexdigest() + +def ensure_trailing_separator(dirname): + """Ensure trailing separator on the directory name + E.g:: + my/path -> my/path/ # Add trailing '/' + my/path/ -> my/path/ # No change + """ + if not dirname.endswith(os.path.sep): + dirname += os.path.sep + return dirname + + +def compute_packer_hash(**extras): + """Compute a hash which depends on the contents and directory layout of Packer files. + Since Packer files are changed infrequently, hopefully this provides a reasonable way to cache and reuse a + pre-created ami. + Arguments: + **extras: named user arguments to pass to packer + """ + previous_wd = os.getcwd() + os.chdir(BASE_KAFKA_DIR) + + dirname = os.path.dirname(AWS_PACKER_JSON) + + def with_extension(extension): + return glob.glob(os.path.join(dirname, '*.%s' % extension)) + + file_list = with_extension('sh') + with_extension('json') + [ + os.path.join(BASE_KAFKA_DIR, "resources/requirements.txt")] + + logging.info('Files considered for packer_hash: %s', ', '.join(file_list)) + logging.info('Extras considered for packer_hash: %s', extras) + + hash_val = hash_files(file_list, **extras) + os.chdir(previous_wd) + + return hash_val + +def image_from(name=None, image_id=None, region_name=AWS_REGION): + """Given the image name or id, return a boto3 object corresponding to the image, or None if no such image exists.""" + if bool(name) == bool(image_id): + raise ValueError('image_from requires either name or image_id') + ec2 = boto3.resource("ec2", region_name=region_name) + filters = [] + if image_id: + filters.append({'Name': 'image-id', 'Values': [image_id]}) + if name: + filters.append({'Name': 'name', 'Values': [name]}) + return next(iter(ec2.images.filter(Owners=[AWS_ACCOUNT_ID], Filters=filters)), None) + +def create_ami(image_name, source_ami=AMI, region_name=AWS_REGION, volume_size=60, + packer_json=AWS_PACKER_JSON, instance_type=INSTANCE_TYPE, **extras): + """Create a new ami using packer!""" + previons_wd = os.getcwd() + os.chdir(BASE_KAFKA_DIR) + extras.setdefault('linux_distro', os.environ.get('LINUX_DISTRO', 'ubuntu')) + + cmd = 'packer build' + cmd += ' -var "region=%s"' % region_name + cmd += ' -var "source_ami=%s"' % source_ami + cmd += ' -var "ami_name=%s"' % image_name + cmd += ' -var "volume_size=%s"' % volume_size + cmd += ' -var "instance_type=%s"' % instance_type + cmd += ' -var "vpc_id=%s"' % VPC_ID + cmd += ' -var "subnet_id=%s"' % IPV4_SUBNET_ID + cmd += ' -var "security_group_id=%s"' % ALLOW_ALL_SECURITY_GROUP_ID + cmd += ''.join([' -var "{}={}"'.format(*v) for v in extras.items() if v[1] is not None]) + cmd += ' ' + packer_json + + logging.info("Creating a new image with name %s in region %s..." % (image_name, region_name)) + logging.info("This may take 10-20 minutes...") + run(cmd, allow_fail=False, print_output=True) + + image = image_from(name=image_name, region_name=region_name) + assert image is not None, "Expected aws image %s to exist after running packer!" % image_name + os.chdir(previons_wd) + + logging.info('Successfully created new image with id = %s', image.image_id) + + return image + +def wait_ready(image_id, region_name=AWS_REGION, timeout_sec=1200): + """Block until the given image_id is ready. Raise exception if no image with the given id.""" + + logging.info("Waiting for %s to become available..." % image_id) + start = time.time() + backoff = 5 + counter = 0 + while time.time() - start <= timeout_sec: + image = image_from(image_id=image_id, region_name=region_name) + assert image is not None, "Expected an image to exist with id %s, but it doesn't." % image_id + + if image.state.lower() == "available": + logging.info("Image %s is available." % image_id) + break + time.sleep(backoff) + counter += 1 + + # progress bar, indicate + for each minute elapsed + if counter % (60 / backoff) == 0: + print("+") + else: + print("-") + +def package_base_ami(instance_type=INSTANCE_TYPE, source_ami=AMI, ssh_account=None, volume_size=60, **hash_extras): + """ + :param instance_type: instance to use create ami + :param source_ami: base ami to spin up the instance + :param ssh_account: which account to use ssh into the instance + :param volume_size: size of the instance + :param hash_extras: other parameters + :return: + This function creates base ami for the workers. In this base ami we download common modules. + Using base ami we create target ami + """ + if ssh_account is None: + ssh_account = "ubuntu" + packer_hash = compute_packer_hash(source_ami=source_ami, **hash_extras) + logging.info("packer_hash: %s" % packer_hash) + + ami_name = "kafka-%s-%s" % (packer_hash,KAFKA_BRANCH) + ami_name = ami_name[:AMI_NAME_MAX_LENGTH] # Truncate to maximum length + logging.info("Base AMI name: %s (created from %s)" % (ami_name, source_ami)) + + # Check for cached image, and create if not present + image = image_from(name=ami_name) + if image: + logging.info("Found image matching %s: %s" % (ami_name, image)) + # Corner case: wait until image is ready + wait_ready(image.image_id) + else: + logging.info("No image matching %s." % ami_name) + image = create_ami(ami_name, instance_type=instance_type, source_ami=source_ami, ssh_account=ssh_account, volume_size=volume_size, packer_json=AWS_PACKER_JSON, **hash_extras) + + return image.image_id + +def package_worker_ami(install_type, volume_size, source_ami=AMI, + instance_type=INSTANCE_TYPE, ssh_account=None, **extras): + """ Create a worker AMI with Confluent Platform """ + if ssh_account is None: + ssh_account = "ubuntu" + base_ami = package_base_ami(instance_type=instance_type, source_ami=source_ami, ssh_account=ssh_account, + volume_size=volume_size, **extras) + + logging.info("Worker AMI name: %s" % WORKER_AMI_NAME) + image = create_ami(WORKER_AMI_NAME, source_ami=base_ami, packer_json= WORKER_AMI_JSON, install_type=install_type, + ssh_account=ssh_account, volume_size=volume_size, instance_type=instance_type, **extras) + delete_old_worker_amis() + return image.image_id + +def delete_old_worker_amis(): + """ Delete worker AMIs older than 30 days """ + logging.info('Checking for old worker AMIs to delete...') + + ec2 = boto3.resource("ec2", region_name=AWS_REGION) + for image in ec2.images.filter(Owners=[AWS_ACCOUNT_ID], Filters=[{'Name': 'tag:Service', 'Values': ['ce-kafka']}, + {'Name': 'tag:CreatedBy', 'Values': ['kafka-system-test']}]): + created_date = datetime.strptime(image.creation_date, "%Y-%m-%dT%H:%M:%S.000Z") + + if datetime.utcnow() - created_date > timedelta(days=30): + snapshot_ids = [s['Ebs']['SnapshotId'] for s in image.block_device_mappings if 'Ebs' in s] + logging.info('Deleting worker AMI {} with snapshot(s): {}'.format(image.id, snapshot_ids)) + + image.deregister() + for snapshot in ec2.snapshots.filter(SnapshotIds=snapshot_ids): + snapshot.delete() \ No newline at end of file diff --git a/terraform/kafka_runner/run-test.py b/terraform/kafka_runner/run-test.py new file mode 100644 index 0000000000000..1c3e75b1c6ee6 --- /dev/null +++ b/terraform/kafka_runner/run-test.py @@ -0,0 +1,327 @@ +# Copyright 2024 Confluent Inc. +import json +import logging +import os +import shutil +import sys +import subprocess + +import time +from datetime import datetime, timedelta, timezone +from functools import partial +from traceback import format_exc +from jinja2 import Environment, FileSystemLoader +from ducktape.utils.util import wait_until +from paramiko.ssh_exception import NoValidConnectionsError + +from terraform.kafka_runner.util import run, SOURCE_INSTALL,ssh +from terraform.kafka_runner.package_ami import package_worker_ami +from terraform.kafka_runner.util import INSTANCE_TYPE, ABS_KAFKA_DIR, JOB_ID, AWS_REGION, AWS_ACCOUNT_ID, AMI, IPV4_SUBNET_ID,IPV6_SUBNET_ID, IS_IPV6_RUN +from terraform.kafka_runner.util import setup_virtualenv, parse_args, parse_bool +from ssh_checkers.aws_checker import aws_ssh_checker + +class KafkaRunner: + kafka_dir = ABS_KAFKA_DIR + cluster_file_name = f"{kafka_dir}/tf-cluster.json" + tf_variables_file = f"{kafka_dir}/tf-vars.tfvars.json" + + def __init__(self, args, venv_dir): + self.args = args + self._terraform_outputs = None + self.venv_dir = venv_dir + self.public_key = os.environ['MUCKRAKE_SECRET'] + + def _run_creds(self, cmd, *args, **kwargs): + """ Assuming semaphore-access role to facilitate semaphore node to maintain the ec2 lifecycle and perform necessary action """ + return run(f". assume-iam-role arn:aws:iam::419470726136:role/semaphore-access> /dev/null; cd {self.kafka_dir}; {cmd}", *args, **kwargs) + + def terraform_outputs(self): + """Returning the output of terraform command""" + if not self._terraform_outputs: + raw_json = self._run_creds(f"terraform output -json", print_output=True, allow_fail=False, + return_stdout=True, cwd=self.kafka_dir) + self._terraform_outputs = json.loads(raw_json) + return self._terraform_outputs + + def update_hosts(self): + """Function to update host file of workers nodes""" + cmd = "sudo bash -c 'echo \"" + terraform_outputs_dict = self.terraform_outputs() + worker_names = terraform_outputs_dict['worker-names']["value"] + worker_ips = [ip[0] for ip in terraform_outputs_dict['worker-ipv6s']["value"]] if IS_IPV6_RUN else terraform_outputs_dict['worker-private-ips']["value"] + worker_instance_id= terraform_outputs_dict['worker-instance-ids']["value"] + for hostname, ip in zip(worker_names, worker_ips): + cmd += f"{ip} {hostname} \n" + cmd += "\" >> /etc/hosts'" + run_cmd = partial(ssh, command=cmd) + + for host in worker_ips: + run_cmd(host) + run(cmd, print_output=True, allow_fail=False) + + def generate_clusterfile(self): + """Generating cluster file containing information about worker nodes""" + logging.info("generating cluster file") + terraform_outputs_dict = self.terraform_outputs() + worker_names = terraform_outputs_dict['worker-names']["value"] + worker_ips = [ip[0] for ip in terraform_outputs_dict['worker-ipv6s']["value"]] if IS_IPV6_RUN else terraform_outputs_dict['worker-private-ips']["value"] + nodes = [] + for hostname, ip in zip(worker_names, worker_ips): + routable_ip = f"[{ip}]" if IS_IPV6_RUN else ip + nodes.append({ + "externally_routable_ip": routable_ip, + "ssh_config": { + "host": hostname, + "hostname": hostname, + "port": 22, + "user": "ubuntu", + "password": None, + "identityfile": os.path.join(os.environ.get('WORKSPACE'),'semaphore-muckrake.pem') + } + }) + with open(self.cluster_file_name, 'w') as f: + json.dump({"nodes": nodes}, f) + + def wait_until_ready(self, timeoutSecond=180, polltime=2): + """ + Function to wait until worker node is not ready + """ + terraform_outputs_dict = self.terraform_outputs() + worker_ips = [ip[0] for ip in terraform_outputs_dict['worker-ipv6s']["value"]] if IS_IPV6_RUN else terraform_outputs_dict['worker-private-ips']["value"] + worker_instance_id= terraform_outputs_dict['worker-instance-ids']["value"] + start = time.time() + # print all instance ids + for instance_id in worker_instance_id: + logging.info(f"instance_id: {instance_id}") + + def check_node_boot_finished(host): + code, _, _ = ssh(host, "[ -f /var/lib/cloud/instance/boot-finished ]") + return 0==code + + def check_for_ssh(host): + try: + ssh(host, "true") + return True + except NoValidConnectionsError as e: + logging.error(f"{e}") + return False + + def poll_all_nodes(): + # check and see if cloud init is done on all nodes + unfinished_nodes = [ip for ip in worker_ips if not check_node_boot_finished(ip)] + # unfinished_nodes = [instance_id for instance_id in worker_instance_id if not check_node_boot_finished(instance_id)] + result = len(unfinished_nodes) == 0 + if not result: + time_diff = time.time() - start + logging.warning(f"{time_diff}: still waiting for {unfinished_nodes}") + return result + wait_until(lambda: all(check_for_ssh(ip) for ip in worker_ips), + timeoutSecond, polltime, err_msg="ssh didn't become available") + self.update_hosts() + logging.warning("updated hosts file") + wait_until(poll_all_nodes, 15 * 60, 2, err_msg="didn't finish cloudinit") + logging.info("cloudinit finished on all nodes") + + def tags_to_aws_format(tags): + """ + :return: key value pairs of tags for aws resources + """ + kv_format = [f"Key={k},Value={v}" for k,v in tags.items()] + return f"{' '.join(kv_format)}" + + def generate_tf_file(self): + """ + Generate terraform file dynamically for resource creation + :return: + """ + logging.info("creating terraform file") + env = Environment(loader=FileSystemLoader(f'{self.kafka_dir}')) + template = env.get_template('main.tf') + # this spot instance expiration time. This is a failsafe, as terraform + # should cancel the request on a terraform destroy, which occurs on a provission + # failure + spot_instance_time = datetime.now(timezone.utc) + timedelta(hours=2) + spot_instance_time = spot_instance_time.isoformat() + with open(f'{self.kafka_dir}/main.tf', 'w') as f: + f.write(template.render()) + logging.info("terraform file created") + + def setup_tf_variables(self, ami): + """ + param ami: Ami using for spin up worker nodes + :return: + Function of set the value of all terraform variables + """ + + num_workers = self.args.num_workers + vars = { + "instance_type": self.args.worker_instance_type, + "worker_ami": ami, + "num_workers": num_workers, + "num_workers_spot": 0, + "deployment": self.args.linux_distro, + "public_key": self.public_key, + "spot_price": self.args.spot_price, + "build_url": self.args.build_url, + "subnet_id": IPV6_SUBNET_ID if IS_IPV6_RUN else IPV4_SUBNET_ID, + "ipv6_address_count": 1 if IS_IPV6_RUN else 0, + "job_id": JOB_ID + } + with open(self.tf_variables_file, 'w') as f: + json.dump(vars, f) + + def provission_terraform(self): + """ + Resource creation using terraform + :return: + """ + logging.info("provisioning tf file") + self._run_creds(f"terraform --version", print_output=True, allow_fail=False) + self._run_creds(f"terraform init", print_output=True, allow_fail=False, venv=False, cwd=self.kafka_dir) + self._run_creds(f"terraform apply -auto-approve -var-file={self.tf_variables_file}", print_output=True, allow_fail=False, + venv=False, cwd=self.kafka_dir) + + def destroy_terraform(self, allow_fail=False): + """ + Destroy all resources that are created by terraform + """ + self._run_creds(f"terraform init", print_output=True, allow_fail=True, cwd=self.kafka_dir) + self._run_creds(f"terraform destroy -auto-approve -var-file={self.tf_variables_file}", print_output=True, + allow_fail=allow_fail, cwd=self.kafka_dir) + + def install_custom_ducktape_branch(self, ducktape_branch): + """Override the default ducktape installation with the given branch""" + for i in range(10): + try: + run(f"{self.args.python} -m pip uninstall -y ducktape", + print_output=True, venv_dir=self.venv_dir, venv=True, allow_fail=False, cwd=self.muckrake_dir) + except Exception as e: + if "Command failed" in str(e): + break + + run(f"if [ ! -d ducktape ]; then git clone https://github.com/confluentinc/ducktape.git; fi", + print_output=True, venv_dir=self.venv_dir, venv=True, allow_fail=False, cwd=self.kafka_dir) + run(f"git checkout {ducktape_branch} && {self.args.python} setup.py develop", + print_output=True, venv_dir=self.venv_dir, venv=True, allow_fail=False, cwd=f"{self.kafka_dir}/ducktape") + +def main(): + logging.basicConfig(format='[%(levelname)s:%(asctime)s]: %(message)s', level=logging.INFO) + args, ducktape_args = parse_args() + kafka_dir = ABS_KAFKA_DIR + if args.new_globals is not None: + global_val = json.loads(args.new_globals) + file_data = open(f'{kafka_dir}/resources/{args.install_type}-globals.json', 'r') + globals_dict = json.loads(file_data.read()) + file_data.close() + for key, value in global_val.items(): + globals_dict[key] = value + with open(f'{kafka_dir}/resources/{args.install_type}-globals.json', "w") as outfile: + json.dump(globals_dict, outfile, indent = 4) + logging.info(f"New globals passed - {globals_dict}") + kafka_dir = ABS_KAFKA_DIR + venv_dir = os.path.join(os.environ.get('WORKSPACE'), "venv") + + # setup virtualenv directory + if os.path.exists(venv_dir): + shutil.rmtree(venv_dir, ignore_errors=True) + setup_virtualenv(venv_dir, args) + + # reset directory containing source code for CP components + projects_dir = os.path.join(kafka_dir, "projects") + if os.path.exists(projects_dir): + shutil.rmtree(projects_dir) + + test_runner = KafkaRunner(args, venv_dir) + run(f"{args.python} -m pip install -U -r resources/requirements.txt", + print_output=True, venv=True, allow_fail=False, cwd=kafka_dir) + run(f"{args.python} -m pip install -U -r resources/requirement_override.txt", + print_output=True, venv=True, allow_fail=True, cwd=kafka_dir) + exit_status = 0 + + try: + # Check that the test path is valid before doing expensive cluster bringup + # We still do this after the build step since that's how we get kafka, and our ducktape dependency + test_path = " ".join(args.test_path) + cmd = f"{args.python} `which ducktape` {test_path} --collect-only" + run(cmd, venv=True, venv_dir=venv_dir, print_output=True, allow_fail=False) + if args.collect_only: + logging.info("--collect-only flag used; exiting without running tests") + return + + # Skip build if we are re-using an older image + if args.aws: + ssh_account = 'ubuntu' + base_ami = AMI + if args.existing_ami is None: + logging.info(f"linux distro input: {args.linux_distro}") + logging.info(f"base_ami: {base_ami}") + image_id = package_worker_ami( + args.install_type, + args.worker_volume_size, + source_ami=base_ami, + linux_distro=args.linux_distro, + instance_type=args.worker_instance_type, + ssh_account=ssh_account, + instance_name=args.instance_name, + jdk_version=args.jdk_version, + jdk_arch=args.jdk_arch + ) + else: + logging.info(f"using existing ami: {args.existing_ami}") + image_id = args.existing_ami + + # Take down any existing to bring up cluster from scratch + logging.info("calling generate_tf_file") + logging.info(f"ami: {image_id}") + test_runner.generate_tf_file() + test_runner.setup_tf_variables(image_id) + test_runner.destroy_terraform(allow_fail=True) + cluster_file_name = f"{kafka_dir}/tf-cluster.json" + + if args.aws: + # re-source vagrant credentials before bringing up cluster + run(f". assume-iam-role arn:aws:iam::419470726136:role/semaphore-access; cd {kafka_dir};", + print_output=True, allow_fail=False) + test_runner.provission_terraform() + logging.info("calling function to generate cluster file") + test_runner.generate_clusterfile() + else: + run(f"cd {kafka_dir};", + print_output=True, allow_fail=False) + test_runner.provission_terraform() + test_runner.generate_clusterfile() + if logging.getLogger().isEnabledFor(logging.DEBUG): + with open(f"{kafka_dir}/tf-cluster.json", "r") as f: + logging.debug(f'starting with cluster: {f.read()}') + + test_runner.wait_until_ready() + + source= os.path.join(os.environ.get('WORKSPACE'),'kafka') + + # Run the tests! + cmd = f"{args.python} `which ducktape` {test_path} " \ + f"--results-root {args.results_root} " \ + f"--default-num-nodes 1 " \ + f"--max-parallel=1000 " \ + f"--cluster ducktape.cluster.json.JsonCluster " \ + f"--cluster-file {cluster_file_name} " \ + f"--compress " \ + f"{' '.join(ducktape_args)}" + exit_status = run(cmd, venv=True, venv_dir=venv_dir, print_output=True) + logging.info("Tests " + ("passed" if exit_status == 0 else "failed")) + + except Exception as e: + logging.warning(e) + logging.warning(format_exc()) + exit_status = 1 + finally: + # Cleanup and teardown all workers + if not args.collect_only and args.cleanup: + logging.info("bringing down terraform cluster...") + test_runner.destroy_terraform() + elif not args.cleanup: + logging.warning("--cleanup is false, leaving nodes alive") + sys.exit(exit_status) + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/terraform/kafka_runner/util.py b/terraform/kafka_runner/util.py new file mode 100644 index 0000000000000..e9426fbfc7372 --- /dev/null +++ b/terraform/kafka_runner/util.py @@ -0,0 +1,181 @@ +import boto3 +import logging +import os +import time +import subprocess +from io import StringIO +import requests +import argparse +import sys + +from paramiko import SSHClient +from ducktape.cluster.remoteaccount import IgnoreMissingHostKeyPolicy +from botocore.exceptions import ClientError + +HASH_ALGORITHM = "sha224" +BASE_KAFKA_DIR = os.path.join(os.path.dirname(__file__), "..") +ABS_KAFKA_DIR = os.path.abspath(BASE_KAFKA_DIR) +WORKER_AMI_JSON = '../vagrant/worker-ami.json' +AWS_PACKER_JSON = '../vagrant/aws-packer.json' + +# List files in the directory +workspace= os.environ.get('WORKSPACE') +SOURCE_INSTALL = "source" +logging.getLogger("paramiko").setLevel(logging.WARNING) + +AWS_REGION = "us-west-2" +AWS_ACCOUNT_ID = boto3.client('sts', region_name=AWS_REGION).get_caller_identity().get('Account') +AWS_IAM = boto3.client('sts', region_name=AWS_REGION).get_caller_identity().get('Arn').split("/")[1] + +AMI_NAME_MAX_LENGTH = 128 +WORKER_AMI_NAME = 'kafka-{}'.format( # E.g. BUILD_TAG = semaphore-system-test-kafka-master-452 + os.environ['SEMAPHORE_JOB_ID'] if 'SEMAPHORE_JOB_ID' in os.environ else str(int(time.time()))) + +AMI= os.environ.get('AMI_ID') +INSTANCE_TYPE= os.environ.get('INSTANCE_TYPE') +IPV4_SUBNET_ID= "subnet-0429253329fde0351" +IPV6_SUBNET_ID= "subnet-00c4999d6841fd454" + +VPC_NAME= "system-test-ducktape-infra" +VPC_ID= "vpc-00acc0e3d6688724b" +BRANCH_NAME = os.environ.get('BRANCH_NAME') + +ALLOW_ALL_SECURITY_GROUP_ID = "sg-0344366211836c8fe" + +KAFKA_BRANCH = os.environ.get('KAFKA_BRANCH') +JOB_ID = os.environ.get('SEMAPHORE_JOB_ID') +IS_IPV6 = os.environ.get('IS_IPV6',"False") +IS_IPV6_RUN = IS_IPV6=="True" +def ssh(host, command, port=22, username='ubuntu', password=None, key_file = os.path.join(os.environ.get('WORKSPACE'),'semaphore-muckrake.pem')): + """ + :param host: IP address of worker node + :param command: command that run while doing ssh + :param username: username of ssh user + :param key_file: pem file path that require for ssh + :return: success of ssh or not + """ + os.chmod(key_file, 0o444) + client = SSHClient() + client.set_missing_host_key_policy(IgnoreMissingHostKeyPolicy()) + client.connect( + hostname=host, + port=port, + username=username, + password=password, + key_filename=key_file, + look_for_keys=False) + _stdin, stdout, stderr = client.exec_command(command) + code = stdout.channel.recv_exit_status() + stdout = stdout.read() + stderr = stderr.read() + client.close() + return code, stdout, stderr + +def run(cmd, venv=False, venv_dir="venv", print_output=False, allow_fail=True, return_stdout=False, cwd=None): + """Function to use run bash command""" + if venv: + # On Ubuntu, it can be necessary to unset PYTHONPATH in order to avoid picking up dist-utils + cmd = ("unset PYTHONPATH; . %s/bin/activate; " % venv_dir) + cmd + logging.info(f"Running command: {cmd}") + proc = subprocess.Popen( + cmd, shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + executable='/bin/bash', cwd=cwd + ) + lines = StringIO() + for line in iter(proc.stdout.readline, b''): + line = line.decode() + if print_output: + logging.info(line.strip()) + lines.write(line) + + output, err = proc.communicate() + logging.info(output) + returncode = proc.returncode + if returncode != 0: + if allow_fail: + logging.warning("Command failed with code %s: %s: %s" % (returncode, cmd, err)) + else: + raise RuntimeError("Command failed with code %s: %s: %s" % (returncode, cmd, err)) + + logging.info(f"finished command {cmd}") + return lines.getvalue() if return_stdout else returncode + +def setup_virtualenv(venv_dir, args): + """Install virtualenv if necessary, and create VIRTUAL_ENV directory with virtualenv command.""" + if run("which virtualenv") != 0: + # install virtualenv if necessary + logging.info("No virtualenv found. Installing...") + run(f"{args.python} -m pip install virtualenv", + allow_fail=False) + logging.info("Installation of virtualenv succeeded") + + if not os.path.exists(venv_dir): + logging.info("Setting up virtualenv...") + run(f"virtualenv -p {args.python} {venv_dir}", allow_fail=False) + run("pip install --upgrade pip setuptools", venv=True, allow_fail=False) + +def parse_args(): + """ + This is function is parsing all argument that we are passing while running the file + """ + parser = argparse.ArgumentParser() + parser.add_argument("--aws", action="store_true", help="use commands for running on AWS") + parser.add_argument("--image-name", action="store", type=str, default=os.environ.get('IMAGE_NAME'), + help="Name of image to use for virtual machines.") + parser.add_argument("--install-type", action="store", default=SOURCE_INSTALL, + help="how Confluent Platform will be installed") + parser.add_argument("--instance-name", action="store", default="KAFKA_TEST_SEMAPHORE", + help="Name of AWS instances") + parser.add_argument("--worker-instance-type", action="store", default=INSTANCE_TYPE, + help="AWS instance type to be used for worker nodes.") + parser.add_argument("--worker-volume-size", action="store", type=int, + default=40, help="Volume size in GB to be used for worker nodes. Min 50GB.") + parser.add_argument("--vagrantfile", action="store", default="system-test/resources/scripts/system-tests/kafka-system-test/Vagrantfile.local", + help="specify location of template for Vagrantfile.local") + parser.add_argument("--num-workers", action="store", type=int, + default=1, help="number of worker nodes to bring up") + parser.add_argument("--enable-pwd-scan", action="store_true", + help="run password scanner with --enable-pwd-scan flag." + "Scan for passwords from constants file in logs ") + parser.add_argument("--notify-slack-with-passwords", action="store_true", + help="Notify slack channel with passwords found") + parser.add_argument("--results-root", action="store", default="./results", help="direct test output here") + parser.add_argument('test_path', metavar='test_path', type=str, nargs='*', + default=["tests/kafkatest/tests/core/security_test.py"], + help="run tests found underneath this directory. " + "This directory is relative to root muckrake directory.") + parser.add_argument("--collect-only", action="store_true", help="run ducktape with --collect-only flag") + parser.add_argument("--cleanup", action="store", type=parse_bool, default=True, + help="Tear down instances after tests.") + parser.add_argument("--repeat", action="store", type=int, default=1, + help="Use this flag to repeat all discovered tests the given number of times.") + parser.add_argument("--parallel", action="store_true", help="if true, run tests in parallel") + parser.add_argument("--linux-distro", action="store", type=str, default="ubuntu", + help="The linux distro to install on.") + parser.add_argument("--sample", action="store", type=int, default=None, + help="The size of a random sample of tests to run") + parser.add_argument("--python", action="store", type=str, default="python", help="The python executable to use") + parser.add_argument("--build-url", action="store", type=str, default="kafka.test.us", + help="The Jenkins Build URL to tag AWS Resources") + parser.add_argument("--parameters", action="store", type=str, default=None, help="Override test parameter") + parser.add_argument("--spot-instance", action="store_true", help="run as spot instances") + parser.add_argument("--spot-price", action="store", type=float, default=0.266, help="maximum price for a spot instance") + parser.add_argument("--ssh-checker", action="store", nargs='+', + default=['ssh_checkers.aws_checker.aws_ssh_checker'], + help="full module path of functions to run in the case of an ssh error") + parser.add_argument("--jdk-version", action="store", type=str, default="8", + help="JDK version to install on the nodes."), + parser.add_argument("--jdk-arch", action="store", type=str, default="x64", + help="JDK arch to execute."), + parser.add_argument("--nightly", action="store_true", default=False, help="Mark this as a nightly run") + parser.add_argument("--new-globals", action="store", type=str, default=None, help="Additional global params to be passed in ducktape") + parser.add_argument("--arm-image", action="store_true", help="load the ARM based image of specified distro") + parser.add_argument("--existing-ami", action="store", type=str, default=None, help="AMI ID to use for the instance, skipping ami creation") + args, rest = parser.parse_known_args(sys.argv[1:]) + + return args, rest + +def parse_bool(s): + return True if s and s.lower() not in ('0', 'f', 'no', 'n', 'false') else False \ No newline at end of file diff --git a/terraform/main.tf b/terraform/main.tf new file mode 100644 index 0000000000000..03d927cab2844 --- /dev/null +++ b/terraform/main.tf @@ -0,0 +1,83 @@ +# Copyright 2024 Confluent Inc. +locals { + common_tags = { + "Name": "ccs-kafka-worker", + "ducktape": "true", + "Owner": "ce-kafka", + "role": "ce-kafka", + "cflt_environment": "devel", + "cflt_partition": "onprem", + "cflt_managed_by": "iac", + "cflt_managed_id": "ce-kafka", + "cflt_service": "ce-kafka" + } +} + +terraform { + required_providers { + aws = { + source = "hashicorp/aws" + version = "~> 5.0" + } + } +} + +provider "aws" { + region = "us-west-2" + default_tags { + tags = local.common_tags + } +} + +data "template_file" "user_data" { + template = file("./cloudinit.yml") + vars = { + deployment = var.deployment + public_key = var.public_key + base_script = data.local_file.base_script.content + } +} + +data "cloudinit_config" "user_data" { + gzip = false + base64_encode = true + + part { + content_type = "text/cloud-config" + content = data.template_file.user_data.rendered + } +} + +data "local_file" "base_script" { + filename = "../vagrant/base.sh" +} + +resource "aws_instance" "worker" { + count= var.num_workers + ami= var.worker_ami + instance_type = var.instance_type + key_name = "semaphore-muckrake" + subnet_id = var.subnet_id + vpc_security_group_ids = [var.security_group] + associate_public_ip_address = false + iam_instance_profile = "semaphore-access" + user_data = data.cloudinit_config.user_data.rendered + ipv6_address_count = var.ipv6_address_count + tags = { + Name = format("ccs-kafka-%d", count.index), + "SemaphoreBuildUrl":var.build_url, + "SemaphoreWorkflowUrl":var.build_url, + "SemaphoreJobId": var.job_id + } +} + +output "cloudinit_content-spot" { value= data.cloudinit_config.user_data.rendered} + +output "worker-public-ips" { value = aws_instance.worker.*.public_ip } +output "worker-ipv6s" { value = aws_instance.worker.*.ipv6_addresses } +output "worker-public-dnss" { value = aws_instance.worker.*.public_dns } +output "worker-private-ips" { value = aws_instance.worker.*.private_ip } +output "worker-private-dnss" { value = aws_instance.worker.*.private_dns } +output "worker-names" { value = aws_instance.worker.*.tags.Name } +output "worker-instance-ids" { value = aws_instance.worker.*.id } +output "cloudinit_content" { value= data.cloudinit_config.user_data.rendered} \ No newline at end of file diff --git a/terraform/resources/requirement_override.txt b/terraform/resources/requirement_override.txt new file mode 100644 index 0000000000000..740955b1b126d --- /dev/null +++ b/terraform/resources/requirement_override.txt @@ -0,0 +1 @@ +ducktape~=0.12.0 \ No newline at end of file diff --git a/terraform/resources/requirements.txt b/terraform/resources/requirements.txt new file mode 100644 index 0000000000000..237b8d2b429a9 --- /dev/null +++ b/terraform/resources/requirements.txt @@ -0,0 +1,68 @@ +attrs==21.2.0 +avro==1.11.3 +bcrypt==4.0.1 +boto3==1.20.54 +botocore==1.23.54 +cachetools==4.2.2 +certifi==2021.10.8 +cffi==1.15.1 +configparser==4.0.2 +contextlib2==0.6.0.post1 +crcmod==1.7 +cryptography==41.0.3 +elasticsearch==7.17.6 +charset-normalizer==2.0.6 +click==8.0.1 +debugpy==1.4.3 +flake8==3.9.2 +Flask==2.2.5 +Flask-HTTPAuth==4.4.0 +google-api-core==2.0.1 +google-auth==2.2.0 +google-cloud-core==2.0.0 +google-cloud-storage==1.42.2 +google-crc32c==1.2.0 +google-resumable-media==2.0.3 +googleapis-common-protos==1.53.0 +grpcio==1.58.0 +idna==3.7 +iniconfig==1.1.1 +itsdangerous==2.0.1 +javaobj-py3==0.4.3 +jinja2==3.0.3 +jmespath==0.10.0 +MarkupSafe==2.1.3 +mccabe==0.6.1 +opencensus-proto==0.1.0 +opentelemetry-proto==1.19.0 +packaging==23.0 +paho-mqtt==1.5.1 +pluggy==1.0.0 +protobuf==4.24.4 +py==1.10.0 +pyasn1==0.4.8 +pycodestyle==2.7.0 +pycparser==2.21 +pycryptodome==3.19.1 +pyflakes==2.3.1 +pyjks==20.0.0 +pyparsing==3.0.9 +pytest==6.2.5 +python-dateutil==2.8.2 +pytz==2021.1 +requests==2.32.2 +rsa==4.7.2 +s3transfer==0.5.0 +six==1.16.0 +toml==0.10.2 +twofish==0.3.0 +urllib3==1.26.7 +websocket-client==1.2.1 +Werkzeug==2.2.3 +zstandard==0.15.2 +psutil==5.9.0 +gnupg==2.3.1 +selenium==4.14.0 +distro==1.9.0 +jwcrypto==1.5.6 +python-jwt==4.0.0 diff --git a/terraform/resources/terraform-worker-config.sh b/terraform/resources/terraform-worker-config.sh new file mode 100644 index 0000000000000..a6bbd0a7cc3fe --- /dev/null +++ b/terraform/resources/terraform-worker-config.sh @@ -0,0 +1,14 @@ +# Check AMI_ID present or not as env variable if not then set +AMI_ID() { + echo "ami-29ebb519" +} + +if [ -z "${AMI_ID}" ]; then + echo "AMI_ID is not set. Setting it now." + # Set the AMI_ID variable + export AMI_ID=$(AMI_ID) +else + echo "AMI_ID is already set to: $AMI_ID" +fi + +export IPV6_SUPPORT="False" \ No newline at end of file diff --git a/terraform/variable.tf b/terraform/variable.tf new file mode 100644 index 0000000000000..794369abadff3 --- /dev/null +++ b/terraform/variable.tf @@ -0,0 +1,57 @@ +variable "ec2_region" { + type = string + description = "EC2 region" + default = "us-west-2" +} + +variable "num_workers" { + type = string + description = "number of workers" +} + +variable "worker_ami" { + type = string + description = "AMI of aws" +} + +variable "instance_type" { + type = string + description = "Instance type of aws" + default = "c4.xlarge" +} + +variable "deployment" { + type = string + default = "ubuntu" + description = "linux distro you are deploying with, valid values are ubuntu and rpm" +} + +variable "public_key" { + type = string + description = "kafka pem file public key" +} + +variable "build_url" { + type = string +} + +variable "job_id" { + type = string + description = "semaphore job id" +} + +variable "subnet_id" { + type = string + description = "subnet id" +} + +variable "ipv6_address_count"{ + type = number + description = "number of ipv6 addresses" +} + +variable "security_group" { + type = string + description = "security group id" + default = "sg-03364f9fef903b17d" +} \ No newline at end of file diff --git a/vagrant/aws-packer.json b/vagrant/aws-packer.json new file mode 100644 index 0000000000000..7025202447e2e --- /dev/null +++ b/vagrant/aws-packer.json @@ -0,0 +1,62 @@ +{ + "variables": { + "aws_access_key": "", + "aws_secret_key": "", + "region": "", + "source_ami": "", + "ami_name": "", + "instance_type": "", + "ssh_account": "ubuntu", + "linux_distro": "ubuntu", + "jdk_version": "8", + "jdk_arch": "x64", + "enable_ena_driver_install": "true" + + }, + "builders": [{ + "type": "amazon-ebs", + "iam_instance_profile": "semaphore-access", + "region": "{{user `region`}}", + "subnet_id": "{{user `subnet_id`}}", + "source_ami": "{{user `source_ami`}}", + "instance_type": "{{user `instance_type`}}", + "ssh_username": "{{user `ssh_account`}}", + "ami_name": "{{user `ami_name`}}", + "aws_polling": { + "delay_seconds": 60, + "max_attempts": 60 + }, + "security_group_id": "{{user `security_group_id`}}", + "tags": { + "Owner": "ce-kafka", + "Service": "ce-kafka", + "Type": "Base", + "role": "ce-kafka", + "CreatedBy": "kafka-system-test" + }, + "launch_block_device_mappings": [{ + "device_name": "/dev/sda1", + "volume_size": 60, + "volume_type": "gp3", + "delete_on_termination": true + }], + "ena_support": true + }], + "provisioners": [ + { + "type": "shell", + "inline": ["while [ ! -f /var/lib/cloud/instance/boot-finished ]; do echo 'Waiting for cloud-init...'; sleep 1; done"] + }, + { + "environment_vars": [ + "JDK_MAJOR={{ user `jdk_version` }}", + "JDK_ARCH={{ user `jdk_arch` }}" + ], + "execute_command": "echo 'packer' | {{ .Vars }} sudo -E -S bash '{{ .Path }}'", + "type": "shell", + "scripts": [ + "../vagrant/base.sh" + ] + } + ] +} \ No newline at end of file diff --git a/vagrant/base.sh b/vagrant/base.sh index 87597de50226a..611437c67f253 100755 --- a/vagrant/base.sh +++ b/vagrant/base.sh @@ -38,6 +38,8 @@ fetch_jdk_tgz() { JDK_MAJOR="${JDK_MAJOR:-17}" JDK_FULL="${JDK_FULL:-17-linux-x64}" +echo "JDK_MAJOR=$JDK_MAJOR JDK_ARCH=$JDK_ARCH" +export DEBIAN_FRONTEND=noninteractive if [ -z `which javac` ]; then apt-get -y update diff --git a/vagrant/worker-ami.json b/vagrant/worker-ami.json new file mode 100644 index 0000000000000..1bd2e549143db --- /dev/null +++ b/vagrant/worker-ami.json @@ -0,0 +1,82 @@ +{ + "variables": { + "aws_access_key": "", + "aws_secret_key": "", + "region": "", + "vpc_id": "", + "subnet_id": "", + "source_ami": "", + "ami_name": "", + "install_type": "", + "resource_url": "", + "instance_type": "", + "ssh_account": "ubuntu", + "linux_distro": "ubuntu", + "jdk_version": "8", + "jdk_arch": "x64", + "mvn_user": "{{env `ORG_GRADLE_PROJECT_mavenUsername`}}", + "mvn_pass": "{{env `ORG_GRADLE_PROJECT_mavenPassword`}}" + }, + "builders": [{ + "type": "amazon-ebs", + "iam_instance_profile": "semaphore-access", + "region": "{{user `region`}}", + "subnet_id": "{{user `subnet_id`}}", + "vpc_id": "{{user `vpc_id`}}", + "source_ami": "{{user `source_ami`}}", + "instance_type": "{{user `instance_type`}}", + "ssh_username": "{{user `ssh_account`}}", + "ami_name": "{{user `ami_name`}}", + "aws_polling": { + "delay_seconds": 60, + "max_attempts": 60 + }, + "security_group_id": "{{user `security_group_id`}}", + "tags": { + "Owner": "ce-kafka", + "Service": "ce-kafka", + "Type": "Worker", + "role": "kafka-worker", + "CreatedBy": "kafka-system-test", + "Name": "{{user `instance_name`}}", + "ResourceUrl": "{{user `resource_url`}}", + "NightlyRun": "{{user `nightly_run`}}", + "Distro": "{{user `linux_distro`}}", + "JdkVersion": "{{user `jdk_version`}}", + "Arch": "{{user `jdk_arch`}}" + }, + "launch_block_device_mappings": [{ + "device_name": "/dev/sda1", + "volume_size": "{{user `volume_size`}}", + "volume_type": "gp3", + "delete_on_termination": true + }] + }], + "provisioners": [ + { + "type": "shell", + "inline": [ + "sudo mkdir -p /vagrant", + "sudo mkdir -p ~/.m2", + "sudo chown -R {{user `ssh_account`}}:{{user `ssh_account`}} ~/.m2", + "sudo chown -R {{user `ssh_account`}}:{{user `ssh_account`}} /vagrant", + "sudo ln -s /vagrant /opt/kafka" + ] + }, + { + "type": "file", + "source": "/home/semaphore/kafka-overlay/kafka/", + "destination": "/vagrant/" + }, + { + "type": "file", + "source": "/home/semaphore/.m2/settings.xml", + "destination": "~/.m2/settings.xml" + }, + { + "type": "file", + "source": "/home/semaphore/.m2/", + "destination": "/vagrant/.m2" + } + ] +}