diff --git a/data-collection/deploy/deploy-in-linked-account.yaml b/data-collection/deploy/deploy-in-linked-account.yaml
index e86b8776..da96e9dc 100644
--- a/data-collection/deploy/deploy-in-linked-account.yaml
+++ b/data-collection/deploy/deploy-in-linked-account.yaml
@@ -414,6 +414,8 @@ Resources:
               - "servicequotas:GetServiceQuota"
               - "servicequotas:GetAWSDefaultServiceQuota"
               - "servicequotas:ListServices"
+              - "servicequotas:ListServiceQuotas"
+              - "servicequotas:ListAWSDefaultServiceQuotas"
               - "rds:DescribeAccountAttributes"
               - "elasticloadbalancing:DescribeAccountLimits"
               - "dynamodb:DescribeLimits"
@@ -422,6 +424,8 @@ Resources:
               - "autoscaling:DescribeAccountLimits"
               - "route53:GetAccountLimit"
               - "datapipeline:GetAccountLimits"
+              - "cloudwatch:GetMetricStatistics"
+              - "cloudwatch:GetMetricData"
             Resource: "*" # Wildcard required as actions do not support resource-level permissions
       Roles:
         - Ref: LambdaRole
diff --git a/data-collection/deploy/module-service-quotas.yaml b/data-collection/deploy/module-service-quotas.yaml
index 2e607334..bbb84b5c 100644
--- a/data-collection/deploy/module-service-quotas.yaml
+++ b/data-collection/deploy/module-service-quotas.yaml
@@ -135,45 +135,60 @@ Resources:
     Properties:
       FunctionName: !Sub '${ResourcePrefix}${CFDataName}-Lambda'
       Description: !Sub "Lambda function to retrieve ${CFDataName}"
-      Runtime: python3.12
+      Runtime: python3.13
       Architectures: [x86_64]
       Code:
         ZipFile: |
           import os
           import json
           import logging
-          from datetime import date, datetime
-
+          from datetime import date, datetime, timedelta
           import boto3
+          from botocore.config import Config
+          import concurrent.futures
+          import threading
+          import time

           BUCKET = os.environ['BUCKET_NAME']
           ROLE_NAME = os.environ['ROLE_NAME']
           MODULE_NAME = os.environ['MODULE_NAME']
           REGIONS = [r.strip() for r in os.environ['REGIONS'].split(',') if r]
+          CLOUDWATCH_BATCH_SIZE = 400
+          PARALLEL_WORKERS = 5
+          config = Config(retries={'mode': 'adaptive','max_attempts': 10})

           logger = logging.getLogger(__name__)
           logger.setLevel(getattr(logging, os.environ.get('LOG_LEVEL', 'INFO').upper(), logging.INFO))
+          log_lock = threading.Lock()

-          def lambda_handler(event, context): #pylint: disable=unused-argument
-              logger.info(f"Incoming event: {json.dumps(event)}")
-              key = "account"
-              if key not in event:
-                  logger.error(f"Lambda event parameter '{key}' not defined (fatal) in {MODULE_NAME} module. Please do not trigger this Lambda manually. "
-                               f"Find the corresponding {MODULE_NAME} state machine in Step Functions and trigger from there."
-                  )
-                  raise RuntimeError(f"(MissingParameterError) Lambda event missing '{key}' parameter")
+          def thread_safe_log(message, level=logging.INFO):
+              with log_lock:
+                  if level == logging.INFO:
+                      logger.info(message)
+                  elif level == logging.ERROR:
+                      logger.error(message)
+                  elif level == logging.WARNING:
+                      logger.warning(message)

-              account = json.loads(event[key])
+          def lambda_handler(event, context):
+              thread_safe_log(f"Incoming event: {json.dumps(event)}")
+
+              if "account" not in event:
+                  raise RuntimeError("(MissingParameterError) Lambda event missing 'account' parameter")
+
+              account = json.loads(event["account"]) if isinstance(event["account"], str) else event["account"]
+              regions = event.get("regions", REGIONS)
+              payload = event.get("payload", [])
+
               try:
