diff --git a/zstash/create.py b/zstash/create.py index b502f1e6..694c4343 100644 --- a/zstash/create.py +++ b/zstash/create.py @@ -6,7 +6,7 @@ import os.path import sqlite3 import sys -from typing import Any, List, Tuple +from typing import Any, List, Optional, Tuple from six.moves.urllib.parse import urlparse @@ -14,6 +14,7 @@ from .hpss import hpss_put from .hpss_utils import add_files from .settings import DEFAULT_CACHE, config, get_db_filename, logger +from .transfer_tracking import GlobusTransferCollection, HPSSTransferCollection from .utils import ( create_tars_table, get_files_to_archive, @@ -52,12 +53,13 @@ def create(): logger.error(input_path_error_str) raise NotADirectoryError(input_path_error_str) + gtc: Optional[GlobusTransferCollection] = None if hpss != "none": url = urlparse(hpss) if url.scheme == "globus": # identify globus endpoints - logger.debug(f"{ts_utc()}:Calling globus_activate(hpss)") - globus_activate(hpss) + logger.debug(f"{ts_utc()}:Calling globus_activate()") + gtc = globus_activate(hpss) else: # config.hpss is not "none", so we need to # create target HPSS directory @@ -88,14 +90,23 @@ def create(): # Create and set up the database logger.debug(f"{ts_utc()}: Calling create_database()") - failures: List[str] = create_database(cache, args) + htc: HPSSTransferCollection = HPSSTransferCollection() + failures: List[str] = create_database(cache, args, gtc=gtc, htc=htc) # Transfer to HPSS. Always keep a local copy. logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}") - hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True) + hpss_put( + hpss, + get_db_filename(cache), + cache, + keep=args.keep, + is_index=True, + gtc=gtc, + # htc=htc, # Don't track index.db for deletion + ) logger.debug(f"{ts_utc()}: calling globus_finalize()") - globus_finalize(non_blocking=args.non_blocking) + globus_finalize(gtc, htc, non_blocking=args.non_blocking) if len(failures) > 0: # List the failures @@ -204,7 +215,12 @@ def setup_create() -> Tuple[str, argparse.Namespace]: return cache, args -def create_database(cache: str, args: argparse.Namespace) -> List[str]: +def create_database( + cache: str, + args: argparse.Namespace, + gtc: Optional[GlobusTransferCollection], + htc: Optional[HPSSTransferCollection], +) -> List[str]: # Create new database logger.debug(f"{ts_utc()}:Creating index database") if os.path.exists(get_db_filename(cache)): @@ -263,26 +279,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]: files: List[str] = get_files_to_archive(cache, args.include, args.exclude) failures: List[str] - if args.follow_symlinks: - try: - # Add files to archive - failures = add_files( - cur, - con, - -1, - files, - cache, - args.keep, - args.follow_symlinks, - skip_tars_md5=args.no_tars_md5, - non_blocking=args.non_blocking, - error_on_duplicate_tar=args.error_on_duplicate_tar, - overwrite_duplicate_tars=args.overwrite_duplicate_tars, - force_database_corruption=args.for_developers_force_database_corruption, - ) - except FileNotFoundError: - raise Exception("Archive creation failed due to broken symlink.") - else: + try: # Add files to archive failures = add_files( cur, @@ -297,7 +294,14 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]: error_on_duplicate_tar=args.error_on_duplicate_tar, overwrite_duplicate_tars=args.overwrite_duplicate_tars, force_database_corruption=args.for_developers_force_database_corruption, + gtc=gtc, + htc=htc, ) + except FileNotFoundError as e: + if args.follow_symlinks: + 
raise Exception("Archive creation failed due to broken symlink.") + else: + raise e # Close database con.commit() diff --git a/zstash/globus.py b/zstash/globus.py index 2cacad5f..36dcc7d1 100644 --- a/zstash/globus.py +++ b/zstash/globus.py @@ -3,92 +3,79 @@ import sys from typing import List, Optional -from globus_sdk import TransferAPIError, TransferClient, TransferData +from globus_sdk import TransferAPIError, TransferData from globus_sdk.response import GlobusHTTPResponse -from globus_sdk.services.transfer.response.iterable import IterableTransferResponse from six.moves.urllib.parse import urlparse from .globus_utils import ( HPSS_ENDPOINT_MAP, check_state_files, + file_exists, get_local_endpoint_id, get_transfer_client_with_auth, + globus_block_wait, + globus_wait, set_up_TransferData, submit_transfer_with_checks, ) from .settings import logger +from .transfer_tracking import ( + GlobusTransfer, + GlobusTransferCollection, + HPSSTransferCollection, + delete_transferred_files, +) from .utils import ts_utc -remote_endpoint = None -local_endpoint = None -transfer_client: TransferClient = None -transfer_data: TransferData = None -task_id = None -archive_directory_listing: IterableTransferResponse = None - - -def globus_activate(hpss: str): - """ - Read the local globus endpoint UUID from ~/.zstash.ini. - If the ini file does not exist, create an ini file with empty values, - and try to find the local endpoint UUID based on the FQDN - """ - global transfer_client - global local_endpoint - global remote_endpoint +def globus_activate( + hpss: str, gtc: Optional[GlobusTransferCollection] = None +) -> Optional[GlobusTransferCollection]: url = urlparse(hpss) if url.scheme != "globus": - return + return None + if gtc is None: + gtc = GlobusTransferCollection() check_state_files() - remote_endpoint = url.netloc - local_endpoint = get_local_endpoint_id(local_endpoint) - if remote_endpoint.upper() in HPSS_ENDPOINT_MAP.keys(): - remote_endpoint = HPSS_ENDPOINT_MAP.get(remote_endpoint.upper()) - both_endpoints: List[Optional[str]] = [local_endpoint, remote_endpoint] - transfer_client = get_transfer_client_with_auth(both_endpoints) + gtc.remote_endpoint = url.netloc + gtc.local_endpoint = get_local_endpoint_id(gtc.local_endpoint) + upper_remote_ep = gtc.remote_endpoint.upper() + if upper_remote_ep in HPSS_ENDPOINT_MAP.keys(): + gtc.remote_endpoint = HPSS_ENDPOINT_MAP.get(upper_remote_ep) + both_endpoints: List[Optional[str]] = [gtc.local_endpoint, gtc.remote_endpoint] + gtc.transfer_client = get_transfer_client_with_auth(both_endpoints) for ep_id in both_endpoints: - r = transfer_client.endpoint_autoactivate(ep_id, if_expires_in=600) + r = gtc.transfer_client.endpoint_autoactivate(ep_id, if_expires_in=600) if r.get("code") == "AutoActivationFailed": logger.error( f"The {ep_id} endpoint is not activated or the current activation expires soon. Please go to https://app.globus.org/file-manager/collections/{ep_id} and (re)activate the endpoint." 
) sys.exit(1) - - -def file_exists(name: str) -> bool: - - for entry in archive_directory_listing: - if entry.get("name") == name: - return True - return False - - -global_variable_tarfiles_pushed = 0 + return gtc # C901 'globus_transfer' is too complex (20) def globus_transfer( # noqa: C901 - remote_ep: str, remote_path: str, name: str, transfer_type: str, non_blocking: bool + gtc: Optional[GlobusTransferCollection], + remote_ep: str, + remote_path: str, + name: str, + transfer_type: str, + non_blocking: bool, ): - global transfer_data - global task_id - global archive_directory_listing - global global_variable_tarfiles_pushed - logger.info(f"{ts_utc()}: Entered globus_transfer() for name = {name}") logger.debug(f"{ts_utc()}: non_blocking = {non_blocking}") - if not transfer_client: - globus_activate("globus://" + remote_ep) - if not transfer_client: + if (not gtc) or (not gtc.transfer_client): + gtc = globus_activate("globus://" + remote_ep) + if (not gtc) or (not gtc.transfer_client): sys.exit(1) if transfer_type == "get": - if not archive_directory_listing: - archive_directory_listing = transfer_client.operation_ls( - remote_endpoint, remote_path + if not gtc.archive_directory_listing: + gtc.archive_directory_listing = gtc.transfer_client.operation_ls( + gtc.remote_endpoint, remote_path ) - if not file_exists(name): + if not file_exists(gtc.archive_directory_listing, name): logger.error( "Remote file globus://{}{}/{} does not exist".format( remote_ep, remote_path, name @@ -96,45 +83,43 @@ def globus_transfer( # noqa: C901 ) sys.exit(1) - transfer_data = set_up_TransferData( + mrt: Optional[GlobusTransfer] = gtc.get_most_recent_transfer() + transfer_data: TransferData = set_up_TransferData( transfer_type, - local_endpoint, # Global - remote_endpoint, # Global + gtc.local_endpoint, + gtc.remote_endpoint, remote_path, name, - transfer_client, # Global - transfer_data, # Global + gtc.transfer_client, + mrt.transfer_data if mrt else None, ) task: GlobusHTTPResponse try: - if task_id: - task = transfer_client.get_task(task_id) - prev_task_status = task["status"] + if mrt and mrt.task_id: + task = gtc.transfer_client.get_task(mrt.task_id) + mrt.task_status = task["status"] # one of {ACTIVE, SUCCEEDED, FAILED, CANCELED, PENDING, INACTIVE} # NOTE: How we behave here depends upon whether we want to support mutliple active transfers. # Presently, we do not, except inadvertantly (if status == PENDING) - if prev_task_status == "ACTIVE": - logger.info( - f"{ts_utc()}: Previous task_id {task_id} Still Active. Returning ACTIVE." - ) - return "ACTIVE" - elif prev_task_status == "SUCCEEDED": + if mrt.task_status == "ACTIVE": + logger.info(f"{ts_utc()}: Previous task_id {mrt.task_id} Still Active.") + # Don't return early - continue to submit the new transfer + # The previous transfer will complete asynchronously + elif mrt.task_status == "SUCCEEDED": logger.info( - f"{ts_utc()}: Previous task_id {task_id} status = SUCCEEDED." + f"{ts_utc()}: Previous task_id {mrt.task_id} status = SUCCEEDED." ) src_ep = task["source_endpoint_id"] dst_ep = task["destination_endpoint_id"] label = task["label"] ts = ts_utc() logger.info( - "{}:Globus transfer {}, from {} to {}: {} succeeded".format( - ts, task_id, src_ep, dst_ep, label - ) + f"{ts}:Globus transfer {mrt.task_id}, from {src_ep} to {dst_ep}: {label} succeeded" ) else: logger.error( - f"{ts_utc()}: Previous task_id {task_id} status = {prev_task_status}." + f"{ts_utc()}: Previous task_id {mrt.task_id} status = {mrt.task_status}." 
) # DEBUG: review accumulated items in TransferData @@ -142,15 +127,15 @@ def globus_transfer( # noqa: C901 attribs = transfer_data.__dict__ for item in attribs["data"]["DATA"]: if item["DATA_TYPE"] == "transfer_item": - global_variable_tarfiles_pushed += 1 + gtc.cumulative_tarfiles_pushed += 1 print( - f" (routine) PUSHING (#{global_variable_tarfiles_pushed}) STORED source item: {item['source_path']}", + f" (routine) PUSHING (#{gtc.cumulative_tarfiles_pushed}) STORED source item: {item['source_path']}", flush=True, ) - # SUBMIT new transfer here + # Submit the current transfer_data logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") - task = submit_transfer_with_checks(transfer_client, transfer_data) + task = submit_transfer_with_checks(gtc.transfer_client, transfer_data) task_id = task.get("task_id") # NOTE: This log message is misleading. If we have accumulated multiple tar files for transfer, # the "lable" given here refers only to the LAST tarfile in the TransferData list. @@ -158,7 +143,14 @@ def globus_transfer( # noqa: C901 f"{ts_utc()}: SURFACE Submit Transfer returned new task_id = {task_id} for label {transfer_data['label']}" ) - # Nullify the submitted transfer data structure so that a new one will be created on next call. + # Create new transfer record with the task info + new_transfer = GlobusTransfer() + new_transfer.transfer_data = None # This batch was submitted + new_transfer.task_id = task_id + new_transfer.task_status = "UNKNOWN" + gtc.transfers.append(new_transfer) + + # Reset for building the next batch transfer_data = None except TransferAPIError as e: if e.code == "NoCredException": @@ -174,151 +166,96 @@ def globus_transfer( # noqa: C901 logger.error("Exception: {}".format(e)) sys.exit(1) - # test for blocking on new task_id - task_status = "UNKNOWN" - if not non_blocking: - task_status = globus_block_wait( - task_id=task_id, wait_timeout=7200, polling_interval=10, max_retries=5 - ) - else: - logger.info(f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {task_id}") - - if transfer_type == "put": - return task_status - - if transfer_type == "get" and task_id: - globus_wait(task_id) + new_mrt: Optional[GlobusTransfer] = gtc.get_most_recent_transfer() - return task_status - - -def globus_block_wait( - task_id: str, wait_timeout: int, polling_interval: int, max_retries: int -): - - # poll every "polling_interval" seconds to speed up small transfers. 
Report every 2 hours, stop waiting aftert 5*2 = 10 hours - logger.info( - f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}" - ) - task_status = "UNKNOWN" - retry_count = 0 - while retry_count < max_retries: - try: - # Wait for the task to complete - logger.info( - f"{ts_utc()}: on task_wait try {retry_count + 1} out of {max_retries}" - ) - transfer_client.task_wait( - task_id, timeout=wait_timeout, polling_interval=10 + # test for blocking on new task_id + if new_mrt and new_mrt.task_id: + if not non_blocking: + new_mrt.task_status = globus_block_wait( + transfer_client=gtc.transfer_client, + task_id=new_mrt.task_id, + wait_timeout=7200, + max_retries=5, ) - logger.info(f"{ts_utc()}: done with wait") - except Exception as e: - logger.error(f"Unexpected Exception: {e}") else: - curr_task = transfer_client.get_task(task_id) - task_status = curr_task["status"] - if task_status == "SUCCEEDED": - break - finally: - retry_count += 1 logger.info( - f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds" + f"{ts_utc()}: NO BLOCKING (task_wait) for task_id {new_mrt.task_id}" ) - if retry_count == max_retries: - logger.info( - f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout {wait_timeout} seconds" - ) - task_status = "EXHAUSTED_TIMEOUT_RETRIES" + if transfer_type == "get": + globus_wait(gtc.transfer_client, new_mrt.task_id) - logger.info( - f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}" - ) - return task_status +def globus_finalize( + gtc: Optional[GlobusTransferCollection], + htc: HPSSTransferCollection, + non_blocking: bool = False, +): + last_task_id = None + if gtc is None: + logger.warning("No GlobusTransferCollection object provided for finalization") + return -def globus_wait(task_id: str): + transfer: Optional[GlobusTransfer] = gtc.get_most_recent_transfer() + if transfer: + # Check if there's any pending transfer data that hasn't been submitted yet + if transfer.transfer_data: + # DEBUG: review accumulated items in TransferData + logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:") + attribs = transfer.transfer_data.__dict__ + for item in attribs["data"]["DATA"]: + if item["DATA_TYPE"] == "transfer_item": + gtc.cumulative_tarfiles_pushed += 1 + print( + f" (finalize) PUSHING ({gtc.cumulative_tarfiles_pushed}) source item: {item['source_path']}", + flush=True, + ) - try: - """ - A Globus transfer job (task) can be in one of the three states: - ACTIVE, SUCCEEDED, FAILED. The script every 20 seconds polls a - status of the transfer job (task) from the Globus Transfer service, - with 20 second timeout limit. If the task is ACTIVE after time runs - out 'task_wait' returns False, and True otherwise. - """ - while not transfer_client.task_wait(task_id, timeout=300, polling_interval=20): - pass - """ - The Globus transfer job (task) has been finished (SUCCEEDED or FAILED). - Check if the transfer SUCCEEDED or FAILED. - """ - task = transfer_client.get_task(task_id) - if task["status"] == "SUCCEEDED": - src_ep = task["source_endpoint_id"] - dst_ep = task["destination_endpoint_id"] - label = task["label"] + # SUBMIT new transfer here logger.info( - "Globus transfer {}, from {} to {}: {} succeeded".format( - task_id, src_ep, dst_ep, label - ) + f"{ts_utc()}: DIVING: Submit Transfer for {transfer.transfer_data['label']}" ) - else: - logger.error("Transfer FAILED") - except TransferAPIError as e: - if e.code == "NoCredException": - logger.error( - "{}. 
Please go to https://app.globus.org/endpoints and activate the endpoint.".format( - e.message + try: + last_task = submit_transfer_with_checks( + gtc.transfer_client, transfer.transfer_data ) - ) - else: - logger.error(e) - sys.exit(1) - except Exception as e: - logger.error("Exception: {}".format(e)) - sys.exit(1) - - -def globus_finalize(non_blocking: bool = False): - global global_variable_tarfiles_pushed - - last_task_id = None - - if transfer_data: - # DEBUG: review accumulated items in TransferData - logger.info(f"{ts_utc()}: FINAL TransferData: accumulated items:") - attribs = transfer_data.__dict__ - for item in attribs["data"]["DATA"]: - if item["DATA_TYPE"] == "transfer_item": - global_variable_tarfiles_pushed += 1 - print( - f" (finalize) PUSHING ({global_variable_tarfiles_pushed}) source item: {item['source_path']}", - flush=True, - ) - - # SUBMIT new transfer here - logger.info(f"{ts_utc()}: DIVING: Submit Transfer for {transfer_data['label']}") - try: - last_task = submit_transfer_with_checks(transfer_client, transfer_data) - last_task_id = last_task.get("task_id") - except TransferAPIError as e: - if e.code == "NoCredException": - logger.error( - "{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format( - e.message + last_task_id = last_task.get("task_id") + except TransferAPIError as e: + if e.code == "NoCredException": + logger.error( + "{}. Please go to https://app.globus.org/endpoints and activate the endpoint.".format( + e.message + ) ) + else: + logger.error(e) + sys.exit(1) + except Exception as e: + logger.error("Exception: {}".format(e)) + sys.exit(1) + + # Wait for any submitted transfers to complete + # In non-blocking mode, this ensures index.db and any accumulated tar files complete + # In blocking mode, this is redundant but harmless + skip_last_wait: bool = False + if transfer and transfer.task_id: + if transfer.task_id == last_task_id: + skip_last_wait = ( + True # No reason to call globus_wait twice on the same task_id ) - else: - logger.error(e) - sys.exit(1) - except Exception as e: - logger.error("Exception: {}".format(e)) - sys.exit(1) + logger.info( + f"{ts_utc()}: Waiting for transfer task_id={transfer.task_id} to complete" + ) + globus_wait(gtc.transfer_client, transfer.task_id) + if last_task_id and (not skip_last_wait): + logger.info( + f"{ts_utc()}: Waiting for last transfer task_id={last_task_id} to complete" + ) + globus_wait(gtc.transfer_client, last_task_id) - if not non_blocking: - if task_id: - globus_wait(task_id) - if last_task_id: - globus_wait(last_task_id) + # Clean up tar files that were queued for deletion + if htc.curr_transfers: + delete_transferred_files(htc) + if htc.prev_transfers: + delete_transferred_files(htc) diff --git a/zstash/globus_utils.py b/zstash/globus_utils.py index e5346f69..15a8740e 100644 --- a/zstash/globus_utils.py +++ b/zstash/globus_utils.py @@ -20,6 +20,7 @@ from globus_sdk.services.auth.errors import AuthAPIError from .settings import logger +from .utils import ts_utc # Global constants ############################################################ HPSS_ENDPOINT_MAP: Dict[str, str] = { @@ -46,7 +47,7 @@ TOKEN_FILE = os.path.expanduser("~/.zstash_globus_tokens.json") # Independent functions ####################################################### -# The functions here don't rely on the global variables defined in globus.py. +# The functions here don't rely on the classes defined in globus.py. 
 # Primarily used by globus_activate ###########################################
 
 
@@ -217,6 +218,13 @@
 # Primarily used by globus_transfer ###########################################
 
 
+def file_exists(archive_directory_listing, name: str) -> bool:
+    for entry in archive_directory_listing:
+        if entry.get("name") == name:
+            return True
+    return False
+
+
 def set_up_TransferData(
     transfer_type: str,
     local_endpoint: Optional[str],
@@ -290,3 +298,95 @@ def submit_transfer_with_checks(transfer_client, transfer_data) -> GlobusHTTPRes
     else:
         raise err
     return task
+
+
+def globus_block_wait(
+    transfer_client: TransferClient,
+    task_id: str,
+    wait_timeout: int,
+    max_retries: int,
+):
+
+    # Poll every 10 seconds to speed up small transfers. Report every 2 hours; stop waiting after 5*2 = 10 hours.
+    logger.info(
+        f"{ts_utc()}: BLOCKING START: invoking task_wait for task_id = {task_id}"
+    )
+    task_status = "UNKNOWN"
+    retry_count = 0
+    while retry_count < max_retries:
+        try:
+            # Wait for the task to complete
+            logger.info(
+                f"{ts_utc()}: on task_wait try {retry_count + 1} out of {max_retries}"
+            )
+            transfer_client.task_wait(
+                task_id, timeout=wait_timeout, polling_interval=10
+            )
+            logger.info(f"{ts_utc()}: done with wait")
+        except Exception as e:
+            logger.error(f"Unexpected Exception: {e}")
+        else:
+            curr_task = transfer_client.get_task(task_id)
+            task_status = curr_task["status"]
+            if task_status == "SUCCEEDED":
+                break
+        finally:
+            retry_count += 1
+            logger.info(
+                f"{ts_utc()}: BLOCKING retry_count = {retry_count} of {max_retries} of timeout {wait_timeout} seconds"
+            )
+
+    if retry_count == max_retries:
+        logger.info(
+            f"{ts_utc()}: BLOCKING EXHAUSTED {max_retries} of timeout {wait_timeout} seconds"
+        )
+        task_status = "EXHAUSTED_TIMEOUT_RETRIES"
+
+    logger.info(
+        f"{ts_utc()}: BLOCKING ENDS: task_id {task_id} returned from task_wait with status {task_status}"
+    )
+
+    return task_status
+
+
+# Primarily used by globus_transfer, globus_finalize ##########################
+def globus_wait(transfer_client: TransferClient, task_id: str):
+    try:
+        """
+        A Globus transfer job (task) can be in one of three states:
+        ACTIVE, SUCCEEDED, FAILED. The script polls the status of the
+        transfer job (task) from the Globus Transfer service every 20
+        seconds, with a 300-second timeout limit. If the task is still
+        ACTIVE when the timeout expires, 'task_wait' returns False;
+        otherwise it returns True.
+        """
+        while not transfer_client.task_wait(task_id, timeout=300, polling_interval=20):
+            pass
+        """
+        The Globus transfer job (task) has finished (SUCCEEDED or FAILED).
+        Check whether the transfer SUCCEEDED or FAILED.
+        """
+        task = transfer_client.get_task(task_id)
+        if task["status"] == "SUCCEEDED":
+            src_ep = task["source_endpoint_id"]
+            dst_ep = task["destination_endpoint_id"]
+            label = task["label"]
+            logger.info(
+                "Globus transfer {}, from {} to {}: {} succeeded".format(
+                    task_id, src_ep, dst_ep, label
+                )
+            )
+        else:
+            logger.error("Transfer FAILED")
+    except TransferAPIError as e:
+        if e.code == "NoCredException":
+            logger.error(
+                "{}. 
Please go to https://app.globus.org/endpoints and activate the endpoint.".format( + e.message + ) + ) + else: + logger.error(e) + sys.exit(1) + except Exception as e: + logger.error("Exception: {}".format(e)) + sys.exit(1) diff --git a/zstash/hpss.py b/zstash/hpss.py index 24603388..91dc1459 100644 --- a/zstash/hpss.py +++ b/zstash/hpss.py @@ -2,19 +2,23 @@ import os.path import subprocess -from typing import List +from typing import List, Optional from six.moves.urllib.parse import urlparse from .globus import globus_transfer from .settings import get_db_filename, logger +from .transfer_tracking import ( + GlobusTransferCollection, + HPSSTransferCollection, + delete_current_files, + delete_transferred_files, +) from .utils import run_command, ts_utc -prev_transfers: List[str] = list() -curr_transfers: List[str] = list() - -def hpss_transfer( +# C901 'hpss_transfer' is too complex (19) +def hpss_transfer( # noqa: C901 hpss: str, file_path: str, transfer_type: str, @@ -22,15 +26,16 @@ def hpss_transfer( keep: bool = False, non_blocking: bool = False, is_index: bool = False, + gtc: Optional[GlobusTransferCollection] = None, + htc: Optional[HPSSTransferCollection] = None, ): - global prev_transfers - global curr_transfers - + if not htc: + htc = HPSSTransferCollection() logger.info( - f"{ts_utc()}: in hpss_transfer, prev_transfers is starting as {prev_transfers}" + f"{ts_utc()}: in hpss_transfer, prev_transfers is starting as {htc.prev_transfers}" ) # logger.debug( - # f"{ts_utc()}: in hpss_transfer, curr_transfers is starting as {curr_transfers}" + # f"{ts_utc()}: in hpss_transfer, curr_transfers is starting as {htc.curr_transfers}" # ) if hpss == "none": @@ -85,10 +90,12 @@ def hpss_transfer( endpoint = url.netloc url_path = url.path - curr_transfers.append(file_path) - # logger.debug( - # f"{ts_utc()}: curr_transfers has been appended to, is now {curr_transfers}" - # ) + # Only track files for deletion if they should be deleted + # Don't track if keep=True or if it's an index file + if (not keep) and (not is_index): + htc.curr_transfers.append(file_path) + # logger.debug(f"{ts_utc()}: curr_transfers has been appended to, is now {htc.curr_transfers}") + path, name = os.path.split(file_path) # Need to be in local directory for `hsi` to work @@ -104,16 +111,21 @@ def hpss_transfer( # For `get`, this directory is where the file we get from HPSS will go. os.chdir(path) + globus_status: Optional[str] = "UNKNOWN" if scheme == "globus": - globus_status = "UNKNOWN" + if not gtc: + raise RuntimeError( + "Scheme is 'globus' but no GlobusTransferCollection provided" + ) # Transfer file using the Globus Transfer Service logger.info(f"{ts_utc()}: DIVING: hpss calls globus_transfer(name={name})") - globus_status = globus_transfer( - endpoint, url_path, name, transfer_type, non_blocking - ) - logger.info( - f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}) returns {globus_status}" - ) + globus_transfer(gtc, endpoint, url_path, name, transfer_type, non_blocking) + mrt = gtc.get_most_recent_transfer() + if mrt: + globus_status = mrt.task_status + logger.info( + f"{ts_utc()}: SURFACE hpss globus_transfer(name={name}), task_id={mrt.task_id}, globus_status={globus_status}" + ) # NOTE: Here, the status could be "EXHAUSTED_TIMEOUT_RETRIES", meaning a very long transfer # or perhaps transfer is hanging. We should decide whether to ignore it, or cancel it, but # we'd need the task_id to issue a cancellation. 
Perhaps we should have globus_transfer @@ -129,20 +141,15 @@ def hpss_transfer( os.chdir(cwd) if transfer_type == "put": - if not keep: - if (scheme != "globus") or (globus_status == "SUCCEEDED"): + if (not keep) and (not is_index): + if scheme != "globus": + # For direct HPSS, delete immediately since transfer is synchronous + # Delete the file that was just transferred + delete_current_files(htc) + elif globus_status == "SUCCEEDED": # Note: This is intended to fulfill the default removal of successfully-transfered # tar files when keep=False, irrespective of non-blocking status - logger.debug( - f"{ts_utc()}: deleting transfered files {prev_transfers}" - ) - for src_path in prev_transfers: - os.remove(src_path) - prev_transfers = curr_transfers - curr_transfers = list() - logger.info( - f"{ts_utc()}: prev_transfers has been set to {prev_transfers}" - ) + delete_transferred_files(htc) def hpss_put( @@ -152,18 +159,26 @@ def hpss_put( keep: bool = True, non_blocking: bool = False, is_index=False, + gtc: Optional[GlobusTransferCollection] = None, + htc: Optional[HPSSTransferCollection] = None, ): """ Put a file to the HPSS archive. """ - hpss_transfer(hpss, file_path, "put", cache, keep, non_blocking, is_index) + hpss_transfer( + hpss, file_path, "put", cache, keep, non_blocking, is_index, gtc=gtc, htc=htc + ) def hpss_get(hpss: str, file_path: str, cache: str): """ Get a file from the HPSS archive. """ - hpss_transfer(hpss, file_path, "get", cache, False) + url = urlparse(hpss) + gtc = None + if url.scheme == "globus": + gtc = GlobusTransferCollection() + hpss_transfer(hpss, file_path, "get", cache, False, gtc=gtc) def hpss_chgrp(hpss: str, group: str, recurse: bool = False): diff --git a/zstash/hpss_utils.py b/zstash/hpss_utils.py index 2f1158bc..9d82b38d 100644 --- a/zstash/hpss_utils.py +++ b/zstash/hpss_utils.py @@ -13,6 +13,7 @@ from .hpss import hpss_put from .settings import TupleFilesRowNoId, TupleTarsRowNoId, config, logger +from .transfer_tracking import GlobusTransferCollection, HPSSTransferCollection from .utils import create_tars_table, tars_table_exists, ts_utc @@ -66,6 +67,8 @@ def add_files( error_on_duplicate_tar: bool = False, overwrite_duplicate_tars: bool = False, force_database_corruption: str = "", + gtc: Optional[GlobusTransferCollection] = None, + htc: Optional[HPSSTransferCollection] = None, ) -> List[str]: # Now, perform the actual archiving @@ -160,7 +163,15 @@ def add_files( logger.info( f"{ts_utc()}: DIVING: (add_files): Calling hpss_put to dispatch archive file {tfname} [keep, non_blocking] = [{keep}, {non_blocking}]" ) - hpss_put(hpss, os.path.join(cache, tfname), cache, keep, non_blocking) + hpss_put( + hpss, + os.path.join(cache, tfname), + cache, + keep, + non_blocking, + gtc=gtc, + htc=htc, + ) logger.info( f"{ts_utc()}: SURFACE (add_files): Called hpss_put to dispatch archive file {tfname}" ) diff --git a/zstash/transfer_tracking.py b/zstash/transfer_tracking.py new file mode 100644 index 00000000..161b9f58 --- /dev/null +++ b/zstash/transfer_tracking.py @@ -0,0 +1,68 @@ +import os +from typing import List, Optional + +from globus_sdk import TransferClient, TransferData +from globus_sdk.services.transfer.response.iterable import IterableTransferResponse + +from .settings import logger +from .utils import ts_utc + + +# Globus specific ############################################################# +class GlobusTransfer(object): + def __init__(self): + self.transfer_data: Optional[TransferData] = None + self.task_id: Optional[str] = None + # 
https://docs.globus.org/api/transfer/task/#task_fields
+        # ACTIVE, SUCCEEDED, FAILED, INACTIVE
+        self.task_status: Optional[str] = None
+        logger.debug(f"{ts_utc()}: GlobusTransfer initialized")
+
+
+class GlobusTransferCollection(object):
+    def __init__(self):
+        # Attributes common to all the transfers
+        self.remote_endpoint: Optional[str] = None
+        self.local_endpoint: Optional[str] = None
+        self.transfer_client: Optional[TransferClient] = None
+        self.archive_directory_listing: Optional[IterableTransferResponse] = None
+
+        self.transfers: List[GlobusTransfer] = (
+            []
+        )  # TODO: Replace with collections.deque?
+        self.cumulative_tarfiles_pushed: int = 0
+        logger.debug(f"{ts_utc()}: GlobusTransferCollection initialized")
+
+    def get_most_recent_transfer(self) -> Optional[GlobusTransfer]:
+        return self.transfers[-1] if self.transfers else None
+
+
+# All Transfers ###############################################################
+class HPSSTransferCollection(object):
+    def __init__(self):
+        self.prev_transfers: List[str] = []  # Can remove
+        self.curr_transfers: List[str] = []  # Still using!
+        logger.debug(f"{ts_utc()}: HPSSTransferCollection initialized")
+
+
+def delete_transferred_files(htc: HPSSTransferCollection):
+    logger.debug(f"{ts_utc()}: deleting transferred files {htc.prev_transfers}")
+    for src_path in htc.prev_transfers:
+        if os.path.exists(src_path):
+            os.remove(src_path)
+        else:
+            logger.debug(f"{ts_utc()}: {src_path} already deleted or doesn't exist")
+    htc.prev_transfers = htc.curr_transfers
+    htc.curr_transfers = []
+    logger.debug(f"{ts_utc()}: prev_transfers has been set to {htc.prev_transfers}")
+
+
+def delete_current_files(htc: HPSSTransferCollection):
+    logger.debug(f"{ts_utc()}: deleting current files {htc.curr_transfers}")
+    for src_path in htc.curr_transfers:
+        if os.path.exists(src_path):
+            os.remove(src_path)
+        else:
+            logger.debug(f"{ts_utc()}: {src_path} already deleted or doesn't exist")
+    htc.curr_transfers = []  # Clear the list after deletion
+    logger.debug(f"{ts_utc()}: curr_transfers cleared after deletion")
diff --git a/zstash/update.py b/zstash/update.py
index b0f2af40..d9634676 100644
--- a/zstash/update.py
+++ b/zstash/update.py
@@ -21,6 +21,7 @@
     get_db_filename,
     logger,
 )
+from .transfer_tracking import GlobusTransferCollection, HPSSTransferCollection
 from .utils import get_files_to_archive, update_config
 
 
@@ -29,8 +30,11 @@ def update():
     args: argparse.Namespace
     cache: str
     args, cache = setup_update()
+    htc = HPSSTransferCollection()
 
-    result: Optional[List[str]] = update_database(args, cache)
+    result: Optional[List[str]]
+    gtc: Optional[GlobusTransferCollection]
+    result, gtc = update_database(args, cache, htc)
 
     if result is None:
         # There was either nothing to update or `--dry-run` was set.
@@ -43,9 +47,17 @@ def update(): hpss = config.hpss else: raise TypeError("Invalid config.hpss={}".format(config.hpss)) - hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True) + hpss_put( + hpss, + get_db_filename(cache), + cache, + keep=args.keep, + is_index=True, + gtc=gtc, + # htc=htc, # Don't track index.db for deletion + ) - globus_finalize(non_blocking=args.non_blocking) + globus_finalize(gtc, htc, non_blocking=args.non_blocking) # List failures if len(failures) > 0: @@ -147,10 +159,11 @@ def setup_update() -> Tuple[argparse.Namespace, str]: # C901 'update_database' is too complex (20) def update_database( # noqa: C901 - args: argparse.Namespace, cache: str -) -> Optional[List[str]]: + args: argparse.Namespace, cache: str, htc: HPSSTransferCollection +) -> Tuple[Optional[List[str]], Optional[GlobusTransferCollection]]: # Open database logger.debug("Opening index database") + gtc: Optional[GlobusTransferCollection] = None if not os.path.exists(get_db_filename(cache)): # The database file doesn't exist in the cache. # We need to retrieve it from HPSS @@ -160,7 +173,7 @@ def update_database( # noqa: C901 hpss: str = config.hpss else: raise TypeError("Invalid config.hpss={}".format(config.hpss)) - globus_activate(hpss) + gtc = globus_activate(hpss) hpss_get(hpss, get_db_filename(cache), cache) else: error_str: str = ( @@ -242,7 +255,7 @@ def update_database( # noqa: C901 # Close database con.commit() con.close() - return None + return None, gtc # --dry-run option if args.dry_run: @@ -252,7 +265,7 @@ def update_database( # noqa: C901 # Close database con.commit() con.close() - return None + return None, gtc # Find last used tar archive itar: int = -1 @@ -263,24 +276,7 @@ def update_database( # noqa: C901 itar = max(itar, int(tfile_string[0:6], 16)) failures: List[str] - if args.follow_symlinks: - try: - # Add files - failures = add_files( - cur, - con, - itar, - newfiles, - cache, - keep, - args.follow_symlinks, - non_blocking=args.non_blocking, - error_on_duplicate_tar=args.error_on_duplicate_tar, - overwrite_duplicate_tars=args.overwrite_duplicate_tars, - ) - except FileNotFoundError: - raise Exception("Archive update failed due to broken symlink.") - else: + try: # Add files failures = add_files( cur, @@ -293,10 +289,17 @@ def update_database( # noqa: C901 non_blocking=args.non_blocking, error_on_duplicate_tar=args.error_on_duplicate_tar, overwrite_duplicate_tars=args.overwrite_duplicate_tars, + gtc=gtc, + htc=htc, ) + except FileNotFoundError as e: + if args.follow_symlinks: + raise Exception("Archive update failed due to broken symlink.") + else: + raise e # Close database con.commit() con.close() - return failures + return failures, gtc
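
Reviewer note (not part of the diff): a usage sketch of how the new tracking objects
thread through an archive run, based on the call sites above. The endpoint URL and tar
file names are made-up placeholders.

    from zstash.globus import globus_activate, globus_finalize
    from zstash.hpss import hpss_put
    from zstash.transfer_tracking import HPSSTransferCollection

    hpss = "globus://<endpoint-uuid>/~/zstash_archive"  # hypothetical URL

    gtc = globus_activate(hpss)     # None unless the URL scheme is "globus"
    htc = HPSSTransferCollection()  # tracks tar files queued for deletion

    # Tar files (keep=False, is_index=False) are appended to htc.curr_transfers,
    # to be deleted once their Globus transfer reaches SUCCEEDED.
    hpss_put(hpss, "zstash/000000.tar", "zstash", keep=False, gtc=gtc, htc=htc)

    # index.db passes gtc but not htc, so it is never queued for deletion.
    hpss_put(hpss, "zstash/index.db", "zstash", keep=True, is_index=True, gtc=gtc)

    # Submits any still-accumulated TransferData, waits on outstanding task_ids,
    # then flushes htc via delete_transferred_files().
    globus_finalize(gtc, htc, non_blocking=False)

The deletion bookkeeping in transfer_tracking.py is two-phase: delete_transferred_files()
removes the files recorded on the previous call, then promotes curr_transfers to
prev_transfers. For example:

    htc = HPSSTransferCollection()
    htc.curr_transfers = ["000000.tar"]
    delete_transferred_files(htc)  # deletes nothing yet; prev_transfers <- ["000000.tar"]
    htc.curr_transfers = ["000001.tar"]
    delete_transferred_files(htc)  # deletes 000000.tar; prev_transfers <- ["000001.tar"]

This is why globus_finalize() may call delete_transferred_files() twice at the end: the
first call promotes any remaining curr_transfers, and the second deletes them.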