66import os .path
77import sqlite3
88import sys
9- from typing import Any , List , Tuple
9+ from typing import Any , List , Optional , Tuple
1010
1111from six .moves .urllib .parse import urlparse
1212
1313from .globus import globus_activate , globus_finalize
1414from .hpss import hpss_put
1515from .hpss_utils import add_files
1616from .settings import DEFAULT_CACHE , config , get_db_filename , logger
17+ from .transfer_tracking import GlobusTransferCollection , HPSSTransferCollection
1718from .utils import (
1819 create_tars_table ,
1920 get_files_to_archive ,
@@ -52,12 +53,13 @@ def create():
5253 logger .error (input_path_error_str )
5354 raise NotADirectoryError (input_path_error_str )
5455
56+ gtc : Optional [GlobusTransferCollection ] = None
5557 if hpss != "none" :
5658 url = urlparse (hpss )
5759 if url .scheme == "globus" :
5860 # identify globus endpoints
59- logger .debug (f"{ ts_utc ()} :Calling globus_activate(hpss )" )
60- globus_activate (hpss )
61+ logger .debug (f"{ ts_utc ()} :Calling globus_activate()" )
62+ gtc = globus_activate (hpss )
6163 else :
6264 # config.hpss is not "none", so we need to
6365 # create target HPSS directory
@@ -88,14 +90,23 @@ def create():
8890
8991 # Create and set up the database
9092 logger .debug (f"{ ts_utc ()} : Calling create_database()" )
91- failures : List [str ] = create_database (cache , args )
93+ htc : HPSSTransferCollection = HPSSTransferCollection ()
94+ failures : List [str ] = create_database (cache , args , gtc = gtc , htc = htc )
9295
9396 # Transfer to HPSS. Always keep a local copy.
9497 logger .debug (f"{ ts_utc ()} : calling hpss_put() for { get_db_filename (cache )} " )
95- hpss_put (hpss , get_db_filename (cache ), cache , keep = args .keep , is_index = True )
98+ hpss_put (
99+ hpss ,
100+ get_db_filename (cache ),
101+ cache ,
102+ keep = args .keep ,
103+ is_index = True ,
104+ gtc = gtc ,
105+ # htc=htc,
106+ )
96107
97108 logger .debug (f"{ ts_utc ()} : calling globus_finalize()" )
98- globus_finalize (non_blocking = args .non_blocking )
109+ globus_finalize (gtc , htc , non_blocking = args .non_blocking )
99110
100111 if len (failures ) > 0 :
101112 # List the failures
@@ -204,7 +215,12 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
204215 return cache , args
205216
206217
207- def create_database (cache : str , args : argparse .Namespace ) -> List [str ]:
218+ def create_database (
219+ cache : str ,
220+ args : argparse .Namespace ,
221+ gtc : Optional [GlobusTransferCollection ],
222+ htc : Optional [HPSSTransferCollection ],
223+ ) -> List [str ]:
208224 # Create new database
209225 logger .debug (f"{ ts_utc ()} :Creating index database" )
210226 if os .path .exists (get_db_filename (cache )):
@@ -263,26 +279,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
263279 files : List [str ] = get_files_to_archive (cache , args .include , args .exclude )
264280
265281 failures : List [str ]
266- if args .follow_symlinks :
267- try :
268- # Add files to archive
269- failures = add_files (
270- cur ,
271- con ,
272- - 1 ,
273- files ,
274- cache ,
275- args .keep ,
276- args .follow_symlinks ,
277- skip_tars_md5 = args .no_tars_md5 ,
278- non_blocking = args .non_blocking ,
279- error_on_duplicate_tar = args .error_on_duplicate_tar ,
280- overwrite_duplicate_tars = args .overwrite_duplicate_tars ,
281- force_database_corruption = args .for_developers_force_database_corruption ,
282- )
283- except FileNotFoundError :
284- raise Exception ("Archive creation failed due to broken symlink." )
285- else :
282+ try :
286283 # Add files to archive
287284 failures = add_files (
288285 cur ,
@@ -297,7 +294,14 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
297294 error_on_duplicate_tar = args .error_on_duplicate_tar ,
298295 overwrite_duplicate_tars = args .overwrite_duplicate_tars ,
299296 force_database_corruption = args .for_developers_force_database_corruption ,
297+ gtc = gtc ,
298+ htc = htc ,
300299 )
300+ except FileNotFoundError as e :
301+ if args .follow_symlinks :
302+ raise Exception ("Archive creation failed due to broken symlink." )
303+ else :
304+ raise e
301305
302306 # Close database
303307 con .commit ()
0 commit comments