diff --git a/main.py b/main.py index e37868e..5cc2471 100644 --- a/main.py +++ b/main.py @@ -2,19 +2,20 @@ import sys +from github_actions.common import AllowEnvironments +from github_actions.common import Credentials +from github_actions.common import ExtendedEnvironmentSelector +from github_actions.common import GitHubContext +from github_actions.common import ProjectInitializer, GETTenant +from github_actions.common import RMKInstaller + from src.actions.actions import RMKCLIExecutor -from src.actions.init_project import ProjectInitializer, GETTenant -from src.credentials.cluster_provider_credentials import Credentials -from src.input_output.input import ArgumentParser -from src.select_environment.allowed_environments import AllowEnvironments -from src.select_environment.select_environment import ExtendedEnvironmentSelector -from src.utils.github_environment_variables import GitHubContext -from src.utils.install_rmk import RMKInstaller +from src.input_output.input import GitLabflowCDArgumentParser if __name__ == "__main__": try: """Parse command-line arguments""" - args = ArgumentParser().parse_args() + args = GitLabflowCDArgumentParser().parse_args() """Retrieve GitHub Action environment variables""" github_context = GitHubContext.from_env( diff --git a/requirements.txt b/requirements.txt index b33ca47..0a94613 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,5 @@ GitPython~=3.1.44 PyGithub~=2.5.0 slack_sdk~=3.34.0 packaging~=24.2 + +git+https://github.com/edenlabllc/github_actions.common.git@v1#egg=github_actions.common diff --git a/src/actions/actions.py b/src/actions/actions.py index 92d2a20..da8746b 100644 --- a/src/actions/actions.py +++ b/src/actions/actions.py @@ -4,9 +4,9 @@ from collections import defaultdict from git import Repo -from ..input_output.output import GitHubOutput -from ..utils.cmd import BaseCommand, CMDInterface -from ..utils.github_environment_variables import GitHubContext +from github_actions.common import GitHubOutput +from github_actions.common import BaseCommand, CMDInterface +from github_actions.common import GitHubContext class DestroyCommand(BaseCommand): @@ -162,7 +162,7 @@ def _normalize_error_block(block: list) -> str: @staticmethod def _capi_controller_name_prefix(provider: str) -> str | None: - return {"aws":"capa", "azure":"capz", "gcp":"capg"}.get(provider) + return {"aws": "capa", "azure": "capz", "gcp": "capg"}.get(provider) def _get_capi_resources_info(self) -> str | None: print("Fetching all Cluster API (CAPI) resources...") diff --git a/src/actions/init_project.py b/src/actions/init_project.py deleted file mode 100644 index 5e669f3..0000000 --- a/src/actions/init_project.py +++ /dev/null @@ -1,78 +0,0 @@ -import json -import os - -from argparse import Namespace -from git import Repo, GitCommandError - -from ..utils.cmd import BaseCommand, CMDInterface - - -class RMKConfigInitCommand(BaseCommand, CMDInterface): - def __init__(self, environment: str, args: Namespace): - super().__init__(environment) - self.cluster_provider = args.rmk_cluster_provider - self.github_token = args.github_token - self.slack_notification = args.rmk_slack_notifications - self.slack_channel = args.rmk_slack_channel - self.slack_message_details = args.rmk_slack_message_details - self.slack_webhook = args.rmk_slack_webhook - - def execute(self): - self.run() - - def run(self): - """Configure Slack notifications if enabled.""" - os.environ["RMK_GITHUB_TOKEN"] = self.github_token - if self.slack_notification == "true": - os.environ["RMK_SLACK_WEBHOOK"] = self.slack_webhook - os.environ["RMK_SLACK_CHANNEL"] = self.slack_channel - - flags_slack_message_details = "" - if self.slack_message_details.splitlines(): - flags_slack_message_details = " ".join( - [f'--slack-message-details="{detail}"' for detail in self.slack_message_details.splitlines()] - ) - - self.run_command(f"rmk config init --cluster-provider={self.cluster_provider}" - f" --progress-bar=false --slack-notifications {flags_slack_message_details}") - else: - self.run_command(f"rmk config init --cluster-provider={self.cluster_provider} --progress-bar=false") - - -class GETTenant(BaseCommand, CMDInterface): - def __init__(self, environment: str): - super().__init__(environment) - - def execute(self) -> str: - return self.run() - - def run(self) -> str: - output = self.run_command(f"rmk --log-format=json config view", True) - rmk_config = json.loads(output) - return rmk_config["config"]["Tenant"] - - -class ProjectInitializer: - GIT_CONFIG = { - "name": "github-actions", - "email": "github-actions@github.com", - } - - def __init__(self, environment: str): - print("Initialize project repository.") - self.environment = environment - self.configure_git() - - def configure_git(self): - """Configure Git user settings.""" - try: - repo = Repo(".") - repo.config_writer().set_value("user", "name", self.GIT_CONFIG["name"]).release() - repo.config_writer().set_value("user", "email", self.GIT_CONFIG["email"]).release() - except GitCommandError as err: - raise ValueError(f"failed to configure Git: {err}") - - def configure_rmk_init(self, args: Namespace): - """Configure Slack notifications using SlackConfigCommand.""" - rmk_init = RMKConfigInitCommand(self.environment, args) - rmk_init.execute() diff --git a/src/credentials/cluster_provider_credentials.py b/src/credentials/cluster_provider_credentials.py deleted file mode 100644 index dd8dea7..0000000 --- a/src/credentials/cluster_provider_credentials.py +++ /dev/null @@ -1,139 +0,0 @@ -import json -import os - -from dataclasses import dataclass, field -from typing import Dict, Optional - - -@dataclass -class AWSConfig: - AWS_ACCESS_KEY_ID: str - AWS_SECRET_ACCESS_KEY: str - AWS_REGION: str - - -@dataclass -class AzureConfig: - AZURE_CLIENT_ID: str - AZURE_CLIENT_SECRET: str - AZURE_LOCATION: str - AZURE_SUBSCRIPTION_ID: str - AZURE_TENANT_ID: str - - -@dataclass -class GCPConfig: - GOOGLE_APPLICATION_CREDENTIALS: str - GCP_REGION: str - - -@dataclass -class ClusterProviders: - aws: Optional[AWSConfig] = field(default=None) - azure: Optional[AzureConfig] = field(default=None) - gcp: Optional[GCPConfig] = field(default=None) - - -@dataclass -class EnvironmentConfig: - cluster_providers: ClusterProviders - - -class Credentials: - def __init__(self, json_data: str): - self.environments: Dict[str, EnvironmentConfig] = self._parse_json(json_data) - - @staticmethod - def _parse_json(json_data: str) -> Dict[str, EnvironmentConfig]: - """Parses the JSON input and validates required fields.""" - try: - data = json.loads(json_data) - if not isinstance(data, dict): - raise ValueError("invalid JSON format: expected a dictionary") - except json.JSONDecodeError as err: - raise ValueError(f"failed to parse JSON: {err}") - - environments = {} - for env_name, env_data in data.items(): - try: - cluster_providers = env_data.get("cluster_providers", {}) - - environments[env_name] = EnvironmentConfig( - cluster_providers=ClusterProviders( - aws=AWSConfig(**cluster_providers["aws"]) if "aws" in cluster_providers else None, - azure=AzureConfig(**cluster_providers["azure"]) if "azure" in cluster_providers else None, - gcp=GCPConfig(**cluster_providers["gcp"]) if "gcp" in cluster_providers else None, - ) - ) - except TypeError as err: - raise ValueError(f"invalid structure for environment '{env_name}': {err}") - - return environments - - def get_environment(self, env_name: str) -> Optional[EnvironmentConfig]: - return self.environments.get(env_name, None) - - def list_environments(self) -> list: - return list(self.environments.keys()) - - @staticmethod - def save_gcp_credentials(credentials_content: str) -> str: - """Save GCP credentials content to a file and return its path with validation.""" - if not credentials_content: - raise ValueError("GCP credentials content is empty or invalid") - - if isinstance(credentials_content, dict): - credentials_json = credentials_content - else: - try: - credentials_json = json.loads(json.dumps(credentials_content)) - if not isinstance(credentials_json, dict): - raise ValueError("invalid GCP credentials format, expected a JSON object") - except json.JSONDecodeError as err: - raise ValueError(f"failed to parse GCP credentials JSON: {err}") - - file_path = "gcp-credentials.json" - try: - with open(file_path, "w") as cred_file: - json.dump(credentials_json, cred_file, indent=4) - except IOError as err: - raise IOError(f"failed to write GCP credentials file: {err}") - - return os.path.abspath(file_path) - - def set_env_variables(self, env_name: str, provider: str): - """Set environment variables based on the selected cluster provider.""" - env_config = self.get_environment(env_name) - if not env_config: - raise ValueError(f"environment '{env_name}' not found in credentials values") - - providers = env_config.cluster_providers - - match provider.lower(): - case "aws": - os.environ.update({ - "AWS_ACCESS_KEY_ID": providers.aws.AWS_ACCESS_KEY_ID, - "AWS_SECRET_ACCESS_KEY": providers.aws.AWS_SECRET_ACCESS_KEY, - "AWS_REGION": providers.aws.AWS_REGION, - }) - case "azure": - os.environ.update({ - "AZURE_CLIENT_ID": providers.azure.AZURE_CLIENT_ID, - "AZURE_CLIENT_SECRET": providers.azure.AZURE_CLIENT_SECRET, - "AZURE_LOCATION": providers.azure.AZURE_LOCATION, - "AZURE_SUBSCRIPTION_ID": providers.azure.AZURE_SUBSCRIPTION_ID, - "AZURE_TENANT_ID": providers.azure.AZURE_TENANT_ID, - }) - case "gcp": - credentials_path = self.save_gcp_credentials(providers.gcp.GOOGLE_APPLICATION_CREDENTIALS) - os.environ.update({ - "GOOGLE_APPLICATION_CREDENTIALS": credentials_path, - "GCP_REGION": providers.gcp.GCP_REGION, - }) - case _: - raise ValueError(f"invalid provider '{provider}', supported providers: aws, azure, gcp") - - print(f"Credentials as environment variables set for {env_name} with cluster provider: {provider}.") - - def __repr__(self): - return f"Credentials(environments={self.environments})" diff --git a/src/input_output/input.py b/src/input_output/input.py index 6b980bb..6c9c954 100644 --- a/src/input_output/input.py +++ b/src/input_output/input.py @@ -1,24 +1,7 @@ -import os -import argparse +from github_actions.common.input_output.input import ArgumentParser -class ArgumentParser: - class EnvDefault(argparse.Action): - def __init__(self, envvar, required=True, default=None, **kwargs): - if envvar: - if envvar in os.environ: - default = os.environ.get(envvar, default) - if required and default: - required = False - super(ArgumentParser.EnvDefault, self).__init__(default=default, required=required, metavar=envvar, **kwargs) - - def __call__(self, parser, namespace, values, option_string=None): - setattr(namespace, self.dest, values) - - def __init__(self): - self.parser = argparse.ArgumentParser() - self.setup_arguments() - +class GitLabflowCDArgumentParser(ArgumentParser): def setup_arguments(self): self.parser.add_argument("--allowed-environments", action=self.EnvDefault, envvar="INPUT_ALLOWED_ENVIRONMENTS", @@ -95,6 +78,3 @@ def setup_arguments(self): self.parser.add_argument("--rmk-version", action=self.EnvDefault, envvar="INPUT_RMK_VERSION", type=str, default='latest') - - def parse_args(self): - return self.parser.parse_args() diff --git a/src/input_output/output.py b/src/input_output/output.py deleted file mode 100644 index 7f1ce17..0000000 --- a/src/input_output/output.py +++ /dev/null @@ -1,14 +0,0 @@ -import os - -from typing import Dict - -class GitHubOutput: - def __init__(self): - self.is_github_actions_runner = 'GITHUB_OUTPUT' in os.environ - - def output_dict(self, body: Dict[str, str]): - if self.is_github_actions_runner: - with open(os.environ['GITHUB_OUTPUT'], 'a') as f: - for key in body: print(f"{key}={body[key]}", file=f) - else: - print("Skip output counters as GitHub Actions outputs.") diff --git a/src/notification/slack_notification.py b/src/notification/slack_notification.py deleted file mode 100644 index 525ebb8..0000000 --- a/src/notification/slack_notification.py +++ /dev/null @@ -1,74 +0,0 @@ -from argparse import Namespace - -from slack_sdk.webhook import WebhookClient -from github import Github, GithubException - -from src.utils.github_environment_variables import GitHubContext - - -class SlackNotifier: - ICONS = { - "Success": "https://img.icons8.com/doodle/48/000000/add.png", - "Failure": "https://img.icons8.com/office/40/000000/minus.png", - "Skip": "https://img.icons8.com/ios-filled/50/000000/0-degrees.png", - } - - def __init__(self, github_context: GitHubContext, args: Namespace, status, message, additional_info=None, tenant=None): - self.branch = github_context.ref_name - self.status = status - self.message = message - self.additional_info = additional_info or {} - self.tenant = tenant or "" - self.icon_url = self.ICONS.get(status, self.ICONS["Skip"]) - self.github_context = github_context - - if not args.github_token or not args.github_token.strip(): - raise ValueError("GitHub token is missing or empty") - self.github_client = Github(args.github_token) - - if not args.rmk_slack_webhook or not args.rmk_slack_webhook.strip(): - raise ValueError("Slack webhook token is missing or empty") - self.webhook_client = WebhookClient(args.rmk_slack_webhook) - - def get_action_job_url(self): - try: - repo = self.github_client.get_repo(self.github_context.repository) - runs = repo.get_workflow_runs() - for run in runs: - if str(run.id) == self.github_context.run_id: - jobs = run.jobs() - job_id = jobs[0].id if jobs.totalCount > 0 else None - if job_id: - return f"{self.github_context.get_github_url()}/actions/runs/{ self.github_context.run_id}/job/{job_id}" - except GithubException as err: - raise ValueError(f"accessing GitHub API: {err}") - - def construct_payload(self, action_job_url, action_run_by): - payload_text = ( - f"*Action run by*: {action_run_by}\n" - f"*Action job URL*: {action_job_url}\n" - f"*Tenant*: {self.tenant}\n" - f"*Branch*: {self.branch}\n" - f"*Status*: {self.status}\n" - f"*Message*: {self.message}\n" - ) - for key, value in self.additional_info.items(): - payload_text += f"*{key}*: {value}\n" - - return { - "username": "GitLabFlow Action", - "icon_url": self.icon_url, - "text": payload_text - } - - def notify(self): - try: - action_job_url = self.get_action_job_url() - action_run_by = self.github_context.actor \ - if self.github_context.event_name == "workflow_dispatch" else "ci-cd-fhir-user" - payload = self.construct_payload(action_job_url, action_run_by) - response = self.webhook_client.send_dict(payload) - except Exception as err: - raise ValueError(f"sending webhook request: {err}") - - return response.status_code diff --git a/src/select_environment/allowed_environments.py b/src/select_environment/allowed_environments.py deleted file mode 100644 index 25cafa3..0000000 --- a/src/select_environment/allowed_environments.py +++ /dev/null @@ -1,14 +0,0 @@ -from argparse import Namespace - - -class AllowEnvironments: - def __init__(self, args: Namespace, environment: str): - self.args = args - self.environment = environment - - def validate(self): - environments = self.args.allowed_environments.split(",") - if len([env for env in environments if env == self.environment]) == 0: - raise ValueError(f"environment {self.environment} is not allowed") - - print(f"Environment {self.environment} is allowed") diff --git a/src/select_environment/select_environment.py b/src/select_environment/select_environment.py deleted file mode 100644 index cfcb6a5..0000000 --- a/src/select_environment/select_environment.py +++ /dev/null @@ -1,56 +0,0 @@ -import re - -from abc import ABC, abstractmethod -from git import Repo - -from src.utils.github_environment_variables import GitHubContext - - -class EnvironmentSelectorInterface(ABC): - @abstractmethod - def select_environment(self, github_context: GitHubContext) -> str: - pass - - -class EnvironmentSelector(EnvironmentSelectorInterface): - TASK_NUM_REGEXP = r"[a-z]+-\d+" - SEMVER_REGEXP = r"v\d+\.\d+\.\d+(-rc)?$" - - PREFIX_FEATURE_BRANCH = "feature" - PREFIX_RELEASE_BRANCH = "release" - - SELECT_FEATURE_BRANCHES = fr"{PREFIX_FEATURE_BRANCH}/{TASK_NUM_REGEXP}" - SELECT_RELEASE_BRANCHES = fr"{PREFIX_RELEASE_BRANCH}/{TASK_NUM_REGEXP}|{PREFIX_RELEASE_BRANCH}/{SEMVER_REGEXP}" - - SELECT_ORIGIN_FEATURE_BRANCHES = fr"origin/{PREFIX_FEATURE_BRANCH}/{TASK_NUM_REGEXP}" - SELECT_ORIGIN_RELEASE_BRANCHES = fr"origin/{PREFIX_RELEASE_BRANCH}/{TASK_NUM_REGEXP}|origin/{PREFIX_RELEASE_BRANCH}/{SEMVER_REGEXP}" - - SELECT_ALL_BRANCHES = fr"{SELECT_FEATURE_BRANCHES}|{SELECT_RELEASE_BRANCHES}" - SELECT_ORIGIN_ALL_BRANCHES = fr"{SELECT_ORIGIN_FEATURE_BRANCHES}|{SELECT_ORIGIN_RELEASE_BRANCHES}" - - def select_environment(self, github_context: GitHubContext) -> str: - if re.match(r"^(develop|staging|production)$", github_context.ref_name, re.IGNORECASE): - return github_context.ref_name - - if re.match(EnvironmentSelector.SELECT_FEATURE_BRANCHES, github_context.ref_name, re.IGNORECASE): - return "develop" - - if re.match(EnvironmentSelector.SELECT_RELEASE_BRANCHES, github_context.ref_name, re.IGNORECASE): - if re.search(EnvironmentSelector.SEMVER_REGEXP, github_context.ref_name, re.IGNORECASE): - if "-rc" in github_context.ref_name: - return "staging" - return "production" - return "staging" - - raise ValueError(f"environment '{github_context.ref_name}' not allowed for environment selection") - - -class ExtendedEnvironmentSelector(EnvironmentSelector): - def select_environment(self, github_context: GitHubContext) -> str: - if github_context.event_name == "pull_request": - repo = Repo(".") - github_context.ref_name = repo.active_branch.name - - if github_context.ref_name.startswith("hotfix/"): - return "production" - return super().select_environment(github_context) diff --git a/src/utils/cmd.py b/src/utils/cmd.py deleted file mode 100644 index 25d9bd7..0000000 --- a/src/utils/cmd.py +++ /dev/null @@ -1,46 +0,0 @@ -import subprocess - -from abc import ABC, abstractmethod -from argparse import Namespace - -from src.notification.slack_notification import SlackNotifier -from src.utils.github_environment_variables import GitHubContext - - -class CMDInterface(ABC): - @abstractmethod - def execute(self): - pass - - -class BaseCommand(ABC): - def __init__(self, environment: str): - self.environment = environment - - @abstractmethod - def run(self): - pass - - @staticmethod - def run_command(cmd: str, capture_output: bool = False): - try: - if capture_output: - result = subprocess.run(cmd, shell=True, check=True, text=True, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - return result.stdout.strip() - else: - subprocess.run(cmd, shell=True, check=True) - return None - except subprocess.CalledProcessError as err: - raise ValueError(f"command '{cmd}' failed with exit code {err.returncode}") - - def notify_slack(self, github_context: GitHubContext, args: Namespace, status, message, - additional_info=None, tenant=None, slack_message_log_output: bool = True): - notifier = SlackNotifier(github_context, args, status=status, message=message, - additional_info=additional_info, tenant=tenant) - response_code = notifier.notify() - print(f"Slack notification sent with response code: {response_code}") - if slack_message_log_output: - print(f"Slack notification - Status: {status}, Environment: {self.environment}, Message: {message}") - else: - print(f"Slack notification - Status: {status}, Environment: {self.environment}") diff --git a/src/utils/github_environment_variables.py b/src/utils/github_environment_variables.py deleted file mode 100644 index a83b378..0000000 --- a/src/utils/github_environment_variables.py +++ /dev/null @@ -1,132 +0,0 @@ -# GITHUB_ACTOR=user -# GITHUB_API_URL=https://api.github.com/ -# GITHUB_BASE_REF=feature/FFS-2-test -# GITHUB_EVENT_NAME=workflow_dispatch -# GITHUB_HEAD_REF=feature/FFS-3-test -# GITHUB_REF=refs/heads/feature-branch-1 -# GITHUB_REF_NAME=feature/FFS-123-test -# GITHUB_REF_TYPE=branch -# GITHUB_REPOSITORY=octocat/Hello-World -# GITHUB_REPOSITORY_OWNER=octocat -# GITHUB_RUN_ATTEMPT=1 -# GITHUB_RUN_ID=1658821493 -# GITHUB_RUN_NUMBER=1 -# GITHUB_SERVER_URL=https://github.com -# GITHUB_SHA=ffac537e6cbbf934b08745a378932722df287a53 - -import os - -from dataclasses import dataclass -from typing import Optional, Dict, List - - -@dataclass -class GitHubContext: - actor: str - api_url: str - base_ref: str - event_name: str - head_ref: str - ref: str - ref_name: str - ref_type: str - repository: str - repository_owner: str - run_attempt: str - run_id: str - run_number: str - server_url: str - sha: str - - @staticmethod - def from_env(github_custom_ref="", github_custom_ref_name="") -> "GitHubContext": - required_env_vars = [ - "GITHUB_ACTOR", - "GITHUB_API_URL", - "GITHUB_BASE_REF", - "GITHUB_EVENT_NAME", - "GITHUB_HEAD_REF", - "GITHUB_REF", - "GITHUB_REF_NAME", - "GITHUB_REF_TYPE", - "GITHUB_REPOSITORY", - "GITHUB_REPOSITORY_OWNER", - "GITHUB_RUN_ATTEMPT", - "GITHUB_RUN_ID", - "GITHUB_RUN_NUMBER", - "GITHUB_SERVER_URL", - "GITHUB_SHA", - ] - - missing_vars = [var for var in required_env_vars if os.getenv(var) is None] - if missing_vars: - raise ValueError(f"missing required environment variables: {', '.join(missing_vars)}") - - return GitHubContext( - actor=os.getenv("GITHUB_ACTOR"), - api_url=os.getenv("GITHUB_API_URL"), - base_ref=os.getenv("GITHUB_BASE_REF"), - event_name=os.getenv("GITHUB_EVENT_NAME"), - head_ref=os.getenv("GITHUB_HEAD_REF"), - ref=github_custom_ref if github_custom_ref else os.getenv("GITHUB_REF"), - ref_name=github_custom_ref_name if github_custom_ref_name else os.getenv("GITHUB_REF_NAME"), - ref_type=os.getenv("GITHUB_REF_TYPE"), - repository=os.getenv("GITHUB_REPOSITORY"), - repository_owner=os.getenv("GITHUB_REPOSITORY_OWNER"), - run_attempt=os.getenv("GITHUB_RUN_ATTEMPT"), - run_id=os.getenv("GITHUB_RUN_ID"), - run_number=os.getenv("GITHUB_RUN_NUMBER"), - server_url=os.getenv("GITHUB_SERVER_URL"), - sha=os.getenv("GITHUB_SHA"), - ) - - def to_list(self) -> List[str]: - """Return context attributes as a list.""" - return [ - f"actor: {self.actor}", - f"api_url: {self.api_url}", - f"base_ref: {self.base_ref}", - f"event_name: {self.event_name}", - f"head_ref: {self.head_ref}", - f"ref: {self.ref}", - f"ref_name: {self.ref_name}", - f"ref_type: {self.ref_type}", - f"repository: {self.repository}", - f"repository_owner: {self.repository_owner}", - f"run_attempt: {self.run_attempt}", - f"run_id: {self.run_id}", - f"run_number: {self.run_number}", - f"server_url: {self.server_url}", - f"sha: {self.sha}", - ] - - def to_string(self) -> str: - """Return context attributes as a single formatted string.""" - return " | ".join(self.to_list()) - - def search_key(self, key: str) -> Optional[str]: - """Quick search for a specific key in the context attributes.""" - context_dict = self.__dict__ - return context_dict.get(key, None) - - def validate_repository_format(self) -> bool: - """Check if the repository follows the format 'owner/repo'.""" - return "/" in self.repository and len(self.repository.split("/")) == 2 - - def get_github_url(self) -> str: - """Generate the GitHub repository URL.""" - return f"{self.server_url}/{self.repository}" - - def get_env_as_dict(self) -> Dict[str, str]: - """Return all attributes as a dictionary.""" - return self.__dict__ - - def get_repository_name(self) -> str: - """Extract and return the repository name from 'owner/repo'.""" - if self.validate_repository_format(): - return self.repository.split("/")[1] - return "Invalid Repository Format" - - def get_action_job_api_url(self) -> str: - """Generate the GitHub Actions job API URL.""" - return f"{self.api_url}/repos/{self.repository}/actions/runs/{self.run_id}/attempts/{self.run_attempt}/jobs" diff --git a/src/utils/install_rmk.py b/src/utils/install_rmk.py deleted file mode 100644 index 67d7235..0000000 --- a/src/utils/install_rmk.py +++ /dev/null @@ -1,38 +0,0 @@ -import subprocess -import requests - -from argparse import Namespace -from packaging import version - - -class RMKInstaller: - def __init__(self, args: Namespace): - self.version = args.rmk_version - self.url = args.rmk_download_url - self.verify_rmk_version() - self.install_rmk() - - def verify_rmk_version(self): - print("Verifying RMK installation version...") - if self.version != "latest": - if version.parse('v0.45.2') > version.parse(self.version): - raise Exception(f"version {self.version} of RMK is not correct, " + - "the version for RMK must be at least v0.45.2 or greater") - - def install_rmk(self): - print("Installing RMK.") - try: - response = requests.get(self.url) - response.raise_for_status() - except requests.RequestException as err: - raise Exception(f"downloading RMK installer file:\n{err}") - - try: - subprocess.run( - ["bash", "-s", "--", self.version], - check=True, - text=True, - input=response.text - ) - except subprocess.CalledProcessError as err: - raise Exception(f"installing RMK:\n{err}")