Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions ecs_composex/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,16 @@ def main():
root_stack = generate_full_template(settings)
process_stacks(root_stack, settings)

if settings.deploy:
deploy(settings, root_stack)
elif settings.plan:
plan(settings, root_stack, apply=args.apply, cleanup=args.cleanup)
try:
if settings.deploy:
deploy(settings, root_stack)
return 0
elif settings.plan:
plan(settings, root_stack, apply=args.apply, cleanup=args.cleanup)
except Exception as error:
LOG.error("Failed to execute the command successfully")
LOG.exception(error)
return 2
return 0


Expand Down
194 changes: 104 additions & 90 deletions ecs_composex/common/aws.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,18 @@
"""
Common functions and variables fetched from AWS.
"""
from __future__ import annotations

from typing import TYPE_CHECKING, Union

if TYPE_CHECKING:
from boto3.session import Session
from ecs_composex.common.settings import ComposeXSettings
from ecs_composex.common.stacks import ComposeXStack

import re
import secrets
from copy import deepcopy
from string import ascii_lowercase
from datetime import datetime as dt
from time import sleep

from botocore.exceptions import ClientError
Expand All @@ -20,16 +28,11 @@
from ecs_composex.iam import ROLE_ARN_ARG


def get_cross_role_session(session, arn, region_name=None, session_name=None):
def get_cross_role_session(
session: Session, arn: str, region_name: str = None, session_name: str = None
) -> Session:
"""
Function to override ComposeXSettings session to specific session for Lookup

:param boto3.session.Session session: The original session fetching the credentials for X-Role
:param str arn:
:param str region_name: Name of region for session
:param str session_name: Override name of the session
:return: boto3 session from lookup settings
:rtype: boto3.session.Session
"""
if not session_name:
session_name = "ComposeX@Lookup"
Expand All @@ -42,14 +45,9 @@ def get_cross_role_session(session, arn, region_name=None, session_name=None):
raise


def define_lookup_role_from_info(info, session):
def define_lookup_role_from_info(info: dict, session: Session) -> Session:
"""
Function to override ComposeXSettings session to specific session for Lookup

:param info:
:param session:
:return: boto3 session from lookup settings
:rtype: boto3.session.Session
"""
if not keyisset(ROLE_ARN_ARG, info):
return session
Expand Down Expand Up @@ -77,13 +75,9 @@ def set_filters_from_tags_list(tags: list) -> list:
return filters


def define_tagsgroups_filter_tags(tags) -> list:
def define_tagsgroups_filter_tags(tags: list[dict]) -> list:
"""
Function to create the filters out of tags list

:param list tags: list of Key/Value dict
:return: filters
:rtype: list
"""
if isinstance(tags, list):
return set_filters_from_tags_list(tags)
Expand All @@ -100,13 +94,11 @@ def define_tagsgroups_filter_tags(tags) -> list:
raise TypeError("Tags must be one of", [list, dict], "Got", type(tags))


def get_resources_from_tags(session, aws_resource_search, search_tags):
def get_resources_from_tags(
session: Session, aws_resource_search: str, search_tags: list
) -> Union[dict, None]:
"""

:param boto3.session.Session session: The boto3 session for API calls
:param str aws_resource_search: AWS Service short code, ie. rds, ec2
:param list search_tags: The tags to search the resource with.
:return:
Function to retrieve AWS Resources ARNs from the tags using the Resource Groups Tagging API
"""
try:
client = session.client("resourcegroupstaggingapi")
Expand All @@ -120,16 +112,14 @@ def get_resources_from_tags(session, aws_resource_search, search_tags):
return None


