Changes from all commits (26 commits)
cf35c41: JNI (ZhaoNeil, Oct 7, 2021)
5c06d65: debug (ZhaoNeil, Oct 8, 2021)
8d9ad3e: aws (ZhaoNeil, Nov 11, 2021)
611d63d: multiple osds (ZhaoNeil, Dec 5, 2021)
385afef: add launch-vms (ZhaoNeil, Dec 28, 2021)
a46aaba: delete (ZhaoNeil, Jan 5, 2022)
d0ae262: ceph-volume (Jan 10, 2022)
135c850: modify launch script (ZhaoNeil, Jan 10, 2022)
14e5f2d: multiple NVMe (ZhaoNeil, Jan 12, 2022)
eee74f7: WIP: getting everything to work for cloudlab. Changed a cp to an appe… (MariskaIJpelaar, Feb 8, 2022)
25b3e3b: WIP: getting rados-deploy to work on cloudlab: currently fixing 'too … (MariskaIJpelaar, Feb 9, 2022)
dda81c8: WIP: getting rados-deploy to work on cloudlab: fixed 'too many pgs' e… (MariskaIJpelaar, Feb 9, 2022)
6d679fc: WIP: getting rados-deploy to work on Cloudlab: fixed NameError.. Now … (MariskaIJpelaar, Feb 10, 2022)
4ffb467: Runs on cloudlab again. (MariskaIJpelaar, Feb 15, 2022)
ca2316a: Merge pull request #1 from MariskaIJpelaar/master (ZhaoNeil, Feb 16, 2022)
3ffa086: Add --use-ceph-volume argument for bluestore (ZhaoNeil, Feb 16, 2022)
eef63f4: WIP: getting data deployment from S3. Fix /etc/hosts: Permission denied (ZhaoNeil, Feb 17, 2022)
2693462: fix librados failed bug (ZhaoNeil, Feb 27, 2022)
5a97a6d: fix ceph.file.layout.object_size (ZhaoNeil, Feb 27, 2022)
c305696: download full tpcds dataset from s3 (ZhaoNeil, Mar 23, 2022)
4a0b29a: tpcds-1T dataset (ZhaoNeil, Apr 8, 2022)
c166b0f: aws s3 sync (ZhaoNeil, Apr 11, 2022)
99ae5c6: hide aws credential (ZhaoNeil, Apr 11, 2022)
329e39b: tpcds split dataset (ZhaoNeil, Apr 25, 2022)
b1ba430: WIP: shkyhook version to latest (ZhaoNeil, Jun 30, 2022)
a3d4a00: remove launch script (ZhaoNeil, Jun 30, 2022)
3 changes: 2 additions & 1 deletion rados_deploy/cli/start.py
@@ -28,6 +28,7 @@ def subparser(subparsers):
Each node must provide extra info:
- device_path: Path to storage device, e.g. "/dev/nvme0n1p4".''')
bluestoreparser.add_argument('--device-path', metavar='path', dest='device_path', type=str, default=None, help='Overrides "device_path" specification for all nodes.')
bluestoreparser.add_argument('--use-ceph-volume', metavar='bool', dest='use_ceph_volume', type=bool, default=False, help='Use ceph-volume command for osds')

return [startparser, memstoreparser, bluestoreparser]

@@ -47,7 +48,7 @@ def deploy(parsers, args):
elif args.subcommand == 'bluestore':
from rados_deploy.start import bluestore
reservation = _cli_util.read_reservation_cli()
return bluestore(reservation, key_path=args.key_path, admin_id=args.admin_id, mountpoint_path=args.mountpoint, osd_op_threads=args.osd_op_threads, osd_pool_size=args.osd_pool_size, osd_max_obj_size=args.osd_max_obj_size, placement_groups=args.placement_groups, use_client_cache=not args.disable_client_cache, device_path=args.device_path, silent=args.silent, retries=args.retries)[0] if reservation else False
return bluestore(reservation, key_path=args.key_path, admin_id=args.admin_id, mountpoint_path=args.mountpoint, osd_op_threads=args.osd_op_threads, osd_pool_size=args.osd_pool_size, osd_max_obj_size=args.osd_max_obj_size, placement_groups=args.placement_groups, use_client_cache=not args.disable_client_cache, device_path=args.device_path, use_ceph_volume=args.use_ceph_volume, silent=args.silent, retries=args.retries)[0] if reservation else False
else: # User did not specify what type of storage type to use.
printe('Did not provide a storage type (e.g. bluestore).')
parsers[0].print_help()
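Note on the new flag: argparse's type=bool converts the given string, so any non-empty value (including "False") parses as True. A minimal sketch of the more conventional on/off idiom, shown only as an illustration rather than the PR's implementation:

    # Sketch (not the PR's code): a store_true flag avoids the argparse
    # type=bool pitfall, where bool('False') evaluates to True.
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--use-ceph-volume', dest='use_ceph_volume',
                        action='store_true', default=False,
                        help='Use ceph-volume command for osds')
    args = parser.parse_args(['--use-ceph-volume'])
    assert args.use_ceph_volume is True
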
1 change: 1 addition & 0 deletions rados_deploy/install.py
@@ -21,6 +21,7 @@ def _install_rados(connection, module, reservation, install_dir, arrow_url=defau
printe('Could not install ceph-deploy.')
return False
hosts_designations_mapping = {x.hostname: [Designation[y.strip().upper()].name for y in x.extra_info['designations'].split(',')] if 'designations' in x.extra_info else [] for x in reservation.nodes}
print(hosts_designations_mapping)
if not remote_module.install_ceph(hosts_designations_mapping, silent):
printe('Could not install Ceph on some node(s).')
return False
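Note: the added print exposes hosts_designations_mapping, a dict from hostname to the uppercase designation names parsed from each node's extra_info (empty when no designations are given). A hypothetical example of its shape, with made-up hostnames:

    # Hypothetical value; hostnames and designations are illustrative only.
    hosts_designations_mapping = {
        'node0': ['MON', 'MGR', 'MDS'],   # extra_info designations 'mon,mgr,mds'
        'node1': ['OSD', 'OSD'],          # extra_info designations 'osd,osd'
        'node2': [],                      # no 'designations' key in extra_info
    }
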
43 changes: 36 additions & 7 deletions rados_deploy/internal/data_deploy/rados_deploy.deploy.plugin.py
@@ -1,9 +1,11 @@
import argparse
import concurrent.futures
from concurrent.futures import process
import itertools
from multiprocessing import cpu_count
import os
import subprocess
from tabnanny import check

import data_deploy.shared.copy
import data_deploy.shared.link
@@ -47,10 +49,22 @@ def _ensure_attr(connection):
'''Installs the 'attr' package, if not available.'''
_, _, exitcode = remoto.process.check(connection, 'which setfattr', shell=True)
if exitcode != 0:
out, err, exitcode = remoto.process.check(connection, 'sudo apt install attr -y', shell=True)
out, err, exitcode = remoto.process.check(connection, 'sudo apt update && sudo apt install attr -y', shell=True)
if exitcode != 0:
printe('Could not install "attr" package (needed for "setfattr" command). Exitcode={}.\nOut={}\nErr={}'.format(exitcode, out, err))
return False
with open ('/home/yzhao/aws_credential.txt') as f:
credential = f.readline()
access_key_id = credential.split(',')[0]
secret_access_key = credential.split(',')[1]
region = credential.split(',')[2]
_, _, exitcode = remoto.process.check(connection, 'which awscli', shell=True)
if exitcode != 0:
out, err, exitcode = remoto.process.check(connection, 'sudo apt install awscli -y && aws configure set aws_access_key_id {} && \
aws configure set aws_secret_access_key {} && aws configure set default.region {}'.format(access_key_id, secret_access_key, region), shell=True)
if exitcode != 0:
printe('Could not install "awscli" package. Exitcode={}.\nOut={}\nErr={}'.format(exitcode, out, err))
return False
return True
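
Note: the credential file path above is machine-specific and is read on the deploying host as one comma-separated line (access key id, secret access key, region). A sketch of reading the same format from a configurable location; the environment variable name and fallback path are assumptions, not part of this change:

    import os

    def read_aws_credentials(path=None):
        # Hypothetical helper: same one-line CSV format, but the path can be
        # overridden instead of pointing at a fixed home directory.
        path = path or os.environ.get('AWS_CREDENTIAL_FILE',
                                      os.path.expanduser('~/aws_credential.txt'))
        with open(path) as f:
            access_key_id, secret_access_key, region = f.readline().strip().split(',')[:3]
        return access_key_id, secret_access_key, region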


@@ -136,16 +150,17 @@ def _execute_internal(connectionwrapper, reservation, paths, dest, silent, copy_
printe('File {} is too large ({} bytes, max allowed is {} bytes)'.format(x, os.path.getsize(x), max_filesize))
return False
files_to_deploy += [(x, fs.join(dest, x[path_len+1:])) for x in files]

if not silent:
print('Transferring data...')

futures_pre_deploy = [executor.submit(_pre_deploy_remote_file, connectionwrapper.connection, stripe, copies_to_add, links_to_add, source_file, dest_file) for (source_file, dest_file) in files_to_deploy]
if not all(x.result() for x in futures_pre_deploy):
printe('Pre-data deployment error occured.')
return False

if not silent:
print('Transferring data...')
fun = lambda path: subprocess.call('rsync -e "ssh -F {}" -q -aHAXL --inplace {} {}:{}'.format(connectionwrapper.ssh_config.name, path, admin_node.ip_public, fs.join(dest, fs.basename(path))), shell=True) == 0

fun = lambda path: subprocess.call('ssh -F {} ubuntu@{} "cd {} && aws s3 sync s3://ceph-dataset/tpcds-1t-split/ ./"'.format(connectionwrapper.ssh_config.name, admin_node.ip_public, fs.join(dest, fs.basename(path))), shell=True) == 0
futures_rsync = {path: executor.submit(fun, path) for path in paths}

state_ok = True
for path,future in futures_rsync.items():
if not silent:
@@ -156,8 +171,22 @@
if not state_ok:
return False

# futures_pre_deploy = [executor.submit(_pre_deploy_remote_file, connectionwrapper.connection, stripe, copies_to_add, links_to_add, source_file, dest_file) for (source_file, dest_file) in files_to_deploy]
# if not all(x.result() for x in futures_pre_deploy):
# printe('Pre-data deployment error occured.')
# return False

# Method 1:
# _, _, exitcode = remoto.process.check(connectionwrapper.connection, 'UNIX Command', shell=True)
# if exitcode != 0:
# return False
# Method 2:
# if subprocess.call('ssh -F {} ubuntu@{} "UNIX Command"'.format(connectionwrapper.ssh_config.name, admin_node.ip_public), shell=True) != 0:
# return False


futures_post_deploy = [executor.submit(_post_deploy_remote_file, connectionwrapper.connection, stripe, copies_to_add, links_to_add, source_file, dest_file) for (source_file, dest_file) in files_to_deploy]
if all(x.result() for x in futures_pre_deploy):
if all(x.result() for x in futures_post_deploy):
prints('Data deployment success')
return True
else:
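Note: the commented 'Method 1' and 'Method 2' templates sketch the two ways this plugin runs commands on the admin node: through the existing remoto connection, or by shelling out to ssh with the generated ssh config (the route taken for the aws s3 sync call). A minimal sketch of both patterns; the function names and arguments are illustrative:

    import subprocess
    import remoto.process

    def run_via_remoto(connection, cmd):
        # Method 1: execute over an already-open remoto connection.
        out, err, exitcode = remoto.process.check(connection, cmd, shell=True)
        return exitcode == 0

    def run_via_ssh(ssh_config_path, host_ip, cmd, user='ubuntu'):
        # Method 2: plain ssh using the ssh config written by the deployer.
        return subprocess.call('ssh -F {} {}@{} "{}"'.format(
            ssh_config_path, user, host_ip, cmd), shell=True) == 0
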
2 changes: 1 addition & 1 deletion rados_deploy/internal/defaults/install.py
@@ -2,7 +2,7 @@ def install_dir():
return './deps'

def arrow_url():
return 'https://github.com/Sebastiaan-Alvarez-Rodriguez/arrow/archive/refs/heads/master.zip'
return 'https://github.com/uccross/skyhookdm-arrow/archive/refs/heads/arrow-master.zip'

def cores():
return 16
1 change: 1 addition & 0 deletions rados_deploy/internal/remoto/modulegenerator.py
@@ -71,6 +71,7 @@ def with_files(self, *filepaths):
def _is_regular_python(self, name):
if not self._stl_modules_cache:
self._stl_modules_cache = list(_generate_stl_libs())
self._stl_modules_cache.append('time') # so we are able to import time
return name in self._stl_modules_cache


4 changes: 2 additions & 2 deletions rados_deploy/internal/remoto/modules/rados/cephfs.py
@@ -43,13 +43,13 @@ def start_cephfs(node, connection, ceph_deploypath, path='/mnt/cephfs', use_clie

import time
for x in range(retries):
_, _, exitcode = remoto.process.check(connection, 'sudo {} {}'.format(cmd, path), shell=True)
out, err, exitcode = remoto.process.check(connection, 'sudo {} {}'.format(cmd, path), shell=True)
if exitcode == 0:
prints('[{}] Succesfully called ceph-fuse (attempt {}/{}) (I/O caching={})'.format(node.hostname, x+1, retries, 'true' if use_client_cache else 'false'))
state_ok = True
break
else:
printw('[{}] Executing ceph-fuse... (attempt {}/{})'.format(node.hostname, x+1, retries))
printw('[{}] Executing ceph-fuse... (attempt {}/{})\n output: {}\n error: {}'.format(node.hostname, x+1, retries, out, err))
time.sleep(1)
if not state_ok:
return False
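Note: the change above keeps the ceph-fuse retry loop but now captures stdout/stderr so a failed mount attempt is reported instead of silently retried. The same retry-and-report pattern in isolation, as a sketch (the callable and its return shape are assumptions):

    import time

    def retry(fn, retries, delay=1):
        # fn is expected to return (ok, out, err); retry until ok or attempts run out.
        for attempt in range(retries):
            ok, out, err = fn()
            if ok:
                return True
            print('attempt {}/{} failed:\n output: {}\n error: {}'.format(
                attempt + 1, retries, out, err))
            time.sleep(delay)
        return False
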
12 changes: 8 additions & 4 deletions rados_deploy/internal/remoto/modules/rados/osd.py
@@ -11,7 +11,7 @@

def stop_osds_memstore(osds, silent):
'''Completely stops and removes all old running OSDs. Does not return anything.
Warning: First, CephFS must be stopped, and seconfly, the Ceph pools must removed, before calling this function.'''
Warning: First, CephFS must be stopped, and secondly, the Ceph pools must removed, before calling this function.'''


# stopping osds
@@ -130,19 +130,23 @@ def func(number, silent):
return all(x.result() for x in futures)


def start_osd_bluestore(ceph_deploypath, osd, num_osds, silent):
def start_osd_bluestore(ceph_deploypath, osd, num_osds, silent, use_ceph_volume):
'''Starts a Ceph OSD for bluestore clusters.
Requires that a key "device_path" is set in the extra_info of the node, which points to a device that will serve as data storage location.
Args:
ceph_deploypath (str): Absolute path to ceph_deploy.
osd (metareserve.Node): Node to start OSD daemon on.
num_osds (int): Amount of OSD daemons to spawn on local device.
silent: If set, suppresses debug output.
use_ceph_volume: If set, uses 'ceph-volume' instead of 'osd create'

Returns:
`True` on success, `False` on failure.'''
executors = [Executor('{} -q osd create --data {} {}'.format(ceph_deploypath, osd.extra_info['device_path'], osd.hostname), **get_subprocess_kwargs(silent)) for x in range(num_osds)]
Executor.run_all(executors)
if use_ceph_volume:
executors = [Executor('ssh {} "sudo ceph-volume lvm batch --yes --no-auto --osds-per-device {} {}"'.format(osd.hostname, num_osds, osd.extra_info['device_path'].split(',')[x]), **get_subprocess_kwargs(silent)) for x in range(len(osd.extra_info['device_path'].split(',')))]
else:
executors = [Executor('{} -q osd create --data {} {}'.format(ceph_deploypath, osd.extra_info['device_path'].split(',')[x], osd.hostname), **get_subprocess_kwargs(silent)) for x in range(num_osds)]
Executor.run_all(executors)
return Executor.wait_all(executors, print_on_error=True)


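Note: with use_ceph_volume set, device_path is treated as a comma-separated device list and each device gets one 'ceph-volume lvm batch' call with --osds-per-device num_osds; without it, 'ceph-deploy osd create' is issued per OSD with the matching device. A sketch of how the device list expands (hostname, devices and counts are hypothetical):

    # Hypothetical expansion for device_path='/dev/nvme1n1,/dev/nvme2n1', num_osds=2.
    device_path = '/dev/nvme1n1,/dev/nvme2n1'
    hostname = 'node1'
    num_osds = 2
    cmds = ['ssh {} "sudo ceph-volume lvm batch --yes --no-auto --osds-per-device {} {}"'
            .format(hostname, num_osds, dev) for dev in device_path.split(',')]
    # Two commands, each creating two OSDs on one NVMe device (four OSDs in total).
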
4 changes: 2 additions & 2 deletions rados_deploy/internal/remoto/modules/rados/pool.py
@@ -17,8 +17,8 @@ def destroy_pools(silent):
def create_pools(placement_groups, silent):
'''Create ceph pools.'''
try:
subprocess.check_call('sudo ceph osd pool create cephfs_data {}'.format(placement_groups), **get_subprocess_kwargs(silent))
subprocess.check_call('sudo ceph osd pool create cephfs_metadata {}'.format(placement_groups), **get_subprocess_kwargs(silent))
subprocess.check_call('sudo ceph osd pool create cephfs_data {0} {0}'.format(placement_groups), **get_subprocess_kwargs(silent))
subprocess.check_call('sudo ceph osd pool create cephfs_metadata {0} {0}'.format(placement_groups), **get_subprocess_kwargs(silent))
subprocess.check_call('sudo ceph osd pool set cephfs_data pg_autoscale_mode off', **get_subprocess_kwargs(silent))
subprocess.check_call('sudo ceph fs new cephfs cephfs_metadata cephfs_data', **get_subprocess_kwargs(silent))
return True
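Note: passing placement_groups twice follows the 'ceph osd pool create <pool> <pg_num> <pgp_num>' form, pinning pg_num and pgp_num to the same value before autoscaling is disabled; this is the knob the 'too many pgs' commits above were adjusting. A sketch of the commands produced for a hypothetical value of 128:

    # Hypothetical: placement_groups=128 yields these commands.
    placement_groups = 128
    cmds = [
        'sudo ceph osd pool create cephfs_data {0} {0}'.format(placement_groups),
        'sudo ceph osd pool create cephfs_metadata {0} {0}'.format(placement_groups),
        'sudo ceph osd pool set cephfs_data pg_autoscale_mode off',
        'sudo ceph fs new cephfs cephfs_metadata cephfs_data',
    ]
    # cmds[0] == 'sudo ceph osd pool create cephfs_data 128 128'
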
21 changes: 16 additions & 5 deletions rados_deploy/internal/remoto/modules/rados_install.py
@@ -127,7 +127,7 @@ def install_ceph(hosts_designations_mapping, silent=False):
if not any(designations): # If no designation given for node X, we skip installation of Ceph for X.
continue
designation_out = '--'+' --'.join([x.lower() for x in set(designations)])
executors.append(Executor('{} --overwrite-conf install --release octopus {} {}'.format(ceph_deploypath, designation_out, hostname), shell=True))
executors.append(Executor('{} --overwrite-conf install --release octopus {} {}'.format(ceph_deploypath, designation_out, hostname), shell=True))
Executor.run_all(executors)
return Executor.wait_all(executors, print_on_error=True)

@@ -159,18 +159,28 @@ def install_rados(location, hosts_designations_mapping, arrow_url, force_reinsta
return False
if not silent:
print('Installing required libraries for RADOS-Ceph.\nPatience...')
cmd = 'sudo apt install libradospp-dev rados-objclass-dev openjdk-8-jdk-headless openjdk-11-jdk-headless libboost-all-dev automake bison flex g++ libevent-dev libssl-dev libtool make pkg-config maven cmake thrift-compiler -y'
# cmd = 'sudo apt install libradospp-dev rados-objclass-dev openjdk-8-jdk openjdk-11-jdk default-jdk libboost-all-dev automake bison flex g++ libevent-dev libssl-dev \
# libtool make pkg-config maven cmake thrift-compiler llvm -y'
cmd = 'sudo apt install -y python3 python3-pip python3-venv python3-numpy cmake libradospp-dev rados-objclass-dev llvm default-jdk maven'
if subprocess.call(cmd, shell=True, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL) != 0:
printe('Failed to install all required libraries. Command used: {}'.format(cmd))
return False
if not silent:
prints('Installed required libraries.')
if (not isdir(location)) and not _get_rados_dev(location, arrow_url, silent=silent, retries=5):
return False
cmake_cmd = 'cmake . -DARROW_PARQUET=ON -DARROW_DATASET=ON -DARROW_JNI=ON -DARROW_ORC=ON -DARROW_CSV=ON -DARROW_CLS=ON'

cmake_cmd = 'cmake -DARROW_SKYHOOK=ON -DARROW_PARQUET=ON -DARROW_WITH_SNAPPY=ON -DARROW_WITH_ZLIB=ON -DARROW_BUILD_EXAMPLES=ON -DPARQUET_BUILD_EXAMPLES=ON \
-DARROW_PYTHON=ON -DARROW_ORC=ON -DARROW_JAVA=ON -DARROW_JNI=ON -DARROW_DATASET=ON -DARROW_CSV=ON -DARROW_WITH_LZ4=ON -DARROW_WITH_ZSTD=ON'
if debug:
cmake_cmd += ' -DCMAKE_BUILD_TYPE=Debug'
if subprocess.call(cmake_cmd+' 1>&2', cwd='{}/cpp'.format(location), **kwargs) != 0:
print ("!!!! " + cmake_cmd + " !!!!!")

my_env = os.environ.copy()
my_env["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
print(my_env)
subprocess.call(cmake_cmd+' 1>&2', cwd='{}/cpp'.format(location), env=my_env, **kwargs)
if subprocess.call(cmake_cmd+' 1>&2', cwd='{}/cpp'.format(location), env=my_env, **kwargs) != 0:
return False
if subprocess.call('sudo make install -j{} 1>&2'.format(cores), cwd='{}/cpp'.format(location), **kwargs) != 0:
return False
@@ -193,6 +203,7 @@ def install_rados(location, hosts_designations_mapping, arrow_url, force_reinsta
executors = [Executor('ssh {} "sudo cp ~/.arrow-libs/libcls* /usr/lib/rados-classes/"'.format(x), **kwargs) for x in hosts]
executors += [Executor('ssh {} "sudo cp ~/.arrow-libs/libarrow* /usr/lib/"'.format(x), **kwargs) for x in hosts]
executors += [Executor('ssh {} "sudo cp ~/.arrow-libs/libparquet* /usr/lib/"'.format(x), **kwargs) for x in hosts]
executors += [Executor('ssh {} "sudo systemctl restart ceph-osd.target"'.format(x), **kwargs) for x in hosts]

Executor.run_all(executors)
if not Executor.wait_all(executors, print_on_error=True):
@@ -207,4 +218,4 @@
if not libpath or not '/usr/local/lib' in libpath.strip().split(':'):
env.set('LD_LIBRARY_PATH', '/usr/local/lib:'+libpath)
os.environ['LD_LIBRARY_PATH'] = '/usr/local/lib:'+libpath
return subprocess.call('sudo cp /usr/local/lib/libparq* /usr/lib/', **kwargs) == 0
return subprocess.call('sudo cp /usr/local/lib/libparq* /usr/lib/', **kwargs) == 0
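
Note: in the hunk above the cmake configure step runs twice, once unconditionally and once inside the exit-code check, with JAVA_HOME injected through a copied environment. A sketch that configures once and branches on the result (paths are taken from the diff; the helper itself is illustrative):

    import os
    import subprocess

    def configure(cmake_cmd, location):
        # Run the cmake configure step a single time with JAVA_HOME set.
        env = os.environ.copy()
        env['JAVA_HOME'] = '/usr/lib/jvm/java-8-openjdk-amd64'
        return subprocess.call(cmake_cmd + ' 1>&2', cwd='{}/cpp'.format(location),
                               env=env, shell=True) == 0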
18 changes: 16 additions & 2 deletions rados_deploy/internal/remoto/modules/ssh_install.py
@@ -12,7 +12,7 @@ def already_installed(privkey_sha256):

Returns:
`True` if the SSH keys are already installed.'''
home = os.path.expanduser('~/')
home = os.path.expanduser('~')
keyfile = '{}/.ssh/rados_deploy.rsa'.format(home)
if not isfile(keyfile):
return False
@@ -41,7 +41,7 @@ def install_ssh_keys(hosts, keypair, user, use_sudo=True):

Returns:
`True` on success, `False` on failure.'''
home = os.path.expanduser('~/')
home = os.path.expanduser('~')

mkdir('{}/.ssh'.format(home), exist_ok=True)
if isfile('{}/.ssh/config'.format(home)):
@@ -51,6 +51,20 @@ def install_ssh_keys(hosts, keypair, user, use_sudo=True):
hosts_available = []
neededinfo = sorted(hosts)

local_ip = ''.join('''{0} {1}\n'''.format(x[3:].replace("-", "."), x) for x in neededinfo)
with open('{}/hosts'.format(home), mode='a') as f:
# f.write('127.0.0.1 localhost\n')
f.write(local_ip)
subprocess.call('sudo cp {}/hosts /etc/hosts'.format(home),shell=True)
# subprocess.call('sudo cat {}/hosts >> /etc/hosts'.format(home),shell=True)

# if subprocess.call('sudo vgcreate --force --yes "ceph" "/dev/nvme1n1"',shell=True) != 0:
# return False
# if subprocess.call('sudo lvcreate --yes -l 40%VG -n "ceph-lv-1" "ceph"',shell=True) != 0:
# return False
# if subprocess.call('sudo lvcreate --yes -l 40%VG -n "ceph-lv-2" "ceph"',shell=True) != 0:
# return False

config = ''.join('''
Host {0}
Hostname {0}
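Note: the new block in install_ssh_keys derives each peer's IP from its hostname by dropping a three-character prefix and replacing dashes with dots, appends 'ip hostname' lines to a hosts file in the home directory, and copies it to /etc/hosts with sudo (working around the 'Permission denied' noted in the commit list). A sketch of the transformation; the hostname format is an assumption:

    # Assumes hostnames encode the node IP after a 3-character prefix,
    # e.g. 'vm-10-149-0-15'.
    hostname = 'vm-10-149-0-15'
    ip = hostname[3:].replace('-', '.')
    entry = '{} {}\n'.format(ip, hostname)
    # entry == '10.149.0.15 vm-10-149-0-15\n', appended to ~/hosts and then
    # copied over /etc/hosts with sudo.
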
5 changes: 3 additions & 2 deletions rados_deploy/internal/remoto/modules/start/bluestore.py
@@ -106,7 +106,7 @@ def _merge_kwargs(x, y):
return z


def start_rados_bluestore(reservation_str, mountpoint_path, osd_op_threads, osd_pool_size, osd_max_obj_size, placement_groups, use_client_cache, silent, retries):
def start_rados_bluestore(reservation_str, mountpoint_path, osd_op_threads, osd_pool_size, osd_max_obj_size, placement_groups, use_client_cache, use_ceph_volume, silent, retries):
'''Starts a Ceph cluster with RADOS-Arrow support.
Args:
reservation_str (str): String representation of a `metareserve.reservation.Reservation`.
@@ -119,6 +119,7 @@ def start_rados_bluestore(reservation_str, mountpoint_path, osd_op_threads, osd_
osd_max_obj_size (int): Maximal object size in bytes. Normal=128*1024*1024 (128MB).
placement_groups (int): Amount of placement groups in Ceph.
use_client_cache (bool): Toggles using cephFS I/O cache.
use_ceph_volume (bool): If set, uses 'ceph-volume' instead of 'osd create'
silent (bool): If set, prints are less verbose.
retries (int): Number of retries for potentially failing operations.

@@ -223,7 +224,7 @@ def start_rados_bluestore(reservation_str, mountpoint_path, osd_op_threads, osd_
futures_start_osds = []
for x in osds:
num_osds = len([1 for y in x.extra_info['designations'].split(',') if y == Designation.OSD.name.lower()])
futures_start_osds.append(executor.submit(start_osd_bluestore, ceph_deploypath, x, num_osds, silent))
futures_start_osds.append(executor.submit(start_osd_bluestore, ceph_deploypath, x, num_osds, silent, use_ceph_volume))
if not all(x.result() for x in futures_start_osds):
close_wrappers(connectionwrappers)
return False
2 changes: 2 additions & 0 deletions rados_deploy/internal/util/executor.py
@@ -1,6 +1,8 @@
import subprocess
import os
import threading
import time


class Executor(object):
'''Object to run subprocess commands in a separate thread. This way, Python can continue operating while interacting with subprocesses.'''