@@ -3544,6 +3544,31 @@ def test_compat_makedirs(self):
35443544 py2vs3 .makedirs (name , exist_ok = True ) # No error
35453545 self .assertExists (name )
35463546
3547+ def test_get_first_nonexisting_parent (self ):
3548+ """Test get_first_nonexisting_parent function."""
3549+ test_root = os .path .join (self .test_prefix , 'a' )
3550+
3551+ base_path = os .path .join (self .test_prefix , 'a' )
3552+ target_path = os .path .join (self .test_prefix , 'a' , 'b' , 'c' )
3553+ os .makedirs (base_path )
3554+ first_nonexisting_parent = ft .get_first_nonexisting_parent (target_path )
3555+ self .assertEqual (first_nonexisting_parent , os .path .join (self .test_prefix , 'a' , 'b' ))
3556+ shutil .rmtree (test_root )
3557+
3558+ base_path = os .path .join (self .test_prefix , 'a' , 'b' )
3559+ target_path = os .path .join (self .test_prefix , 'a' , 'b' , 'c' )
3560+ os .makedirs (base_path )
3561+ first_nonexisting_parent = ft .get_first_nonexisting_parent (target_path )
3562+ self .assertEqual (first_nonexisting_parent , os .path .join (self .test_prefix , 'a' , 'b' , 'c' ))
3563+ shutil .rmtree (test_root )
3564+
3565+ base_path = os .path .join (self .test_prefix , 'a' , 'b' , 'c' )
3566+ target_path = os .path .join (self .test_prefix , 'a' , 'b' , 'c' )
3567+ os .makedirs (base_path )
3568+ first_nonexisting_parent = ft .get_first_nonexisting_parent (target_path )
3569+ self .assertEqual (first_nonexisting_parent , None )
3570+ shutil .rmtree (test_root )
3571+
35473572 def test_create_unused_dir (self ):
35483573 """Test create_unused_dir function."""
35493574 path = ft .create_unused_dir (self .test_prefix , 'folder' )
@@ -3582,6 +3607,126 @@ def test_create_unused_dir(self):
35823607 self .assertEqual (path , os .path .join (self .test_prefix , 'file_0' ))
35833608 self .assertExists (path )
35843609
3610+ def test_create_unused_dirs (self ):
3611+ """Test create_unused_dirs function."""
3612+ requested_paths = []
3613+ requested_paths .append (os .path .join (self .test_prefix , 'folder_a' ))
3614+ requested_paths .append (os .path .join (self .test_prefix , 'folder_b' ))
3615+ paths = ft .create_unused_dirs (requested_paths )
3616+ self .assertEqual (paths , requested_paths )
3617+ map (lambda p : self .assertExists (p ), paths )
3618+
3619+ # Repeat with existing folder(s) should create new ones
3620+ for i in range (10 ):
3621+ requested_paths = []
3622+ requested_paths .append (os .path .join (self .test_prefix , 'folder_a' ))
3623+ requested_paths .append (os .path .join (self .test_prefix , 'folder_b' ))
3624+ paths = ft .create_unused_dirs (requested_paths )
3625+ self .assertEqual (paths , list (map (lambda p : p + '_%s' % i , requested_paths )))
3626+ map (lambda p : self .assertExists (p ), paths )
3627+
3628+ # Skip suffix if a directory with the suffix already exists
3629+ existing_idx = 1
3630+ os .mkdir (os .path .join (self .test_prefix , "existing_idx_a" ))
3631+ os .mkdir (os .path .join (self .test_prefix , "existing_idx_b" ))
3632+ os .mkdir (os .path .join (self .test_prefix , f"existing_idx_b_{ existing_idx } " ))
3633+
3634+ def expected_idx (n ):
3635+ if n >= existing_idx :
3636+ return n + 1
3637+ return n
3638+
3639+ for i in range (3 ):
3640+ requested_paths = []
3641+ requested_paths .append (os .path .join (self .test_prefix , 'existing_idx_a' ))
3642+ requested_paths .append (os .path .join (self .test_prefix , 'existing_idx_b' ))
3643+ paths = ft .create_unused_dirs (requested_paths )
3644+ self .assertEqual (paths , list (map (lambda p : p + '_%s' % expected_idx (i ), requested_paths )))
3645+ map (lambda p : self .assertExists (p ), paths )
3646+ self .assertNotExists (os .path .join (self .test_prefix , f"existing_idx_a_{ existing_idx } " ))
3647+ self .assertExists (os .path .join (self .test_prefix , f"existing_idx_b_{ existing_idx } " ))
3648+
3649+ # Support creation of parent directories
3650+ requested_paths = [os .path .join (self .test_prefix , 'parent_folder' , 'folder' )]
3651+ paths = ft .create_unused_dirs (requested_paths )
3652+ self .assertEqual (paths , requested_paths )
3653+ map (lambda p : self .assertExists (p ), paths )
3654+
3655+ # Not influenced by similar folder
3656+ requested_paths = [os .path .join (self .test_prefix , 'folder_a2' )]
3657+ paths = ft .create_unused_dirs (requested_paths )
3658+ self .assertEqual (paths , requested_paths )
3659+ map (lambda p : self .assertExists (p ), paths )
3660+ for i in range (10 ):
3661+ paths = ft .create_unused_dirs (requested_paths )
3662+ self .assertEqual (paths , list (map (lambda p : p + '_%s' % i , requested_paths )))
3663+ map (lambda p : self .assertExists (p ), paths )
3664+
3665+ # Fail cleanly if passed a readonly folder
3666+ readonly_dir = os .path .join (self .test_prefix , 'ro_folder' )
3667+ ft .mkdir (readonly_dir )
3668+ old_perms = os .lstat (readonly_dir )[stat .ST_MODE ]
3669+ ft .adjust_permissions (readonly_dir , stat .S_IREAD | stat .S_IEXEC , relative = False )
3670+ requested_path = os .path .join (readonly_dir , 'new_folder' )
3671+ try :
3672+ self .assertErrorRegex (EasyBuildError , 'Failed to create directory' ,
3673+ ft .create_unused_dirs , [requested_path ])
3674+ finally :
3675+ ft .adjust_permissions (readonly_dir , old_perms , relative = False )
3676+
3677+ # Fail if the number of attempts to create the directory is exceeded
3678+ os .mkdir (os .path .join (self .test_prefix , 'attempt' ))
3679+ os .mkdir (os .path .join (self .test_prefix , 'attempt_0' ))
3680+ os .mkdir (os .path .join (self .test_prefix , 'attempt_1' ))
3681+ os .mkdir (os .path .join (self .test_prefix , 'attempt_2' ))
3682+ os .mkdir (os .path .join (self .test_prefix , 'attempt_3' ))
3683+ requested_path = os .path .join (self .test_prefix , 'attempt' )
3684+ self .assertErrorRegex (
3685+ EasyBuildError ,
3686+ 'Exceeded maximum number of attempts to generate unique directory' ,
3687+ ft .create_unused_dirs ,
3688+ [requested_path ], index_upper_bound = 4
3689+ )
3690+
3691+ # Ignore files same as folders. So first just create a file with no contents
3692+ requested_path = os .path .join (self .test_prefix , 'file' )
3693+ ft .write_file (requested_path , '' )
3694+ paths = ft .create_unused_dirs ([requested_path ])
3695+ self .assertEqual (paths , [requested_path + '_0' ])
3696+ map (lambda p : self .assertExists (p ), paths )
3697+
3698+ # Deny creation of nested directories
3699+ requested_paths = [
3700+ os .path .join (self .test_prefix , 'nested_a/foo/bar' ),
3701+ os .path .join (self .test_prefix , 'nested_a/foo/bar/baz' ),
3702+ ]
3703+ self .assertErrorRegex (
3704+ EasyBuildError ,
3705+ "Path '.*/foo/bar' is predecesor of '.*/foo/bar/baz'" ,
3706+ ft .create_unused_dirs ,
3707+ requested_paths
3708+ )
3709+ requested_paths = [
3710+ os .path .join (self .test_prefix , 'nested_b/foo/bar/baz' ),
3711+ os .path .join (self .test_prefix , 'nested_b/foo/bar' ),
3712+ ]
3713+ self .assertErrorRegex (
3714+ EasyBuildError ,
3715+ "Path '.*/foo/bar' is predecesor of '.*/foo/bar/baz'" ,
3716+ ft .create_unused_dirs ,
3717+ requested_paths
3718+ )
3719+
3720+ # Allow creation of non-nested directories
3721+ requested_paths = [
3722+ os .path .join (self .test_prefix , 'nested_c/foo/bar' ),
3723+ os .path .join (self .test_prefix , 'nested_c/foo/baz' ),
3724+ os .path .join (self .test_prefix , 'nested_c/buz' ),
3725+ ]
3726+ paths = ft .create_unused_dirs (requested_paths )
3727+ self .assertEqual (paths , requested_paths )
3728+ map (lambda p : self .assertExists (p ), paths )
3729+
35853730
35863731def suite ():
35873732 """ returns all the testcases in this module """
0 commit comments