def handle_multi_results(arns, name, res_type, regexp, allow_multi=False):
def handle_multi_results(
arns: list[str], name: str, res_type: str, regexp: str, allow_multi: bool = False
) -> Union[str, list[str]]:
"""
Function to evaluate more than one result to see if we can match an unique name.
Function to evaluate more than one result to see if we can match a unique name.

:param list arns:
:param str name:
:param str res_type:
:param str regexp:
:raises LookupError:
:return: The ARN of the resource matching the name.
:return: The ARN(s) of the resource matching the name. Supports to return multiple ARNs
"""
found = 0
found_arn = None
Expand Down Expand Up @@ -161,16 +151,15 @@ def handle_multi_results(arns, name, res_type, regexp, allow_multi=False):


def handle_search_results(
arns, name, res_types, aws_resource_search, allow_multi=False
):
arns: list[str],
name: str,
res_types,
aws_resource_search: str,
allow_multi: bool = False,
) -> Union[str, list[str]]:
"""
Function to parse tag resource search results

:param list arns:
:param str name:
:param dict res_types:
:param str aws_resource_search:
:return:
"""
if not arns:
raise LookupError(
Expand All @@ -190,13 +179,11 @@ def handle_search_results(
return arns[0]


def validate_search_input(res_types, res_type):
def validate_search_input(res_types: dict, res_type: str) -> None:
"""
Function to validate the search query

:param dict res_types:
:param str res_type:
:return:
:raises: KeyError
"""

if not isinstance(res_type, str):
Expand All @@ -209,8 +196,12 @@ def validate_search_input(res_types, res_type):


def find_aws_resource_arn_from_tags_api(
info, session, aws_resource_search, types=None, allow_multi=False
):
info: dict,
session: Session,
aws_resource_search: str,
types: dict = None,
allow_multi: bool = False,
) -> Union[str, list[str]]:
"""
Function to find the RDS DB based on info

Expand All @@ -231,25 +222,30 @@ def find_aws_resource_arn_from_tags_api(
resources_r = get_resources_from_tags(session, aws_resource_search, search_tags)
LOG.debug(search_tags)
if not resources_r or not keyisset("ResourceTagMappingList", resources_r):
arns = []
resource_arns = []
else:
arns = [i["ResourceARN"] for i in resources_r["ResourceTagMappingList"]]
resource_arns = [
i["ResourceARN"] for i in resources_r["ResourceTagMappingList"]
]
return handle_search_results(
arns, name, res_types, aws_resource_search, allow_multi=allow_multi
resource_arns, name, res_types, aws_resource_search, allow_multi=allow_multi
)


def assert_can_create_stack(client, name):
def assert_can_create_stack(client, name: str) -> bool:
"""
Checks whether a stack already exists or not

:raises: LookupError
:raises: ClientError
"""
try:
stack_r = client.describe_stacks(StackName=name)
if not keyisset("Stacks", stack_r):
return True
stacks = stack_r["Stacks"]
if len(stacks) != 1:
raise LookupError("Too many stacks found with machine name", name)
raise LookupError("Too many stacks found with stack name", name)
stack = stacks[0]
if stack["StackStatus"] == "REVIEW_IN_PROGRESS":
return stack
Expand All @@ -263,7 +259,7 @@ def assert_can_create_stack(client, name):
raise error


def assert_can_update_stack(client, name):
def assert_can_update_stack(client, name) -> bool:
"""
Checks whether a stack already exists or not
"""
Expand All @@ -283,12 +279,13 @@ def assert_can_update_stack(client, name):
return False


def validate_stack_availability(settings, root_stack):
def validate_can_deploy_stack_from_settings(
settings: ComposeXSettings, root_stack: ComposeXStack
) -> None:
"""
Function to check that the stack can be updated
:param settings:
:param root_stack:
:return:

:raises: ValueError
"""
if not settings.upload:
raise RuntimeError(
Expand All @@ -301,14 +298,11 @@ def validate_stack_availability(settings, root_stack):
)


def deploy(settings, root_stack):
def deploy(settings: ComposeXSettings, root_stack: ComposeXStack) -> Union[str, None]:
"""
Function to deploy (create or update) the stack to CFN.
:param ComposeXSettings settings:
:param ComposeXStack root_stack:
:return:
"""
validate_stack_availability(settings, root_stack)
validate_can_deploy_stack_from_settings(settings, root_stack)
client = settings.session.client("cloudformation")
if assert_can_create_stack(client, settings.name):
res = client.create_stack(
Expand Down Expand Up @@ -336,16 +330,28 @@ def deploy(settings, root_stack):
return None


def get_change_set_status(client, change_set_name, settings):
def get_change_set_status(
client, change_set_name: str, settings: ComposeXSettings
) -> str:
"""
Function to determine whether we can create a new changeset.

If it already exists in a failed status, we raise an exception to report we cannot go forward until user fixes it
in their AWS account.
If the changeset already exists, in a pending state, we wait for it to get to a ready status.
If the changeset already exists, in a ready status, we dump a display of expected changes and return the status.

"""
pending_statuses = [
"CREATE_PENDING",
"CREATE_IN_PROGRESS",
"DELETE_PENDING",
"DELETE_IN_PROGRESS",
"REVIEW_IN_PROGRESS",
"UPDATE_ROLLBACK_IN_PROGRESS",
]
success_statuses = ["CREATE_COMPLETE", "DELETE_COMPLETE"]
failed_statuses = ["DELETE_FAILED", "FAILED"]
failed_statuses = ["DELETE_FAILED", "FAILED", "UPDATE_ROLLBACK_FAILED"]
ready = False
status = None
while not ready:
Expand Down Expand Up @@ -381,20 +387,18 @@ def get_change_set_status(client, change_set_name, settings):
return status


def plan(settings, root_stack, apply=None, cleanup=None):
def plan(
settings: ComposeXSettings,
root_stack: ComposeXStack,
apply: bool = None,
cleanup: bool = None,
) -> None:
"""
Function to create a recursive change-set and return diffs
:param ComposeXSettings settings:
:param ComposeXStack root_stack:
:param apply: Optional[bool] - Whether to apply the change-set (True/False). Default is None (prompt user).
:param cleanup: Optional[bool] - Whether to clean up the change-set (True/False). Default is None (prompt user).
:return:
"""
validate_stack_availability(settings, root_stack)
validate_can_deploy_stack_from_settings(settings, root_stack)
client = settings.session.client("cloudformation")
change_set_name = f"{settings.name}" + "".join(
secrets.choice(ascii_lowercase) for _ in range(10)
)
change_set_name = f"{settings.name}" + "_ecs_compose_x_" + dt.now().strftime("%s")
if assert_can_create_stack(client, settings.name) or assert_can_update_stack(
client, settings.name
):
Expand All @@ -410,20 +414,30 @@ def plan(settings, root_stack, apply=None, cleanup=None):
)
status = get_change_set_status(client, change_set_name, settings)
if status:
if apply is None:
apply_q = input("Want to apply? [yN]: ")
apply = apply_q.lower() in ["y", "yes"]

if apply:
client.execute_change_set(
ChangeSetName=change_set_name,
StackName=settings.name,
DisableRollback=settings.disable_rollback,
)
else:
if cleanup is None:
delete_q = input("Cleanup ChangeSet ? [yN]: ")
cleanup = delete_q.lower() in ["y", "yes"]
plan_user_input(settings, client, change_set_name, apply, cleanup)


def plan_user_input(
settings: ComposeXSettings,
client,
change_set_name: str,
apply: bool = None,
cleanup: bool = None,
) -> None:
if apply is None:
apply_q = input("Want to apply? [yN]: ")
apply = apply_q.lower() in ["y", "yes"]

if apply:
client.execute_change_set(
ChangeSetName=change_set_name,
StackName=settings.name,
DisableRollback=settings.disable_rollback,
)
else:
if cleanup is None:
delete_q = input("Cleanup ChangeSet ? [yN]: ")
cleanup = delete_q.lower() in ["y", "yes"]

if cleanup:
client.delete_stack(StackName=settings.name)
if cleanup:
client.delete_stack(StackName=settings.name)