Skip to content

Commit 60422a6

Browse files
committed
Add support for import/export of repos
1 parent 5d180e5 commit 60422a6

File tree

4 files changed: +140 −21 lines changed

dbclient/ClustersClient.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ def __init__(self, configs, checkpoint_service):
1212
super().__init__(configs)
1313
self._checkpoint_service = checkpoint_service
1414
self.groups_to_keep = configs.get("groups_to_keep", False)
15+
self.skip_missing_users = configs['skip_missing_users']
1516

1617
create_configs = {'num_workers',
1718
'autoscale',
@@ -299,9 +300,17 @@ def import_cluster_configs(self, log_file='clusters.log', acl_log_file='acl_clus
299300
raise ValueError(error_message)
300301
api = f'/preview/permissions/clusters/{cid}'
301302
resp = self.put(api, acl_args)
302-
if not logging_utils.log_response_error(error_logger, resp):
303-
if 'object_id' in data:
304-
checkpoint_cluster_configs_set.write(data['object_id'])
303+
304+
if self.skip_missing_users:
305+
ignore_error_list = ["RESOURCE_DOES_NOT_EXIST", "RESOURCE_ALREADY_EXISTS"]
306+
else:
307+
ignore_error_list = ["RESOURCE_ALREADY_EXISTS"]
308+
309+
if logging_utils.check_error(resp, ignore_error_list):
310+
logging_utils.log_response_error(error_logger, resp)
311+
elif 'object_id' in data:
312+
checkpoint_cluster_configs_set.write(data['object_id'])
313+
305314
print(resp)
306315

307316
def _log_cluster_ids_and_original_creators(

dbclient/WorkspaceClient.py

Lines changed: 125 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
WS_IMPORT = "/workspace/import"
1818
WS_EXPORT = "/workspace/export"
1919
LS_ZONES = "/clusters/list-zones"
20-
20+
REPOS = "/repos"
2121

2222
class WorkspaceClient(dbclient):
2323
def __init__(self, configs, checkpoint_service):
@@ -39,8 +39,8 @@ def get_top_level_folders(self):
3939
supported_types = ('NOTEBOOK', 'DIRECTORY')
4040
root_items = self.get(WS_LIST, {'path': '/'}).get('objects', [])
4141
# filter out Projects and Users folders
42-
non_users_dir = list(filter(lambda x: (x.get('path') != '/Users' and x.get('path') != '/Projects'),
43-
root_items))
42+
non_users_dir = list(filter(lambda x: (x.get('path') not in ['/Users', '/Repos']
43+
and x.get('path') != '/Projects'), root_items))
4444
dirs_and_nbs = list(filter(lambda x: (x.get('object_type') in supported_types),
4545
non_users_dir))
4646
return dirs_and_nbs
@@ -111,12 +111,23 @@ def is_user_ws_item(ws_dir):
111111
return True
112112
return False
113113

114+
@staticmethod
115+
def is_repo(ws_dir):
116+
"""
117+
Checks if this item is part of a repo.
118+
We need to use a separate API for these, so they should not be treated as standard WS items
119+
"""
120+
path_list = [x for x in ws_dir.split('/') if x]
121+
if len(path_list) >= 2 and path_list[0] == 'Repos':
122+
return True
123+
return False
124+
114125
@staticmethod
115126
def is_user_ws_root(ws_dir):
116127
"""
117-
Check if we're at the users home folder to skip folder creation
128+
Check if we're at the users home folder or repos root folder to skip folder creation
118129
"""
119-
if ws_dir == '/Users/' or ws_dir == '/Users':
130+
if ws_dir in ['/Users/', '/Users', '/Repos/', '/Repos']:
120131
return True
121132
path_list = [x for x in ws_dir.split('/') if x]
122133
if len(path_list) == 2 and path_list[0] == 'Users':
@@ -385,26 +396,33 @@ def init_workspace_logfiles(self, workspace_log_file='user_workspace.log',
385396
if os.path.exists(libs_log):
386397
os.remove(libs_log)
387398

388-
def log_all_workspace_items_entry(self, ws_path='/', workspace_log_file='user_workspace.log', libs_log_file='libraries.log', dir_log_file='user_dirs.log', exclude_prefixes=[]):
399+
def log_all_workspace_items_entry(self, ws_path='/', workspace_log_file='user_workspace.log', libs_log_file='libraries.log', dir_log_file='user_dirs.log', repos_log_file='repos.log', exclude_prefixes=[]):
389400
logging.info(f"Skip all paths with the following prefixes: {exclude_prefixes}")
390401

391402
workspace_log_writer = ThreadSafeWriter(self.get_export_dir() + workspace_log_file, "a")
392403
libs_log_writer = ThreadSafeWriter(self.get_export_dir() + libs_log_file, "a")
393404
dir_log_writer = ThreadSafeWriter(self.get_export_dir() + dir_log_file, "a")
405+
repos_log_writer = ThreadSafeWriter(self.get_export_dir() + repos_log_file, "a")
394406
checkpoint_item_log_set = self._checkpoint_service.get_checkpoint_key_set(
395407
wmconstants.WM_EXPORT, wmconstants.WORKSPACE_ITEM_LOG_OBJECT
396408
)
397409
try:
398-
num_nbs = self.log_all_workspace_items(ws_path=ws_path, workspace_log_writer=workspace_log_writer,
399-
libs_log_writer=libs_log_writer, dir_log_writer=dir_log_writer, checkpoint_set=checkpoint_item_log_set, exclude_prefixes=exclude_prefixes)
410+
num_nbs = self.log_all_workspace_items(ws_path=ws_path,
411+
workspace_log_writer=workspace_log_writer,
412+
libs_log_writer=libs_log_writer,
413+
dir_log_writer=dir_log_writer,
414+
repos_log_writer=repos_log_writer,
415+
checkpoint_set=checkpoint_item_log_set,
416+
exclude_prefixes=exclude_prefixes)
400417
finally:
401418
workspace_log_writer.close()
402419
libs_log_writer.close()
403420
dir_log_writer.close()
421+
repos_log_writer.close()
404422

405423
return num_nbs
406424

407-
def log_all_workspace_items(self, ws_path, workspace_log_writer, libs_log_writer, dir_log_writer, checkpoint_set, exclude_prefixes=[]):
425+
def log_all_workspace_items(self, ws_path, workspace_log_writer, libs_log_writer, dir_log_writer, repos_log_writer, checkpoint_set, exclude_prefixes=[]):
408426
"""
409427
Loop and log all workspace items to download them at a later time
410428
:param ws_path: root path to log all the items of the notebook workspace
@@ -423,6 +441,7 @@ def log_all_workspace_items(self, ws_path, workspace_log_writer, libs_log_writer
423441
if not os.path.exists(self.get_export_dir()):
424442
os.makedirs(self.get_export_dir(), exist_ok=True)
425443
items = self.get(WS_LIST, get_args).get('objects', None)
444+
repos = self.get(REPOS).get('repos', None)
426445
num_nbs = 0
427446
if self.is_verbose():
428447
logging.info("Listing: {0}".format(get_args['path']))
@@ -472,12 +491,13 @@ def log_all_workspace_items(self, ws_path, workspace_log_writer, libs_log_writer
472491
if folders:
473492
def _recurse_log_all_workspace_items(folder):
474493
dir_path = folder.get('path', None)
475-
if not WorkspaceClient.is_user_trash(dir_path):
494+
if not self.is_user_trash(dir_path) and not self.is_repo(dir_path):
476495
dir_log_writer.write(json.dumps(folder) + '\n')
477496
return self.log_all_workspace_items(ws_path=dir_path,
478497
workspace_log_writer=workspace_log_writer,
479498
libs_log_writer=libs_log_writer,
480499
dir_log_writer=dir_log_writer,
500+
repos_log_writer=None,
481501
checkpoint_set=checkpoint_set,
482502
exclude_prefixes=exclude_prefixes)
483503

@@ -499,6 +519,13 @@ def _recurse_log_all_workspace_items(folder):
499519
checkpoint_set.write(dir_path)
500520
if num_nbs_plus:
501521
num_nbs += num_nbs_plus
522+
# log all repos
523+
if repos_log_writer is not None:
524+
for repo in repos:
525+
repo_path = repo.get('path', "")
526+
if not checkpoint_set.contains(repo_path) and not repo_path.startswith(tuple(exclude_prefixes)):
527+
repos_log_writer.write(json.dumps(repo) + '\n')
528+
checkpoint_set.write(repo_path)
502529

503530
return num_nbs
504531

@@ -519,9 +546,15 @@ def log_acl_to_file(self, artifact_type, read_log_filename, writer, error_logger
519546
if not os.path.exists(read_log_path):
520547
logging.info(f"No log exists for {read_log_path}. Skipping ACL export ...")
521548
return
549+
522550
def _acl_log_helper(json_data):
523551
data = json.loads(json_data)
524552
obj_id = data.get('object_id', None)
553+
alt_id = data.get('id', None)
554+
555+
if alt_id and not obj_id:
556+
obj_id = alt_id
557+
525558
api_endpoint = '/permissions/{0}/{1}'.format(artifact_type, obj_id)
526559
acl_resp = self.get(api_endpoint)
527560
acl_resp['path'] = data.get('path')
@@ -538,11 +571,13 @@ def _acl_log_helper(json_data):
538571

539572
def log_all_workspace_acls(self, workspace_log_file='user_workspace.log',
540573
dir_log_file='user_dirs.log',
574+
repo_log_file="repos.log",
541575
num_parallel=4):
542576
"""
543577
loop through all notebooks and directories to store their associated ACLs
544578
:param workspace_log_file: input file for user notebook listing
545579
:param dir_log_file: input file for user directory listing
580+
:param repo_log_file: input file for repo listing
546581
"""
547582
# define log file names for notebooks, folders, and libraries
548583
logging.info("Exporting the notebook permissions")
@@ -569,6 +604,19 @@ def log_all_workspace_acls(self, workspace_log_file='user_workspace.log',
569604
end = timer()
570605
logging.info("Complete Directories ACLs Export Time: " + str(timedelta(seconds=end - start)))
571606

607+
logging.info("Exporting the repo permissions")
608+
start = timer()
609+
acl_repo_error_logger = logging_utils.get_error_logger(
610+
wmconstants.WM_EXPORT, wmconstants.WORKSPACE_REPO_ACL_OBJECT, self.get_export_dir())
611+
acl_repo_writer = ThreadSafeWriter(self.get_export_dir() + "acl_repos.log", "w")
612+
try:
613+
self.log_acl_to_file('repos', repo_log_file, acl_repo_writer, acl_repo_error_logger,
614+
num_parallel)
615+
finally:
616+
acl_repo_writer.close()
617+
end = timer()
618+
logging.info("Complete Repo ACLs Export Time: " + str(timedelta(seconds=end - start)))
619+
572620
def apply_acl_on_object(self, acl_str, error_logger, checkpoint_key_set):
573621
"""
574622
apply the acl definition to the workspace object
@@ -616,22 +664,28 @@ def apply_acl_on_object(self, acl_str, error_logger, checkpoint_key_set):
616664
if access_control_list:
617665
api_args = {'access_control_list': access_control_list}
618666
resp = self.patch(api_path, api_args)
619-
if logging_utils.check_error(resp):
620-
if resp.get("error_code", None) == "RESOURCE_DOES_NOT_EXIST" and self.skip_missing_users:
621-
error_logger.info(resp)
622-
else:
623-
logging_utils.log_response_error(resp, error_logger)
667+
668+
if self.skip_missing_users:
669+
ignore_error_list = ["RESOURCE_DOES_NOT_EXIST", "RESOURCE_ALREADY_EXISTS"]
670+
else:
671+
ignore_error_list = ["RESOURCE_ALREADY_EXISTS"]
672+
673+
if logging_utils.check_error(resp, ignore_error_list):
674+
logging_utils.log_response_error(error_logger, resp)
624675
else:
625676
checkpoint_key_set.write(obj_path)
626677
return
627678

628679
def import_workspace_acls(self, workspace_log_file='acl_notebooks.log',
629-
dir_log_file='acl_directories.log', num_parallel=1):
680+
dir_log_file='acl_directories.log',
681+
repo_log_file='acl_repos.log', num_parallel=1):
630682
"""
631683
import the notebook and directory acls by looping over notebook and dir logfiles
632684
"""
633685
dir_acl_logs = self.get_export_dir() + dir_log_file
634686
notebook_acl_logs = self.get_export_dir() + workspace_log_file
687+
repo_acl_logs = self.get_export_dir() + repo_log_file
688+
635689
acl_notebooks_error_logger = logging_utils.get_error_logger(
636690
wmconstants.WM_IMPORT, wmconstants.WORKSPACE_NOTEBOOK_ACL_OBJECT, self.get_export_dir())
637691

@@ -653,7 +707,21 @@ def import_workspace_acls(self, workspace_log_file='acl_notebooks.log',
653707
futures = [executor.submit(self.apply_acl_on_object, dir_acl_str, acl_dir_error_logger, checkpoint_dir_acl_set) for dir_acl_str in dir_acls_fp]
654708
concurrent.futures.wait(futures, return_when="FIRST_EXCEPTION")
655709
propagate_exceptions(futures)
656-
print("Completed import ACLs of Notebooks and Directories")
710+
711+
acl_repo_error_logger = logging_utils.get_error_logger(
712+
wmconstants.WM_IMPORT, wmconstants.WORKSPACE_REPO_ACL_OBJECT, self.get_export_dir())
713+
checkpoint_repo_acl_set = self._checkpoint_service.get_checkpoint_key_set(
714+
wmconstants.WM_IMPORT, wmconstants.WORKSPACE_REPO_ACL_OBJECT)
715+
716+
with open(repo_acl_logs) as repo_acls_fp:
717+
with ThreadPoolExecutor(max_workers=num_parallel) as executor:
718+
futures = [
719+
executor.submit(self.apply_acl_on_object, repo_acl_str, acl_repo_error_logger, checkpoint_repo_acl_set)
720+
for repo_acl_str in repo_acls_fp]
721+
concurrent.futures.wait(futures, return_when="FIRST_EXCEPTION")
722+
propagate_exceptions(futures)
723+
724+
print("Completed import ACLs of Repos, Notebooks and Directories")
657725

658726
def get_current_users(self):
659727
"""
@@ -686,7 +754,7 @@ def does_path_exist(self, dir_path):
686754
return False
687755
return True
688756

689-
def import_current_workspace_items(self,artifact_dir='artifacts/'):
757+
def import_current_workspace_items(self, artifact_dir='artifacts/'):
690758
src_dir = self.get_export_dir() + artifact_dir
691759
error_logger = logging_utils.get_error_logger(wmconstants.WM_IMPORT, wmconstants.WORKSPACE_NOTEBOOK_OBJECT,
692760
self.get_export_dir())
@@ -814,3 +882,42 @@ def _file_upload_helper(f):
814882
futures = [executor.submit(_upload_all_files, walk[0], walk[1], walk[2]) for walk in self.walk(src_dir)]
815883
concurrent.futures.wait(futures, return_when="FIRST_EXCEPTION")
816884
propagate_exceptions(futures)
885+
886+
def import_all_repos(self, repo_log_file="repos.log", num_parallel=1):
887+
dir_repo_logs = self.get_export_dir() + repo_log_file
888+
889+
# check to see if git creds are set up- repo import will fail if not
890+
git_cred_api_path = "/git-credentials"
891+
resp = self.get(git_cred_api_path)
892+
if not resp.get("credentials", None):
893+
logging.info("Repo import will be skipped; repos can only be imported if Git credentials are first set up.")
894+
logging.info("To import repos separately, please run repo_importer.py")
895+
return
896+
897+
repo_error_logger = logging_utils.get_error_logger(
898+
wmconstants.WM_IMPORT, wmconstants.WORKSPACE_REPO_OBJECT, self.get_export_dir())
899+
checkpoint_repo_set = self._checkpoint_service.get_checkpoint_key_set(
900+
wmconstants.WM_IMPORT, wmconstants.WORKSPACE_REPO_OBJECT)
901+
902+
with open(dir_repo_logs) as repo_fp:
903+
with ThreadPoolExecutor(max_workers=num_parallel) as executor:
904+
futures = [
905+
executor.submit(self.create_repo, repo_str, repo_error_logger,
906+
checkpoint_repo_set)
907+
for repo_str in repo_fp]
908+
concurrent.futures.wait(futures, return_when="FIRST_EXCEPTION")
909+
propagate_exceptions(futures)
910+
911+
def create_repo(self, repo_str, error_logger, checkpoint_repo_set):
912+
api_path = '/repos'
913+
repo_json = json.loads(repo_str)
914+
repo_url = repo_json.get('url', None)
915+
if repo_url:
916+
logging.info("Repo: {0}".format(repo_json.get('path', '')))
917+
resp = self.post(api_path, repo_json)
918+
if logging_utils.check_error(resp):
919+
logging_utils.log_response_error(error_logger, resp)
920+
else:
921+
checkpoint_repo_set.write(repo_url)
922+
else:
923+
logging.info(f"Could not import repo {repo_json.get('path', '')}; only remote repos can be created via API.")

tasks/tasks.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,7 @@ def run(self):
183183
'Overwrite notebooks only supports the SOURCE format. See Rest API docs for details')
184184
ws_c.import_all_workspace_items(archive_missing=self.args.archive_missing,
185185
num_parallel=self.client_config["num_parallel"])
186+
ws_c.import_all_repos(num_parallel=self.client_config["num_parallel"])
186187

187188

188189
class ClustersExportTask(AbstractTask):

wmconstants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
WORKSPACE_NOTEBOOK_PATH_OBJECT = "notebook_paths"
77
WORKSPACE_NOTEBOOK_OBJECT = "notebooks"
88
WORKSPACE_DIRECTORY_OBJECT = "directories"
9+
WORKSPACE_REPO_OBJECT = "repos"
910
WORKSPACE_NOTEBOOK_ACL_OBJECT = "acl_notebooks"
1011
WORKSPACE_DIRECTORY_ACL_OBJECT = "acl_directories"
12+
WORKSPACE_REPO_ACL_OBJECT = "acl_repos"
1113
METASTORE_TABLES = "metastore"
1214
METASTORE_TABLES_ACL = "metastore_acl"
1315
CLUSTER_OBJECT = "clusters"

Comments (0)