12 changes: 10 additions & 2 deletions dbclient/HiveClient.py
@@ -561,9 +561,17 @@ def log_table_ddl(self, cid, ec_id, db_name, table_name, metastore_dir, error_lo
                 return False
             # read that data using the dbfs rest endpoint which can handle 2MB of text easily
             read_args = {'path': '/tmp/migration/tmp_export_ddl.txt'}
-            read_resp = self.get('/dbfs/read', read_args)
+            offSet = 0
+            length = 999999
+            data_res = ''
+            while True:
+                read_resp = self.get(f'/dbfs/read?length={length}&offset={offSet}', read_args)
+                data_res += read_resp.get('data')
+                if int(read_resp.get('bytes_read')) >= length:
+                    offSet += length
+                else: break
             with open(table_ddl_path, "w", encoding="utf-8") as fp:
-                fp.write(base64.b64decode(read_resp.get('data')).decode('utf-8'))
+                fp.write(base64.b64decode(data_res).decode('utf-8'))
             return True
         else:
             export_ddl_cmd = 'print(ddl_str)'
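Note on the chunked read above: the DBFS read endpoint returns at most about 1 MB per call, so the loop pages through the file with an offset and length instead of issuing a single request. Below is a standalone sketch of the same pattern against the public DBFS REST API using requests; the host, token, and helper name are placeholders rather than code from this repo, and each chunk is decoded separately before being joined.

import base64
import requests

HOST = "https://<workspace-url>"    # placeholder
TOKEN = "<personal-access-token>"   # placeholder

def read_dbfs_file(path, chunk_size=999999):
    """Fetch a DBFS file in chunks via GET /api/2.0/dbfs/read."""
    offset = 0
    content = b""
    while True:
        resp = requests.get(
            f"{HOST}/api/2.0/dbfs/read",
            headers={"Authorization": f"Bearer {TOKEN}"},
            params={"path": path, "offset": offset, "length": chunk_size},
        )
        resp.raise_for_status()
        body = resp.json()
        # each response carries base64-encoded data plus the number of bytes actually read
        content += base64.b64decode(body.get("data", ""))
        bytes_read = int(body.get("bytes_read", 0))
        if bytes_read < chunk_size:
            break                   # a short (or empty) chunk means end of file
        offset += bytes_read
    return content

Concatenating the raw base64 strings first, as the diff does, also decodes cleanly in this case because 999999 is a multiple of 3, so every full chunk encodes without padding characters.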
7 changes: 4 additions & 3 deletions dbclient/JobsClient.py
@@ -31,8 +31,8 @@ def get_jobs_list(self, print_json=False):
         # 'tasks' field) on API 2.0.
         res = self.get("/jobs/list", print_json, version='2.0')
         for job in res.get('jobs', []):
-            jobsById[job.get('job_id')] = job
-
+            if job.get('settings', {}).get('schedule', {}).get('pause_status', '') == 'UNPAUSED' or job.get('settings', {}).get('continuous', {}).get('pause_status', '') == 'UNPAUSED':
+                jobsById[job.get('job_id')] = job
         limit = 25 # max limit supported by the API
         offset = 0
         has_more = True
@@ -46,7 +46,8 @@ def get_jobs_list(self, print_json=False):
             for job in res.get('jobs', []):
                 jobId = job.get('job_id')
                 # only replaces "real" MULTI_TASK jobs, as they contain the task definitions.
-                if jobsById[jobId]['settings'].get('format') == 'MULTI_TASK':
+                presentInJobsById = jobsById.get(jobId, None)
+                if presentInJobsById and jobsById[jobId]['settings'].get('format') == 'MULTI_TASK':
                     jobsById[jobId] = job
         return jobsById.values()

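The lookup guard matters because the 2.0 listing now only records unpaused jobs, so the follow-up 2.1 pass can encounter job IDs that were never added to jobsById; indexing directly would raise a KeyError. A minimal, self-contained sketch of the merge with made-up job payloads:

# Made-up payloads: job 2 is paused, so it never enters the 2.0 map.
jobs_2_0 = [
    {"job_id": 1, "settings": {"format": "MULTI_TASK",
                               "schedule": {"pause_status": "UNPAUSED"}}},
    {"job_id": 2, "settings": {"format": "MULTI_TASK",
                               "schedule": {"pause_status": "PAUSED"}}},
]
jobs_2_1 = [
    {"job_id": 1, "settings": {"format": "MULTI_TASK", "tasks": []}},
    {"job_id": 2, "settings": {"format": "MULTI_TASK", "tasks": []}},
]

jobs_by_id = {}
for job in jobs_2_0:
    settings = job.get("settings", {})
    if (settings.get("schedule", {}).get("pause_status", "") == "UNPAUSED"
            or settings.get("continuous", {}).get("pause_status", "") == "UNPAUSED"):
        jobs_by_id[job["job_id"]] = job

for job in jobs_2_1:
    job_id = job["job_id"]
    present = jobs_by_id.get(job_id)   # None for job 2: the guard skips it
    if present and present["settings"].get("format") == "MULTI_TASK":
        jobs_by_id[job_id] = job       # keep the 2.1 payload, which carries the task list

print(list(jobs_by_id))                # -> [1]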
6 changes: 6 additions & 0 deletions dbclient/parser.py
@@ -586,4 +586,10 @@ def get_pipeline_parser() -> argparse.ArgumentParser:
     parser.add_argument('--bypass-secret-acl', action='store_true', default=False,
                         help='Use to set the initial principal for secrets in standard-tier workspaces')

+    parser.add_argument('--database', action='store', default=None,
+                        help='Database name to export for the metastore and table ACLs. Single database name supported')
+
+    parser.add_argument('--iam', action='store',
+                        help='IAM Instance Profile to export metastore entries')
+
     return parser
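For reference, the two new flags behave like any other argparse store actions. The sketch below exercises them in isolation with invented values (the database name and instance-profile ARN are placeholders), so it does not depend on the rest of the parser.

import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--database', action='store', default=None,
                    help='Database name to export for the metastore and table ACLs. Single database name supported')
parser.add_argument('--iam', action='store',
                    help='IAM Instance Profile to export metastore entries')

# Invented example values; both flags default to None when omitted.
args = parser.parse_args(['--database', 'sales_db',
                          '--iam', 'arn:aws:iam::123456789012:instance-profile/example'])
print(args.database, args.iam)

Leaving --database unset keeps the default of None, which is exactly what the task change below checks to choose between the single-database and full-metastore export paths.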
9 changes: 8 additions & 1 deletion tasks/tasks.py
@@ -293,8 +293,15 @@ def __init__(self, client_config, checkpoint_service, args, skip=False):
         self.args = args

     def run(self):
+        print("Arguments:")
+        print(self.args)
         hive_c = HiveClient(self.client_config, self.checkpoint_service)
-        hive_c.export_hive_metastore(cluster_name=self.args.cluster_name,
+        if self.args.database is not None:
+            # export only a single database with a given iam role
+            database_name = self.args.database
+            hive_c.export_database(database_name, self.args.cluster_name, self.args.iam, has_unicode=self.args.metastore_unicode)
+        else:
+            hive_c.export_hive_metastore(cluster_name=self.args.cluster_name,
                                      has_unicode=self.args.metastore_unicode)


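To see the new dispatch in run() without a live workspace, here is a hedged sketch with a stub standing in for HiveClient; only the call shapes mirror the diff, and the stub methods and argument values are invented for illustration.

from types import SimpleNamespace

class StubHiveClient:
    """Stand-in for HiveClient; just reports which export path was taken."""
    def export_database(self, db_name, cluster_name, iam, has_unicode=False):
        print(f"single-database export: {db_name} via {iam}")

    def export_hive_metastore(self, cluster_name, has_unicode=False):
        print(f"full metastore export on {cluster_name}")

def run(args, hive_c):
    # mirrors the branch added to run() above
    if args.database is not None:
        hive_c.export_database(args.database, args.cluster_name, args.iam,
                               has_unicode=args.metastore_unicode)
    else:
        hive_c.export_hive_metastore(cluster_name=args.cluster_name,
                                     has_unicode=args.metastore_unicode)

args = SimpleNamespace(cluster_name="migration-cluster", database="sales_db",
                       iam="arn:aws:iam::123456789012:instance-profile/example",
                       metastore_unicode=False)
run(args, StubHiveClient())   # -> single-database export: sales_db via arn:aws:iam::...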