10
10
from pathlib import Path
11
11
from typing import Optional , List , Union , Any , Tuple
12
12
13
- from git import Repo , Actor
13
+ from git import Repo , Actor , GitCommandError
14
14
from packaging import version
15
15
16
16
from .configuration import configuration , ConfigurationVariable
@@ -48,7 +48,8 @@ def _git_url_ssh_to_https(self, url: str) -> str:
48
48
path = url .split ("github.com" , 1 )[1 ][1 :].strip ()
49
49
new = "https://{GITHUB_TOKEN}:[email protected] /%s" % path
50
50
logger .info ("rewriting git url to: %s" % new )
51
- return new .format (GITHUB_TOKEN = configuration .get_value (ConfigurationVariable .GIT_TOKEN ))
51
+ return new .format (GITHUB_TOKEN = configuration .get_value (
52
+ ConfigurationVariable .GIT_TOKEN ))
52
53
53
54
def clone (self , path : Path ) -> "GitWrapper" :
54
55
"""Clones this repository to the path.
@@ -59,9 +60,20 @@ def clone(self, path: Path) -> "GitWrapper":
59
60
Returns:
60
61
a wrapper over the cloned repository
61
62
"""
62
- git_clone = self .repo .clone_from (
63
- url = self .get_remote_url (), to_path = str (path ), multi_options = ["--recurse-submodules" ]
64
- )
63
+ try :
64
+ git_clone = self .repo .clone_from (
65
+ url = self .get_remote_url (), to_path = str (path ),
66
+ multi_options = ["--recurse-submodules" ]
67
+ )
68
+ except GitCommandError as e :
69
+ logger .info ("failed cloning repository: %s" % e )
70
+ logger .info ("trying with authentication" )
71
+ git_clone = self .repo .clone_from (
72
+ url = self ._git_url_ssh_to_https (self .get_remote_url ()),
73
+ to_path = str (path ),
74
+ multi_options = ["--recurse-submodules" ]
75
+ )
76
+
65
77
clone = GitWrapper (path = path , repo = git_clone )
66
78
clone .set_remote_url (self .get_remote_url ())
67
79
clone .fetch ()
@@ -78,8 +90,10 @@ def root(self) -> Path:
78
90
79
91
def configure_author (self ) -> None :
80
92
"""Sets the author."""
81
- self .repo .config_writer ().set_value ("user" , "name" , self .author .name ).release ()
82
- self .repo .config_writer ().set_value ("user" , "email" , self .author .email ).release ()
93
+ self .repo .config_writer ().set_value ("user" , "name" ,
94
+ self .author .name ).release ()
95
+ self .repo .config_writer ().set_value ("user" , "email" ,
96
+ self .author .email ).release ()
83
97
84
98
def checkout_branch (self , branch_name : str ) -> Any :
85
99
"""Checks out a branch from its name.
@@ -114,7 +128,8 @@ def _add_one_path(self, path_model: Path) -> None:
114
128
if not path_model .is_absolute ():
115
129
path_model = Path (self .root ).joinpath (path_model )
116
130
if not path_model .exists ():
117
- logger .warning (f"[Git] { path_model } cannot be added because not found." )
131
+ logger .warning (
132
+ f"[Git] { path_model } cannot be added because not found." )
118
133
return
119
134
relative_path = str (path_model .relative_to (self .root ))
120
135
unix_relative_path = relative_path .replace ("\\ " , "/" )
@@ -135,7 +150,8 @@ def add(self, path: Union[list, set, str]) -> None:
135
150
else :
136
151
self ._add_one_file_or_one_dir (path )
137
152
138
- def commit (self , message : str , ** kwargs : Optional [Tuple [str , Any ]]) -> None :
153
+ def commit (self , message : str ,
154
+ ** kwargs : Optional [Tuple [str , Any ]]) -> None :
139
155
"""Commits changes to the repository.
140
156
141
157
Args:
@@ -151,15 +167,17 @@ def get_master_branch(self) -> Any:
151
167
Returns:
152
168
corresponding branch
153
169
"""
154
- return self .get_branch (configuration .get_value (ConfigurationVariable .MASTER_BRANCH ))
170
+ return self .get_branch (
171
+ configuration .get_value (ConfigurationVariable .MASTER_BRANCH ))
155
172
156
173
def get_beta_branch (self ) -> Any :
157
174
"""Gets the `beta` branch.
158
175
159
176
Returns:
160
177
corresponding branch
161
178
"""
162
- return self .get_branch (configuration .get_value (ConfigurationVariable .BETA_BRANCH ))
179
+ return self .get_branch (
180
+ configuration .get_value (ConfigurationVariable .BETA_BRANCH ))
163
181
164
182
def is_release_branch (self , branch_name : Optional [str ]) -> bool :
165
183
"""Checks whether the branch is a `release` branch or not.
@@ -170,7 +188,8 @@ def is_release_branch(self, branch_name: Optional[str]) -> bool:
170
188
Returns:
171
189
True if the branch is used for `release` code; False otherwise
172
190
"""
173
- branch_pattern = configuration .get_value (ConfigurationVariable .RELEASE_BRANCH_PATTERN )
191
+ branch_pattern = configuration .get_value (
192
+ ConfigurationVariable .RELEASE_BRANCH_PATTERN )
174
193
if not branch_pattern or not branch_name :
175
194
return False
176
195
is_release : Optional [Any ] = re .search (branch_pattern , str (branch_name ))
@@ -213,7 +232,8 @@ def get_current_branch(self) -> Any:
213
232
try :
214
233
return self .repo .active_branch
215
234
except TypeError as e :
216
- logger .warning (f"Could not determine the branch name using GitPython: { e } " )
235
+ logger .warning (
236
+ f"Could not determine the branch name using GitPython: { e } " )
217
237
current_branch = self ._get_branch_from_advanced_feature ()
218
238
if not current_branch :
219
239
current_branch = self ._get_branch_from_abbreviation ("HEAD" )
@@ -222,13 +242,17 @@ def get_current_branch(self) -> Any:
222
242
def _get_branch_from_advanced_feature (self ) -> Any :
223
243
if version .parse (self .git_version ()) >= version .parse ("2.22" ):
224
244
current_branch = self .repo .git .branch (show_current = True )
225
- current_branch = current_branch if isinstance (current_branch , str ) else current_branch .decode ("utf-8" )
245
+ current_branch = current_branch if isinstance (current_branch ,
246
+ str ) else current_branch .decode (
247
+ "utf-8" )
226
248
return self .get_branch (current_branch )
227
249
return None
228
250
229
251
def _get_branch_from_abbreviation (self , abbreviation : str ) -> Any :
230
252
current_branch = self .repo .git .rev_parse ("--abbrev-ref" , abbreviation )
231
- current_branch = current_branch if isinstance (current_branch , str ) else current_branch .decode ("utf-8" )
253
+ current_branch = current_branch if isinstance (current_branch ,
254
+ str ) else current_branch .decode (
255
+ "utf-8" )
232
256
return self .get_branch (current_branch .strip ())
233
257
234
258
def get_commit_count (self ) -> int :
@@ -282,7 +306,8 @@ def merge(self, branch: Any) -> None:
282
306
current_branch = self .get_current_branch ()
283
307
merge_base = self .repo .merge_base (branch , current_branch )
284
308
self .repo .index .merge_tree (current_branch , base = merge_base )
285
- self .commit (f"Merge from { str (branch )} " , parent_commits = (branch .commit , current_branch .commit ))
309
+ self .commit (f"Merge from { str (branch )} " ,
310
+ parent_commits = (branch .commit , current_branch .commit ))
286
311
287
312
def get_remote_url (self ) -> str :
288
313
"""Gets the URL of the remote repository.
@@ -315,7 +340,9 @@ def set_remote_url(self, url: str) -> None:
315
340
remote = self ._get_remote ()
316
341
if remote :
317
342
self .repo .delete_remote (str (remote ))
318
- self .repo .create_remote (configuration .get_value (ConfigurationVariable .REMOTE_ALIAS ), url = url )
343
+ self .repo .create_remote (
344
+ configuration .get_value (ConfigurationVariable .REMOTE_ALIAS ),
345
+ url = url )
319
346
320
347
def get_remote_branch (self , branch_name : str ) -> Optional [Any ]:
321
348
"""Gets the branch present in the remote repository.
@@ -342,7 +369,8 @@ def set_upstream_branch(self, branch_name: str) -> None:
342
369
branch_name: name of the remote branch.
343
370
"""
344
371
if self .remote_branch_exists (branch_name ):
345
- self .repo .git .branch ("--set-upstream-to" , self .get_remote_branch (branch_name ))
372
+ self .repo .git .branch ("--set-upstream-to" ,
373
+ self .get_remote_branch (branch_name ))
346
374
347
375
def delete_branch (self , branch : Any ) -> None :
348
376
"""Deletes a branch.
@@ -384,17 +412,21 @@ def remote_branch_exists(self, branch_name: str) -> bool:
384
412
"""
385
413
return self .get_remote_branch (branch_name ) is not None
386
414
387
- def _get_specific_changes (self , change_type : Optional [str ], commit1 : Any , commit2 : Any ) -> List [str ]:
415
+ def _get_specific_changes (self , change_type : Optional [str ], commit1 : Any ,
416
+ commit2 : Any ) -> List [str ]:
388
417
diff = commit1 .diff (commit2 )
389
418
if change_type :
390
419
change_type = change_type .upper ()
391
420
change_type = change_type if change_type in diff .change_type else None
392
- diff_iterator = diff .iter_change_type (change_type ) if change_type else diff
393
- changes = [change .a_path if change .a_path else change .b_path for change in diff_iterator ]
421
+ diff_iterator = diff .iter_change_type (
422
+ change_type ) if change_type else diff
423
+ changes = [change .a_path if change .a_path else change .b_path for change
424
+ in diff_iterator ]
394
425
return changes
395
426
396
427
def get_changes_list (
397
- self , commit1 : Any , commit2 : Any , change_type : Optional [str ] = None , dir : Optional [str ] = None
428
+ self , commit1 : Any , commit2 : Any ,
429
+ change_type : Optional [str ] = None , dir : Optional [str ] = None
398
430
) -> List [str ]:
399
431
"""Gets change list.
400
432
@@ -414,7 +446,8 @@ def get_changes_list(
414
446
if dir :
415
447
windows_path = dir .replace ("/" , "\\ " )
416
448
linux_path = dir .replace ("\\ " , "/" )
417
- return [change for change in changes if (linux_path in change ) or (windows_path in change )]
449
+ return [change for change in changes if
450
+ (linux_path in change ) or (windows_path in change )]
418
451
else :
419
452
return changes
420
453
@@ -425,19 +458,22 @@ def pull_all(self) -> None:
425
458
def pull (self ) -> None :
426
459
"""Pulls changes on current branch from the remote repository."""
427
460
if self .remote_branch_exists (self .get_current_branch ()):
428
- self .repo .git .pull (self ._get_remote (), self .get_current_branch (), quiet = True )
461
+ self .repo .git .pull (self ._get_remote (), self .get_current_branch (),
462
+ quiet = True )
429
463
430
464
def force_pull (self ) -> None :
431
465
"""Force pulls changes from the remote repository."""
432
- self .repo .git .pull (self ._get_remote (), self .get_current_branch (), quiet = True , force = True )
466
+ self .repo .git .pull (self ._get_remote (), self .get_current_branch (),
467
+ quiet = True , force = True )
433
468
434
469
def push (self ) -> None :
435
470
"""Pushes commits.
436
471
437
472
Pushes changes to the remote repository.
438
473
Pushes also relevant annotated tags when pushing branches out.
439
474
"""
440
- self .repo .git .push ("--follow-tags" , "--set-upstream" , self ._get_remote (), self .get_current_branch ())
475
+ self .repo .git .push ("--follow-tags" , "--set-upstream" ,
476
+ self ._get_remote (), self .get_current_branch ())
441
477
442
478
def push_tag (self ) -> None :
443
479
"""Pushes commits and tags.
@@ -500,21 +536,25 @@ def git_version(self) -> str:
500
536
Returns:
501
537
the version of git in use.
502
538
"""
503
- return "." .join ([str (element ) for element in self .repo .git .version_info ])
539
+ return "." .join (
540
+ [str (element ) for element in self .repo .git .version_info ])
504
541
505
542
def _get_remote (self ) -> Optional [Any ]:
506
543
try :
507
- return self .repo .remote (configuration .get_value (ConfigurationVariable .REMOTE_ALIAS ))
544
+ return self .repo .remote (
545
+ configuration .get_value (ConfigurationVariable .REMOTE_ALIAS ))
508
546
except (IndexError , ValueError ) as e :
509
547
logger .warning (e )
510
548
return None
511
549
512
550
def list_files_added_on_current_branch (self ) -> List [str ]:
513
551
"""Returns a list of files changed against master branch."""
514
- master_branch_commit = self .repo .commit (configuration .get_value (ConfigurationVariable .MASTER_BRANCH ))
552
+ master_branch_commit = self .repo .commit (
553
+ configuration .get_value (ConfigurationVariable .MASTER_BRANCH ))
515
554
current_branch_commit = self .repo .commit (self .get_current_branch ())
516
555
changes = self .get_changes_list (
517
- self .get_branch_point (master_branch_commit , current_branch_commit ), current_branch_commit , change_type = "a"
556
+ self .get_branch_point (master_branch_commit , current_branch_commit ),
557
+ current_branch_commit , change_type = "a"
518
558
)
519
559
return changes
520
560
@@ -537,7 +577,8 @@ def uncommitted_changes(self) -> List[Path]:
537
577
if not status :
538
578
return []
539
579
540
- return [Path (self .root ).joinpath (line .strip ().split (" " )[- 1 ]) for line in status .splitlines ()]
580
+ return [Path (self .root ).joinpath (line .strip ().split (" " )[- 1 ]) for line
581
+ in status .splitlines ()]
541
582
542
583
@staticmethod
543
584
def _apply_modifications (destination : Path , modified_file : Path ) -> None :
@@ -573,8 +614,10 @@ class ProjectGitWrapper(GitWrapper):
573
614
def __init__ (self ) -> None :
574
615
"""Creates a Git Wrapper."""
575
616
super ().__init__ (
576
- path = Path (configuration .get_value (ConfigurationVariable .PROJECT_ROOT )),
577
- repo = Repo (configuration .get_value (ConfigurationVariable .PROJECT_ROOT )),
617
+ path = Path (
618
+ configuration .get_value (ConfigurationVariable .PROJECT_ROOT )),
619
+ repo = Repo (
620
+ configuration .get_value (ConfigurationVariable .PROJECT_ROOT )),
578
621
)
579
622
580
623
@@ -615,7 +658,8 @@ def __init__(self, path: Path, initial_path: Path, repo: Repo) -> None:
615
658
@staticmethod
616
659
def wrap (repo : GitWrapper , initial_location : Path ) -> "GitClone" :
617
660
"""Wraps around around a repository."""
618
- return GitClone (repo = repo .repo , path = repo .root , initial_path = initial_location )
661
+ return GitClone (repo = repo .repo , path = repo .root ,
662
+ initial_path = initial_location )
619
663
620
664
@property
621
665
def initial_location (self ) -> Path :
@@ -638,26 +682,31 @@ def get_corresponding_path(self, path_in_initial_repo: Path) -> Path:
638
682
return Path (self .root ).joinpath (path_in_initial_repo )
639
683
try :
640
684
# Tyring to find if the path corresponds to a file/directory present in intial repository
641
- return Path (self .root ).joinpath (path_in_initial_repo .relative_to (self .initial_location ))
685
+ return Path (self .root ).joinpath (
686
+ path_in_initial_repo .relative_to (self .initial_location ))
642
687
except ValueError :
643
688
return path_in_initial_repo
644
689
645
690
646
691
class GitTempClone :
647
692
"""Context manager providing a temporary cloned repository."""
648
693
649
- def __init__ (self , desired_branch_name : Optional [str ], repository_to_clone : GitWrapper ):
694
+ def __init__ (self , desired_branch_name : Optional [str ],
695
+ repository_to_clone : GitWrapper ):
650
696
"""Constructor.
651
697
652
698
Args:
653
699
desired_branch_name: the branch to consider. I
654
700
repository_to_clone: the repository to clone. If not specified, the project repository will be used.
655
701
"""
656
702
self ._temporary_dir = TemporaryDirectory ()
657
- logger .info (f"Creating a temporary repository in { self ._temporary_dir } " )
703
+ logger .info (
704
+ f"Creating a temporary repository in { self ._temporary_dir } " )
658
705
self ._repo = repository_to_clone
659
- _current_branch_name = desired_branch_name if desired_branch_name else str (self ._repo .get_current_branch ())
660
- self ._clone = GitClone .wrap (self ._repo .clone (self ._temporary_dir .path ), initial_location = self ._repo .root )
706
+ _current_branch_name = desired_branch_name if desired_branch_name else str (
707
+ self ._repo .get_current_branch ())
708
+ self ._clone = GitClone .wrap (self ._repo .clone (self ._temporary_dir .path ),
709
+ initial_location = self ._repo .root )
661
710
self ._clone .checkout (_current_branch_name )
662
711
self ._repo .apply_uncommitted_changes (self ._clone )
663
712
@@ -688,4 +737,5 @@ def __init__(self, desired_branch_name: Optional[str] = None):
688
737
system will try to identify the current branch in the repository which
689
738
will work in most cases but probably not on CI.
690
739
"""
691
- super ().__init__ (desired_branch_name = desired_branch_name , repository_to_clone = ProjectGitWrapper ())
740
+ super ().__init__ (desired_branch_name = desired_branch_name ,
741
+ repository_to_clone = ProjectGitWrapper ())
0 commit comments