Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 0 additions & 17 deletions gh_org_mgr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,6 @@

"""Global init file"""

import logging
from importlib.metadata import version

__version__ = version("github-org-manager")


def configure_logger(debug: bool = False) -> logging.Logger:
"""Set logging options"""
log = logging.getLogger()
logging.basicConfig(
encoding="utf-8",
format="[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
if debug:
log.setLevel(logging.DEBUG)
else:
log.setLevel(logging.INFO)

return log
149 changes: 52 additions & 97 deletions gh_org_mgr/_gh_org.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,18 @@
from github.NamedUser import NamedUser
from github.Organization import Organization
from github.Repository import Repository
from github.Requester import Requester
from github.Team import Team
from jwt.exceptions import InvalidKeyError

from ._gh_api import get_github_secrets_from_env, run_graphql_query
from ._helpers import (
compare_two_dicts,
compare_two_lists,
dict_to_pretty_string,
sluggify_teamname,
)
from ._stats import OrgChanges


@dataclass
Expand All @@ -47,6 +55,7 @@ class GHorg: # pylint: disable=too-many-instance-attributes, too-many-lines
configured_repos_collaborators: dict[str, dict[str, str]] = field(default_factory=dict)
archived_repos: list[Repository] = field(default_factory=list)
unconfigured_team_repo_permissions: dict[str, dict[str, str]] = field(default_factory=dict)
stats: OrgChanges = field(default_factory=OrgChanges)

# Re-usable Constants
TEAM_CONFIG_FIELDS: dict[str, dict[str, str | None]] = field( # pylint: disable=invalid-name
Expand All @@ -61,12 +70,6 @@ class GHorg: # pylint: disable=too-many-instance-attributes, too-many-lines
# --------------------------------------------------------------------------
# Helper functions
# --------------------------------------------------------------------------
def _sluggify_teamname(self, team: str) -> str:
"""Slugify a GitHub team name"""
# TODO: this is very naive, no other special chars are
# supported, or multiple spaces etc.
return team.replace(" ", "-")

# amazonq-ignore-next-line
def login(
self, orgname: str, token: str = "", app_id: str | int = "", app_private_key: str = ""
Expand Down Expand Up @@ -117,88 +120,9 @@ def ratelimit(self):
"Current rate limit: %s/%s (reset: %s)", core.remaining, core.limit, core.reset
)

def pretty_print_dict(self, dictionary: dict) -> str:
"""Convert a dict to a pretty-printed output"""

# Censor sensible fields
def censor_half_string(string: str) -> str:
"""Censor 50% of a string (rounded up)"""
half1 = int(len(string) / 2)
half2 = len(string) - half1
return string[:half1] + "*" * (half2)

sensible_keys = ["gh_token", "gh_app_private_key"]
for key in sensible_keys:
if value := dictionary.get(key, ""):
dictionary[key] = censor_half_string(value)

# Print dict nicely
def pretty(d, indent=0):
string = ""
for key, value in d.items():
string += " " * indent + str(key) + ":\n"
if isinstance(value, dict):
string += pretty(value, indent + 1)
else:
string += " " * (indent + 1) + str(value) + "\n"

return string

return pretty(dictionary)

def pretty_print_dataclass(self) -> str:
"""Convert this dataclass to a pretty-printed output"""
return self.pretty_print_dict(asdict(self))

def compare_two_lists(self, list1: list[str], list2: list[str]):
"""
Compares two lists of strings and returns a tuple containing elements
missing in each list and common elements.

Args:
list1 (list of str): The first list of strings.
list2 (list of str): The second list of strings.

Returns:
tuple: A tuple containing three lists:
1. The first list contains elements in `list2` that are missing in `list1`.
2. The second list contains elements that are present in both `list1` and `list2`.
3. The third list contains elements in `list1` that are missing in `list2`.

Example:
>>> list1 = ["apple", "banana", "cherry"]
>>> list2 = ["banana", "cherry", "date", "fig"]
>>> compare_lists(list1, list2)
(['date', 'fig'], ['banana', 'cherry'], ['apple'])
"""
# Convert lists to sets for easier comparison
set1, set2 = set(list1), set(list2)

# Elements in list2 that are missing in list1
missing_in_list1 = list(set2 - set1)

# Elements present in both lists
common_elements = list(set1 & set2)

# Elements in list1 that are missing in list2
missing_in_list2 = list(set1 - set2)

# Return the result as a tuple
return (missing_in_list1, common_elements, missing_in_list2)

def compare_two_dicts(self, dict1: dict, dict2: dict) -> dict[str, dict[str, str | int | None]]:
"""Compares two dictionaries. Assume that the keys are the same. Output
a dict with keys that have differing values"""
# Create an empty dictionary to store differences
differences = {}

# Iterate through the keys (assuming both dictionaries have the same keys)
for key in dict1:
# Compare the values for each key
if dict1[key] != dict2[key]:
differences[key] = {"dict1": dict1[key], "dict2": dict2[key]}

return differences
return dict_to_pretty_string(asdict(self), sensible_keys=["gh_token", "gh_app_private_key"])

def _resolve_gh_username(self, username: str, teamname: str) -> NamedUser | None:
"""Turn a username into a proper GitHub user object"""
Expand Down Expand Up @@ -293,7 +217,7 @@ def sync_org_owners(self, dry: bool = False, force: bool = False) -> None:
return

# Get differences between the current and configured owners
owners_remove, owners_ok, owners_add = self.compare_two_lists(
owners_remove, owners_ok, owners_add = compare_two_lists(
self.configured_org_owners, [user.login for user in self.current_org_owners]
)
# Compare configured (lower-cased) owners with lower-cased list of current owners
Expand All @@ -314,6 +238,7 @@ def sync_org_owners(self, dry: bool = False, force: bool = False) -> None:
for user in owners_add:
if gh_user := self._resolve_gh_username(user, "<org owners>"):
logging.info("Adding user '%s' as organization owner", gh_user.login)
self.stats.add_owner(gh_user.login)
if not dry:
self.org.add_to_members(gh_user, "admin")

Expand All @@ -325,6 +250,7 @@ def sync_org_owners(self, dry: bool = False, force: bool = False) -> None:
"Will make them a normal member",
gh_user.login,
)
self.stats.degrade_owner(gh_user.login)
# Handle authenticated user being the same as the one you want to degrade
if self._is_user_authenticated_user(gh_user):
logging.warning(
Expand Down Expand Up @@ -370,9 +296,10 @@ def create_missing_teams(self, dry: bool = False):
for team, attributes in self.configured_teams.items():
if team not in existent_team_names:
if parent := attributes.get("parent"): # type: ignore
parent_id = self.org.get_team_by_slug(self._sluggify_teamname(parent)).id
parent_id = self.org.get_team_by_slug(sluggify_teamname(parent)).id

logging.info("Creating team '%s' with parent ID '%s'", team, parent_id)
self.stats.create_team(team)
# NOTE: We do not specify any team settings (description etc)
# here, this will happen later
if not dry:
Expand All @@ -385,6 +312,7 @@ def create_missing_teams(self, dry: bool = False):

else:
logging.info("Creating team '%s' without parent", team)
self.stats.create_team(team)
if not dry:
self.org.create_team(
team,
Expand All @@ -410,7 +338,7 @@ def _prepare_team_config_for_sync(
# team coming from config, and valid string
elif isinstance(parent, str) and parent:
team_config["parent_team_id"] = self.org.get_team_by_slug(
self._sluggify_teamname(parent)
sluggify_teamname(parent)
).id
# empty from string, so probably default value
elif isinstance(parent, str) and not parent:
Expand Down Expand Up @@ -471,16 +399,17 @@ def sync_current_teams_settings(self, dry: bool = False) -> None:
)

# Compare settings and update if necessary
if differences := self.compare_two_dicts(configured_team_configs, current_team_configs):
if differences := compare_two_dicts(configured_team_configs, current_team_configs):
# Log differences
logging.info(
"Team settings for '%s' differ from the configuration. Updating them:",
team.name,
)
for setting, diff in differences.items():
logging.info(
"Setting '%s': '%s' --> '%s'", setting, diff["dict2"], diff["dict1"]
)
change_str = f"Setting '{setting}': '{diff['dict2']}' --> '{diff['dict1']}'"
logging.info(change_str)
self.stats.edit_team_config(team.name, new_config=change_str)

# Execute team setting changes
if not dry:
try:
Expand All @@ -489,7 +418,7 @@ def sync_current_teams_settings(self, dry: bool = False) -> None:
logging.critical(
"Team '%s' settings could not be edited. Error: \n%s",
team.name,
self.pretty_print_dict(exc.data),
dict_to_pretty_string(exc.data),
)
sys.exit(1)
else:
Expand Down Expand Up @@ -611,6 +540,7 @@ def sync_teams_members(self, dry: bool = False) -> None: # pylint: disable=too-
team.name,
config_role,
)
self.stats.pending_team_member(team=team.name, user=gh_user.login)
continue

logging.info(
Expand All @@ -619,6 +549,7 @@ def sync_teams_members(self, dry: bool = False) -> None: # pylint: disable=too-
team.name,
config_role,
)
self.stats.add_team_member(team=team.name, user=gh_user.login)
if not dry:
self._add_or_update_user_in_team(team=team, user=gh_user, role=config_role)

Expand All @@ -633,6 +564,7 @@ def sync_teams_members(self, dry: bool = False) -> None: # pylint: disable=too-
team.name,
config_role,
)
self.stats.change_team_member_role(team=team.name, user=gh_user.login)
if not dry:
self._add_or_update_user_in_team(team=team, user=gh_user, role=config_role)

Expand All @@ -651,6 +583,7 @@ def sync_teams_members(self, dry: bool = False) -> None: # pylint: disable=too-
gh_user.login,
team.name,
)
self.stats.remove_team_member(team=team.name, user=gh_user.login)
if not dry:
team.remove_membership(gh_user)
else:
Expand All @@ -675,6 +608,7 @@ def get_unconfigured_teams(
if delete_unconfigured_teams:
for team in unconfigured_teams:
logging.info("Deleting team '%s' as it is not configured locally", team.name)
self.stats.delete_team(team=team.name, deleted=True)
if not dry:
team.delete()
else:
Expand All @@ -684,6 +618,8 @@ def get_unconfigured_teams(
"configured locally: %s. Taking no action about these teams.",
", ".join(unconfigured_teams_str),
)
for team in unconfigured_teams:
self.stats.delete_team(team=team.name, deleted=False)

def get_members_without_team(
self, dry: bool = False, remove_members_without_team: bool = False
Expand Down Expand Up @@ -713,6 +649,7 @@ def get_members_without_team(
"Removing user '%s' from organisation as they are not member of any team",
user.login,
)
self.stats.remove_member_without_team(user=user.login, removed=True)
if not dry:
self.org.remove_from_membership(user)
else:
Expand All @@ -722,6 +659,8 @@ def get_members_without_team(
"member of any team: %s",
", ".join(members_without_team_str),
)
for user in members_without_team:
self.stats.remove_member_without_team(user=user.login, removed=False)

# --------------------------------------------------------------------------
# Repos
Expand Down Expand Up @@ -754,7 +693,7 @@ def _create_perms_changelist_for_teams(

# Convert team name to Team object
try:
team = self.org.get_team_by_slug(self._sluggify_teamname(team_name))
team = self.org.get_team_by_slug(sluggify_teamname(team_name))
# Team not found, probably because a new team should be created, but it's a dry-run
except UnknownObjectException:
logging.debug(
Expand All @@ -763,12 +702,21 @@ def _create_perms_changelist_for_teams(
)
# Initialise a new Team() object with the name, manually
team = Team(
requester=None, # type: ignore
requester=Requester(
auth=None,
base_url="https://api.github.com",
timeout=10,
user_agent="",
per_page=100,
verify=True,
retry=3,
pool_size=200,
),
headers={}, # No headers required
attributes={
"id": 0,
"name": team_name,
"slug": self._sluggify_teamname(team_name),
"slug": sluggify_teamname(team_name),
},
completed=True, # Mark as fully initialized
)
Expand Down Expand Up @@ -840,6 +788,7 @@ def sync_repo_permissions(self, dry: bool = False, ignore_archived: bool = False
team.name,
perm,
)
self.stats.change_repo_team_permissions(repo=repo.name, team=team.name, perm=perm)
if not dry:
# Update permissions or newly add a team to a repo
team.update_team_repository(repo, perm)
Expand All @@ -865,6 +814,10 @@ def sync_repo_permissions(self, dry: bool = False, ignore_archived: bool = False
self._document_unconfigured_team_repo_permissions(
team=team, team_permission=teams[team], repo_name=repo.name
)
# Collect this status in the stats
self.stats.document_unconfigured_team_permissions(
team=team.name, repo=repo.name, perm=teams[team]
)
# Abort handling the repo sync as we don't touch unconfigured teams
continue
# Handle: Team is configured, but contains no config
Expand All @@ -883,6 +836,7 @@ def sync_repo_permissions(self, dry: bool = False, ignore_archived: bool = False
# Remove if any mismatch has been found
if remove:
logging.info("Removing team '%s' from repository '%s'", team.name, repo.name)
self.stats.remove_team_from_repo(repo=repo.name, team=team.name)
if not dry:
team.remove_from_repos(repo)

Expand Down Expand Up @@ -1328,5 +1282,6 @@ def sync_repo_collaborator_permissions(self, dry: bool = False):
)

# Remove collaborator
self.stats.remove_repo_collaborator(repo=repo.name, user=username)
if not dry:
repo.remove_from_collaborators(username)
Loading