-                  main(account, ROLE_NAME, MODULE_NAME, BUCKET, REGIONS)
-              except Exception as exc: #pylint: disable=broad-exception-caught
-                  logger.error(f'Error in account {account}: {exc}')
-              return {
-                  'statusCode': 200
-              }
+                  main(account, ROLE_NAME, MODULE_NAME, BUCKET, regions, payload)
+              except Exception as exc:
+                  thread_safe_log(f'Error in account {account}: {exc}', logging.ERROR)
+                  raise
+
+              return {'statusCode': 200}

           def get_session_with_role(role_name, account_id):
-              logger.debug(f"Assuming role '{role_name}' in account '{account_id}'")
               credentials = boto3.client('sts').assume_role(
                   RoleArn=f"arn:aws:iam::{account_id}:role/{role_name}",
                   RoleSessionName="data_collection"
@@ -185,70 +200,301 @@ Resources:
               )

           def to_json(obj):
-              return json.dumps(
-                  obj,
-                  default=lambda x:
-                      x.isoformat() if isinstance(x, (date, datetime)) else None
-              )
+              return json.dumps(obj, default=lambda x: x.isoformat() if isinstance(x, (date, datetime)) else None)
+
+          def get_all_services(client):
+              start_time = datetime.now()
+              services = []
+              paginator = client.get_paginator('list_services')
+              for page in paginator.paginate():
+                  services.extend(page['Services'])
+              duration = (datetime.now() - start_time).total_seconds()
+              thread_safe_log(f"API:list_services took {duration:.2f}s and found {len(services)} services")
+              return services
+
+          def get_service_quotas(client, service_code):
+              start_time = datetime.now()
+              quotas = []
+              max_retries = 20
+              backoff_time = 1
+
+              paginator = client.get_paginator('list_service_quotas')
+              for retry in range(max_retries):
+                  try:
+                      for page in paginator.paginate(ServiceCode=service_code, PaginationConfig={'PageSize': 100}):
+                          quotas.extend(page['Quotas'])
+                      break
+                  except client.exceptions.TooManyRequestsException:
+                      if retry == max_retries - 1:
+                          thread_safe_log(f"Maximum retries reached for {service_code}, giving up", logging.ERROR)
+                          raise
+                      sleep_time = backoff_time + (0.1 * retry * (0.5 + 0.5 * (time.time() % 1)))
+                      thread_safe_log(f"Rate limited when getting quotas for {service_code}, retry {retry+1}/{max_retries}", logging.WARNING)
+                      time.sleep(sleep_time)
+                      backoff_time *= 2
+
+              # Get default values
+              default_quotas = {}
+              default_paginator = client.get_paginator('list_aws_default_service_quotas')
+              for retry in range(max_retries):
+                  try:
+                      for page in default_paginator.paginate(ServiceCode=service_code, PaginationConfig={'PageSize': 100}):
+                          for quota in page.get('Quotas', []):
+                              if 'QuotaCode' in quota and 'Value' in quota:
+                                  default_quotas[quota['QuotaCode']] = quota['Value']
+                      break
+                  except client.exceptions.TooManyRequestsException:
+                      if retry == max_retries - 1:
+                          thread_safe_log(f"Maximum retries reached for default quotas of {service_code}, giving up", logging.ERROR)
+                          break
+                      sleep_time = backoff_time + (0.1 * retry * (0.5 + 0.5 * (time.time() % 1)))
+                      thread_safe_log(f"Rate limited when getting default quotas for {service_code}, retry {retry+1}/{max_retries}", logging.WARNING)
+                      time.sleep(sleep_time)
+                      backoff_time *= 2
+
+              # Apply defaults
+              for quota in quotas:
+                  if quota.get('QuotaCode') in default_quotas:
+                      quota['DefaultValue'] = default_quotas[quota['QuotaCode']]
+
+              duration = (datetime.now() - start_time).total_seconds()
+              thread_safe_log(f"Total processing time for {service_code}: {duration:.2f}s")
+              return quotas
+
+          def process_service_worker(args):
+              service, i, total_services, session, region = args
+              service_code = service['ServiceCode']
+              service_name = service['ServiceName']
+              thread_safe_log(f"[{i}/{total_services}] Processing {service_name} ({service_code})")
+
+              service_quotas_client = session.client('service-quotas', region_name=region, config=config)
+
+              try:
+                  quotas = get_service_quotas(service_quotas_client, service_code)
+                  for quota in quotas:
+                      quota['ServiceCode'] = service_code
+                      quota['ServiceName'] = service_name
+                  thread_safe_log(f"Processed {len(quotas)} quotas for {service_name}")
+                  return quotas
+              except Exception as e:
+                  thread_safe_log(f"Error processing {service_name}: {str(e)}", logging.ERROR)
+                  return []
+
+          def prepare_cloudwatch_metrics_worker(quotas_chunk):
+              metrics_to_query = []
+              for quota in quotas_chunk:
+                  if 'UsageMetric' in quota:
+                      usage_metric = quota['UsageMetric']
+                      if not all(key in usage_metric for key in ['MetricNamespace', 'MetricName']):
+                          continue
+
+                      statistic = usage_metric.get('MetricStatisticRecommendation', 'Maximum')
+                      metric_query = {
+                          'quota': quota,
+                          'metric_data': {
+                              'Id': f"m_{len(metrics_to_query)}",
+                              'MetricStat': {
+                                  'Metric': {
+                                      'Namespace': usage_metric['MetricNamespace'],
+                                      'MetricName': usage_metric['MetricName'],
+                                  },
+                                  'Period': 3600,
+                                  'Stat': statistic,
+                              },
+                          }
+                      }
+
+                      if 'MetricDimensions' in usage_metric:
+                          dimensions = []
+                          for key, value in usage_metric['MetricDimensions'].items():
+                              dimensions.append({'Name': key, 'Value': value})
+                          if dimensions:
+                              metric_query['metric_data']['MetricStat']['Metric']['Dimensions'] = dimensions
+
+                      metrics_to_query.append(metric_query)
+
+              return metrics_to_query

-          def main(account, role_name, module_name, bucket, regions): # pylint: disable=too-many-locals
+          def get_cloudwatch_metrics_batch(batch_args):
+              batch_index, batch, start_time, end_time, session = batch_args
+              cloudwatch_client = session.client('cloudwatch', config=config)
+
+              metric_data_queries = [item['metric_data'] for item in batch]
+              metric_id_to_quota = {item['metric_data']['Id']: item['quota'] for item in batch}
+
+              try:
+                  response = cloudwatch_client.get_metric_data(
+                      MetricDataQueries=metric_data_queries,
+                      StartTime=start_time,
+                      EndTime=end_time
+                  )
+
+                  results = {}
+                  for result in response.get('MetricDataResults', []):
+                      metric_id = result['Id']
+                      values = result.get('Values', [])
+                      value = max(values) if values else None
+
+                      if metric_id in metric_id_to_quota:
+                          quota = metric_id_to_quota[metric_id]
+                          quota_key = f"{quota['ServiceCode']}:{quota['QuotaCode']}"
+                          results[quota_key] = value
+
+                  thread_safe_log(f"Processed CloudWatch batch {batch_index + 1}")
+                  return results
+              except Exception as e:
+                  thread_safe_log(f"Error getting CloudWatch metrics batch {batch_index + 1}: {str(e)}", logging.ERROR)
+                  return {}
+
+          def enrich_quota_worker(quota_args):
+              quota, usage_metrics = quota_args
+              enriched_quota = {
+                  'ServiceCode': quota.get('ServiceCode'),
+                  'ServiceName': quota.get('ServiceName'),
+                  'QuotaCode': quota.get('QuotaCode', 'unknown'),
+                  'QuotaName': quota.get('QuotaName', 'unknown'),
+                  'CurrentValue': quota.get('Value'),
+                  'DefaultValue': quota.get('DefaultValue'),
+                  'Unit': quota.get('Unit', ''),
+                  'Adjustable': quota.get('Adjustable', False),
+                  'GlobalQuota': quota.get('GlobalQuota'),
+                  'QuotaArn': quota.get('QuotaArn', ''),
+                  'QuotaAppliedAtLevel': quota.get('QuotaAppliedAtLevel', '')
+              }
+
+              if 'UsageMetric' in quota:
+                  enriched_quota['UsageMetric'] = quota['UsageMetric']
+                  quota_key = f"{quota['ServiceCode']}:{quota['QuotaCode']}"
+                  if quota_key in usage_metrics and usage_metrics[quota_key] is not None:
+                      enriched_quota['CurrentUsage'] = usage_metrics[quota_key]
+
+              if 'Period' in quota:
+                  enriched_quota['Period'] = quota['Period']
+
+              return enriched_quota
+
+          def main(account, role_name, module_name, bucket, regions, payload):
               s3_client = boto3.client("s3")
               account_id = account["account_id"]
               payer_id = account["payer_id"]
               session = get_session_with_role(role_name, account_id)
-
-              for region in regions:
-                  logger.info(f"Processing region {region} for account {account_id}")
+              collection_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
+
+              for region in [regions[0]]: # Only first element
+                  thread_safe_log(f"Processing region {region} for account {account_id}")
                   try:
-                      quotas_client = session.client("service-quotas", region_name=region)
-                      logger.debug(f"Start looping through services in {region}")
-                      quota_history = list(
-                          quotas_client
-                          .get_paginator('list_requested_service_quota_change_history')
-                          .paginate()
-                          .search("RequestedQuotas")
-                      )
-                      if not quota_history:
-                          logger.debug(f"No change history in {region}")
-                          continue
-                      # Store history
-                      history_key = f'{module_name}/{module_name}-history/payer_id={payer_id}/account_id={account_id}/region={region}/history.json'
-                      s3_client.put_object(
-                          Bucket=bucket,
-                          Key=history_key,
-                          Body="\n".join([to_json(item) for item in quota_history]),
-                          ContentType='application/json'
-                      )
-                      logger.info(f"Uploaded {len(quota_history)} history records for {region} to s3://{BUCKET}/{history_key}")
-
-                      # Store current quotas
-                      json_lines_quota = []
-                      for item in quota_history:
-                          try:
-                              quota_result = quotas_client.get_service_quota(
-                                  ServiceCode=item['ServiceCode'],
-                                  QuotaCode=item['QuotaCode']
-                              )['Quota']
-                              quota_result['DefaultValue'] = quotas_client.get_aws_default_service_quota(
-                                  ServiceCode=item['ServiceCode'],
-                                  QuotaCode=item['QuotaCode']
-                              )['Quota']['Value']
-                              json_lines_quota.append(to_json(quota_result))
-                          except Exception as e:
-                              logger.error(f"Error getting quota for {item['ServiceCode']}/{item['QuotaCode']}: {e}")
-                              continue
-
-                      if json_lines_quota:
-                          quota_key = f'{MODULE_NAME}/{MODULE_NAME}-data/payer_id={payer_id}/account_id={account_id}/region={region}/quotas.json'
+                      quotas_client = session.client("service-quotas", region_name=region, config=config)
+
+                      # Process history (unchanged)
+                      quota_history = []
+                      paginator = quotas_client.get_paginator('list_requested_service_quota_change_history')
+                      for page in paginator.paginate():
+                          quota_history.extend(page.get('RequestedQuotas', []))
+
+                      if quota_history:
+                          # Add collection_date to history records
+                          for item in quota_history:
+                              item['collection_date'] = collection_date
+
+                          history_key = f'{module_name}/{module_name}-history/payer_id={payer_id}/account_id={account_id}/region={region}/history.json'
+                          s3_client.put_object(
+                              Bucket=bucket,
+                              Key=history_key,
+                              Body="\n".join([to_json(item) for item in quota_history]),
+                              ContentType='application/json'
+                          )
+                          thread_safe_log(f"Uploaded {len(quota_history)} history records")
+
+                      # Get all services and process in parallel
+                      services = get_all_services(quotas_client)
+                      total_services = len(services)
+                      thread_safe_log(f"Found {total_services} services to check")
+
+                      service_args = [(service, i, total_services, session, region) for i, service in enumerate(services, 1)]
+
+                      all_quotas = []
+                      with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
+                          futures = {executor.submit(process_service_worker, arg): arg for arg in service_args}
+                          for future in concurrent.futures.as_completed(futures):
+                              service_quotas = future.result()
+                              all_quotas.extend(service_quotas)
+
+                      thread_safe_log(f"Collected {len(all_quotas)} total quotas across all services")
+
+                      # Get CloudWatch metrics if payload specified
+                      usage_metrics = {}
+                      if payload:
+                          # Check if payload contains "*" for all services
+                          if "*" in payload:
+                              filtered_quotas = all_quotas
+                          else:
+                              # Filter quotas for specified services
+                              filtered_quotas = [q for q in all_quotas if q.get('ServiceCode') in payload]
+
+                          if filtered_quotas:
+                              # Prepare CloudWatch metric queries in parallel
+                              chunk_size = max(1, len(filtered_quotas) // PARALLEL_WORKERS)
+                              quota_chunks = [filtered_quotas[i:i + chunk_size] for i in range(0, len(filtered_quotas), chunk_size)]
+
+                              all_metrics = []
+                              with concurrent.futures.ThreadPoolExecutor(max_workers=PARALLEL_WORKERS) as executor:
+                                  futures = list(executor.map(prepare_cloudwatch_metrics_worker, quota_chunks))
+                                  for metrics in futures:
+                                      all_metrics.extend(metrics)
+
+                              # Fix IDs to ensure uniqueness
+                              for i, metric in enumerate(all_metrics):
+                                  metric['metric_data']['Id'] = f"m_{i}"
+
+                              # Get CloudWatch metrics in batches
+                              end_time = datetime.utcnow()
+                              query_start_time = end_time - timedelta(hours=24)
+
+                              batches = []
+                              for i in range(0, len(all_metrics), CLOUDWATCH_BATCH_SIZE):
+                                  batches.append(all_metrics[i:i + CLOUDWATCH_BATCH_SIZE])
+
+                              batch_args = [(i, batch, query_start_time, end_time, session) for i, batch in enumerate(batches)]
+
+                              with concurrent.futures.ThreadPoolExecutor(max_workers=min(PARALLEL_WORKERS, len(batches))) as executor:
+                                  futures = list(executor.map(get_cloudwatch_metrics_batch, batch_args))
+                                  for batch_results in futures:
+                                      usage_metrics.update(batch_results)
+
+                              thread_safe_log(f"Retrieved {len(usage_metrics)} CloudWatch metric values")
+
+                      # Enrich quotas in parallel
+                      args_list = [(quota, usage_metrics) for quota in all_quotas]
+                      enriched_quotas = []
+                      with concurrent.futures.ThreadPoolExecutor(max_workers=PARALLEL_WORKERS) as executor:
+                          futures = list(executor.map(enrich_quota_worker, args_list))
+                          enriched_quotas.extend(futures)
+
+                      # Add collection_date to all enriched quotas
+                      for quota in enriched_quotas:
+                          quota['collection_date'] = collection_date
+
+                      # Deduplicate quotas
+                      deduplicated_quotas = {}
+                      for quota in enriched_quotas:
+                          key = f"{quota['ServiceCode']}:{quota['QuotaCode']}"
+                          deduplicated_quotas[key] = quota
+
+                      final_quotas = list(deduplicated_quotas.values())
+
+                      # Store enriched quotas
+                      if final_quotas:
+                          quota_key = f'{module_name}/{module_name}-data/payer_id={payer_id}/account_id={account_id}/region={region}/quotas.json'
                           s3_client.put_object(
-                              Bucket=BUCKET,
+                              Bucket=bucket,
                               Key=quota_key,
-                              Body="\n".join(json_lines_quota),
+                              Body="\n".join([to_json(quota) for quota in final_quotas]),
                               ContentType='application/json'
                           )
-                          logger.info(f"Uploaded {len(json_lines_quota)} quota records for {region} to s3://{BUCKET}/{quota_key}")
+                          thread_safe_log(f"Uploaded {len(final_quotas)} enriched and deduplicated quota records")
+
                   except Exception as e:
-                      logger.error(f"Error processing region {region} for account {account_id}: {e}")
+                      thread_safe_log(f"Error processing region {region}: {e}", logging.ERROR)
                       continue

       Handler: 'index.lambda_handler'
@@ -342,4 +588,4 @@ Resources:
     Type: Custom::LambdaAnalyticsExecutor
     Properties:
       ServiceToken: !Ref LambdaAnalyticsARN
-      Name: !Ref CFDataName
+      Name: !Ref CFDataName
\ No newline at end of file