From 7d9923651cde9a1c2115f1a687f1d9f021ca613d Mon Sep 17 00:00:00 2001 From: Anuj Sharma Date: Tue, 11 Nov 2025 12:57:07 -0800 Subject: [PATCH 1/7] Refactor deployment pipeline and add Elasticsearch observability - Move Docker image building from app-template-generator to TemplateStorageStack - Add Elasticsearch-based observability integration with new elastic.py module - Remove CloudFormation deployment service in favor of unified deployment approach - Update UI components and configuration models to support new deployment flow - Enhance stack definitions with force deployment capabilities --- app-template-generator.py | 155 +----- .../common/custom_bedrock_provider.py | 91 ++-- .../common/memory/elasticsearch.py | 215 +++----- .../common/observability/__init__.py | 32 +- .../common/observability/elastic.py | 236 ++++++++ .../configuration-api/app/api/config.py | 77 ++- .../configuration-api/app/api/deployment.py | 154 +++--- .../app/models/agent_config.py | 8 + .../app/models/form_schema.py | 89 +-- .../app/models/ssm_data_models.py | 17 +- .../app/services/agent_config_service.py | 234 +++++++- .../cloudformation_deployment_service.py | 514 ------------------ .../app/services/deployment_service.py | 508 +++++++++++++---- .../app/services/parameter_initialization.py | 4 +- .../app/utils/dependencies.py | 24 - .../ui-react-cloudscape/package-lock.json | 21 + .../ui-react-cloudscape/package.json | 2 + application_src/ui-react-cloudscape/server.js | 95 +++- .../src/components/AgentMapping.js | 78 ++- .../src/components/AgentWizard.js | 145 +++-- .../src/components/ChatInterface.js | 108 ++-- .../src/services/configuration.js | 21 + stacks/common/base.py | 40 +- stacks/multi_agent/stack.py | 150 ++++- stacks/template_storage/stack.py | 99 +++- 25 files changed, 1860 insertions(+), 1257 deletions(-) create mode 100644 application_src/common/observability/elastic.py delete mode 100644 application_src/configuration-api/app/services/cloudformation_deployment_service.py diff --git a/app-template-generator.py b/app-template-generator.py index 98dda69..764fe7b 100644 --- a/app-template-generator.py +++ b/app-template-generator.py @@ -1,118 +1,31 @@ #!/usr/bin/env python3 """ CDK app for generating agent template without deployment. -This app synthesizes the MultiAgentStack as a standalone CloudFormation template, -builds and pushes the Docker image to ECR, and uploads everything to S3. +This app synthesizes the MultiAgentStack as a standalone CloudFormation template +and prepares it for upload to S3 during deployment. + +The Docker image build is now handled by TemplateStorageStack during deployment, +following the same pattern as template upload to S3. This runs as a pre-hook before 'cdk deploy' to ensure the template -is always up-to-date with the stack definition and available in S3. +is always up-to-date with the stack definition. """ import os import json import boto3 import sys -import shlex -import subprocess -from aws_cdk import App, Fn, Environment, Stack -from aws_cdk import aws_ec2 as ec2, aws_ecs as ecs, aws_s3 as s3, aws_ecr_assets as ecr_assets +import logging +from aws_cdk import App, Fn, Environment from stacks.multi_agent.stack import MultiAgentStack from helper.config import Config - -def build_and_push_agent_image(app: App, account_id: str, region: str) -> str: - """ - Build and push the agent Docker image to ECR using direct Docker commands. - - Args: - app: CDK app instance - account_id: AWS account ID - region: AWS region - - Returns: - ECR image URI with stable tag - """ - print("\n🐳 Building and pushing agent Docker image to ECR...") - - import hashlib - - # Use stable tag based on directory content hash - ecr_repo = f"{account_id}.dkr.ecr.{region}.amazonaws.com/cdk-hnb659fds-container-assets-{account_id}-{region}" - stable_tag = "agent-instance-latest" - image_uri = f"{ecr_repo}:{stable_tag}" - - # Check if Docker/Podman is available - use safe validated default - docker_executable = os.environ.get('CDK_DOCKER', 'docker') - - # SECURITY: Validate docker executable to prevent command injection - allowed_executables = ['docker', 'podman'] - if docker_executable not in allowed_executables: - raise ValueError(f"Invalid container runtime specified: {docker_executable}. Allowed: {allowed_executables}") - - try: - # 1. Login to ECR - separate password retrieval and login for security - print(f" Logging in to ECR...") - - # Get ECR password securely - validate region parameter - if not region or not region.replace('-', '').replace('_', '').isalnum(): - raise ValueError(f"Invalid AWS region: {region}") - - password_result = subprocess.run( - ['aws', 'ecr', 'get-login-password', '--region', region], - check=True, - capture_output=True, - text=True - ) - - # Login to ECR with password from stdin - validate parameters - if not account_id or not account_id.isdigit() or len(account_id) != 12: - raise ValueError(f"Invalid AWS account ID: {account_id}") - - ecr_endpoint = f"{account_id}.dkr.ecr.{region}.amazonaws.com" - - subprocess.run( - [docker_executable, 'login', '--username', 'AWS', '--password-stdin', ecr_endpoint], - input=password_result.stdout, - check=True, - capture_output=True, - text=True - ) - - # 2. Build image - validate parameters - print(f" Building Docker image...") - if not image_uri or '://' in image_uri.split(':')[0]: # Basic URI validation - raise ValueError(f"Invalid image URI: {image_uri}") - - subprocess.run( - [docker_executable, 'build', - '-t', image_uri, - '--platform', 'linux/arm64', - '-f', 'application_src/multi-agent/agent-instance/Dockerfile', - 'application_src'], - check=True, - capture_output=True - ) - - # 3. Push to ECR - print(f" Pushing image to ECR...") - subprocess.run( - [docker_executable, 'push', image_uri], - check=True, - capture_output=True - ) - - print(f"āœ“ Docker image built and pushed to ECR: {image_uri}") - return image_uri - - except subprocess.CalledProcessError as e: - print(f"āš ļø Warning: Failed to build/push Docker image: {e}") - print(f" Using placeholder - manual push required") - return image_uri - - +logger = logging.getLogger(__name__) def main(): """Generate the template and save to file.""" + logger.info("Generating agent template...") + # Get actual AWS account and region try: sts = boto3.client('sts') @@ -125,10 +38,6 @@ def main(): # Create app app = App() - # Step 1: Build and push Docker image to ECR BEFORE creating the stack - # This ensures the image exists in ECR before the CloudFormation template references it - agent_image_uri = build_and_push_agent_image(app, account_id, region) - # Get configuration environment = app.node.try_get_context("environment") or os.environ.get("ENVIRONMENT", "development") conf = Config(environment=environment) @@ -136,7 +45,7 @@ def main(): # Get project name from config (should match app.py) project_name_from_config = conf.get('ProjectName') - print(f"\nšŸ“¦ Using CloudFormation exports from VPC stack...") + logger.info("Using CloudFormation exports from VPC stack...") # Import VPC ID using CloudFormation export token # The MultiAgentStack will handle importing the VPC and all related resources @@ -151,10 +60,10 @@ def main(): # Import VPC Lattice service network ARN using CloudFormation export service_network_arn = Fn.import_value(f"{project_name_from_config}-vpc:ExportsOutputFnGetAttservicenetworkArnD9BDB9C7") - print(f"āœ“ All infrastructure references use CloudFormation imports") - print(f" - VPC ID: {{Fn::ImportValue: {project_name_from_config}-VpcId}}") - print(f" - Cluster: {{Fn::ImportValue: {project_name_from_config}-ClusterName}}") - print(f" - Bucket: {{Fn::ImportValue: {project_name_from_config}-AccessLogBucketName}}") + logger.info("All infrastructure references use CloudFormation imports") + logger.debug(f" - VPC ID: {{Fn::ImportValue: {project_name_from_config}-VpcId}}") + logger.debug(f" - Cluster: {{Fn::ImportValue: {project_name_from_config}-ClusterName}}") + logger.debug(f" - Bucket: {{Fn::ImportValue: {project_name_from_config}-AccessLogBucketName}}") # Create the agent stack - MultiAgentStack will import all resources internally # Pass tokens directly - no boto3 queries needed! @@ -183,25 +92,17 @@ def main(): with open(template_path, 'r', encoding='utf-8') as f: template_content = json.load(f) - # Replace Docker image reference with stable tag - task_def = template_content['Resources']['agenttaskdefinitionF56FAA50'] - container = task_def['Properties']['ContainerDefinitions'][0] - - # Update image to use stable tag - container['Image'] = { - 'Fn::Sub': f"{account_id}.dkr.ecr.{region}.${{AWS::URLSuffix}}/cdk-hnb659fds-container-assets-{account_id}-{region}:agent-instance-latest" - } - - print(f"āœ“ Updated template to use stable Docker image tag: agent-instance-latest") + # Log that template uses ImageTag parameter (will be updated by TemplateStorageStack) + logger.info("Template uses ImageTag parameter (default will be updated during deployment)") # Save to standard location for easy access output_path = "cdk.out/GenericAgentTemplate.json" with open(output_path, 'w', encoding='utf-8') as f: json.dump(template_content, f, indent=2) - print(f"āœ“ Template generated: {output_path}") - print(f" Stack name: {agent_stack.stack_name}") - print(f" Template size: {len(json.dumps(template_content))} bytes") + logger.info(f"Template generated: {output_path}") + logger.debug(f" Stack name: {agent_stack.stack_name}") + logger.debug(f" Template size: {len(json.dumps(template_content))} bytes") # List resources for reference if 'Resources' in template_content: @@ -211,18 +112,18 @@ def main(): rtype = resource.get('Type', 'Unknown') resource_types[rtype] = resource_types.get(rtype, 0) + 1 - print(f" Total resources: {resource_count}") - print(f" Resource breakdown:") + logger.debug(f" Total resources: {resource_count}") + logger.debug(" Resource breakdown:") for rtype, count in sorted(resource_types.items()): - print(f" - {rtype}: {count}") + logger.debug(f" - {rtype}: {count}") - # Note: Template will be automatically uploaded to S3 by TemplateStorageStack - # using CDK's BucketDeployment construct during 'cdk deploy' - print(f"\nšŸ“¤ Template will be uploaded to S3 by TemplateStorageStack during deployment") + logger.info("Template and Docker image will be deployed by TemplateStorageStack") + logger.debug(" - Template: Uploaded to S3 during deployment") + logger.debug(" - Docker Image: Built and pushed to ECR during deployment") return True else: - print(f"Error: Template not found at {template_path}") + logger.error(f"Template not found at {template_path}") return False diff --git a/application_src/common/custom_bedrock_provider.py b/application_src/common/custom_bedrock_provider.py index 87d858c..da4c497 100644 --- a/application_src/common/custom_bedrock_provider.py +++ b/application_src/common/custom_bedrock_provider.py @@ -103,9 +103,9 @@ def _initialize_client(self): tcp_keepalive=True # Enable TCP keepalive for long connections ) ) - logger.info(f"āœ… Custom Bedrock client initialized for {self.config['model_id']} in region {self.config['region']}") + logger.info(f"Custom Bedrock client initialized for {self.config['model_id']} in region {self.config['region']}") except Exception as e: - logger.warning(f"āŒ Failed to initialize custom Bedrock client: {e}") + logger.error(f"Failed to initialize custom Bedrock client: {e}") raise def _is_throttling_error(self, error: Exception) -> bool: @@ -128,7 +128,7 @@ def _is_throttling_error(self, error: Exception) -> bool: # Check error code first (most reliable) for code in throttling_codes: if code in error_code: - logger.warning(f"🚨 BEDROCK THROTTLING CODE: {error_code}") + logger.warning(f"Bedrock throttling detected - error code: {error_code}") return True # Check error message for throttling indicators @@ -140,7 +140,7 @@ def _is_throttling_error(self, error: Exception) -> bool: for msg in throttling_messages: if msg in error_message: - logger.warning(f"🚨 BEDROCK THROTTLING MESSAGE: {error_message}") + logger.warning(f"Bedrock throttling detected - message: {error_message}") return True return False @@ -167,16 +167,17 @@ def _is_connection_error(self, error: Exception) -> bool: for pattern in connection_errors: if pattern in error_message: - logger.warning(f"šŸ”Œ CONNECTION ERROR DETECTED: {error_message}") + logger.warning(f"Connection error detected: {error_message}") return True return False def _format_messages_for_bedrock(self, messages: Messages) -> list[dict[str, Any]]: - """Convert Strands Messages to Bedrock converse_stream format.""" + """Convert Strands Messages to Bedrock converse_stream format with validation.""" bedrock_messages = [] + tool_use_ids = set() # Track tool use IDs to validate results - for message in messages: + for msg_idx, message in enumerate(messages): role = message.get('role', 'user') content_blocks = message.get('content', []) @@ -190,9 +191,13 @@ def _format_messages_for_bedrock(self, messages: Messages) -> list[dict[str, Any elif isinstance(block, dict) and 'toolUse' in block: # Handle tool use blocks tool_use = block['toolUse'] + tool_use_id = tool_use.get('toolUseId') + if tool_use_id: + tool_use_ids.add(tool_use_id) + logger.debug(f"Registered tool use ID: {tool_use_id}") bedrock_content.append({ "toolUse": { - "toolUseId": tool_use.get('toolUseId'), + "toolUseId": tool_use_id, "name": tool_use.get('name'), "input": tool_use.get('input', {}) } @@ -200,12 +205,21 @@ def _format_messages_for_bedrock(self, messages: Messages) -> list[dict[str, Any elif isinstance(block, dict) and 'toolResult' in block: # Handle tool result blocks tool_result = block['toolResult'] + tool_use_id = tool_result.get('toolUseId') + + # Validate that this tool result matches a previous tool use + if tool_use_id not in tool_use_ids: + logger.warning(f"Tool result ID {tool_use_id} at message {msg_idx} doesn't match any previous tool use. Available IDs: {tool_use_ids}") + # Skip this invalid tool result to prevent ValidationException + continue + bedrock_content.append({ "toolResult": { - "toolUseId": tool_result.get('toolUseId'), + "toolUseId": tool_use_id, "content": tool_result.get('content', []) } }) + logger.debug(f"Validated tool result ID: {tool_use_id}") # Only add messages with actual content if bedrock_content: @@ -214,16 +228,17 @@ def _format_messages_for_bedrock(self, messages: Messages) -> list[dict[str, Any "content": bedrock_content }) else: - logger.warning(f"āš ļø Skipping empty message with role: {role}") + logger.warning(f"Skipping empty message with role: {role}") # Ensure we have at least one message if not bedrock_messages: - logger.warning("āš ļø No valid messages found, adding default message") + logger.warning("No valid messages found, adding default message") bedrock_messages.append({ "role": "user", "content": [{"text": "Hello"}] }) + logger.debug(f"Formatted {len(bedrock_messages)} messages with {len(tool_use_ids)} tool use IDs") return bedrock_messages def _format_request_body( @@ -296,9 +311,9 @@ async def stream( request = self._format_request_body(messages, system_prompt, tool_specs, **kwargs) if tool_specs: - logger.info(f"šŸ› ļø Custom Bedrock request includes {len(tool_specs)} tools") + logger.debug(f"Custom Bedrock request includes {len(tool_specs)} tools") - logger.info(f"šŸŽÆ Custom Bedrock streaming with {self.config['model_id']}") + logger.info(f"Custom Bedrock streaming with {self.config['model_id']}") # Use converse_stream API like official SDK response = await asyncio.get_event_loop().run_in_executor( @@ -315,10 +330,10 @@ async def stream( # This follows the same pattern as the official SDK yield chunk - logger.info(f"āœ… Custom Bedrock streaming completed for {self.config['model_id']}") + logger.info(f"Custom Bedrock streaming completed for {self.config['model_id']}") except Exception as e: - logger.error(f"🚨 Custom Bedrock streaming error: {type(e).__name__}: {e}") + logger.exception("Custom Bedrock streaming error") # Check if this is throttling and convert to Strands exception if self._is_throttling_error(e): @@ -326,7 +341,7 @@ async def stream( raise ModelThrottledException(f"Custom Bedrock throttling detected: {e}") from e elif self._is_connection_error(e): # Connection errors - treat as throttling to trigger model switching - logger.warning(f"šŸ”Œ Connection error treated as throttling: {e}") + logger.warning(f"Connection error treated as throttling: {e}") raise ModelThrottledException(f"Custom Bedrock connection error: {e}") from e else: # Re-raise non-throttling errors as-is @@ -415,7 +430,7 @@ def __init__(self, available_models: Optional[list[str]] = None, model_manager=N self.current_model_index = 0 self.model_cooldowns = {} # model_id -> cooldown_until_timestamp - logger.info(f"āœ… Model Switching Bedrock Provider initialized with {len(self.available_models)} models from shared config") + logger.info(f"Model Switching Bedrock Provider initialized with {len(self.available_models)} models from shared config") def get_next_available_model(self) -> Optional[str]: """ @@ -433,7 +448,7 @@ def get_next_available_model(self) -> Optional[str]: ] for model_id in expired_models: del self.model_cooldowns[model_id] - logger.info(f"šŸ”„ Model {model_id} cooldown expired") + logger.info(f"Model {model_id} cooldown expired") # Find next available model for i in range(len(self.available_models)): @@ -442,10 +457,10 @@ def get_next_available_model(self) -> Optional[str]: if next_model not in self.model_cooldowns: self.current_model_index = next_index - logger.info(f"šŸŽÆ Next available model: {next_model}") + logger.info(f"Next available model: {next_model}") return next_model - logger.warning("āŒ No models available - all in cooldown") + logger.warning("No models available - all in cooldown") return None def put_model_in_cooldown(self, model_id: str, cooldown_seconds: Optional[int] = None): @@ -455,7 +470,7 @@ def put_model_in_cooldown(self, model_id: str, cooldown_seconds: Optional[int] = cooldown_until = time.time() + cooldown_seconds self.model_cooldowns[model_id] = cooldown_until - logger.warning(f"🚨 Model {model_id} in cooldown for {cooldown_seconds}s (using shared config)") + logger.warning(f"Model {model_id} in cooldown for {cooldown_seconds}s (using shared config)") def create_switching_model(self, initial_model_id: Optional[str] = None, **kwargs) -> 'SwitchingBedrockModel': """ @@ -507,15 +522,15 @@ def __init__( # Initialize with the first model self.current_model = CustomBedrockModel(model_id=initial_model_id, **kwargs) - logger.info(f"šŸ”§ Switching model initialized with {initial_model_id}") + logger.info(f"Switching model initialized with {initial_model_id}") def _switch_to_model(self, model_id: str): """Switch to a specific model.""" try: self.current_model = CustomBedrockModel(model_id=model_id, **self.kwargs) - logger.info(f"šŸ”„ Switched to model: {model_id}") + logger.info(f"Switched to model: {model_id}") except Exception as e: - logger.warning(f"🚨 Error in stream: {e}") + logger.error(f"Error in stream: {e}") raise async def stream( @@ -532,22 +547,22 @@ async def stream( while self.switches_attempted <= self.max_switches: try: - logger.info(f"šŸŽÆ Attempting stream with model: {self.current_model.get_config()['model_id']} (switch attempt {self.switches_attempted})") + logger.info(f"Attempting stream with model: {self.current_model.get_config()['model_id']} (switch attempt {self.switches_attempted})") async for event in self.current_model.stream(messages, tool_specs, system_prompt, **kwargs): yield event # Success - stream completed - logger.info(f"āœ… Stream completed successfully with model: {self.current_model.get_config()['model_id']}") + logger.info(f"Stream completed successfully with model: {self.current_model.get_config()['model_id']}") return except ModelThrottledException as e: self.switches_attempted += 1 current_model_id = self.current_model.get_config()['model_id'] - logger.warning(f"🚨 Model {current_model_id} throttled (attempt {self.switches_attempted})") + logger.warning(f"Model {current_model_id} throttled (attempt {self.switches_attempted})") if self.switches_attempted > self.max_switches: - logger.error(f"āŒ Exceeded max model switches ({self.max_switches})") + logger.error(f"Exceeded max model switches ({self.max_switches})") raise Exception(f"All model switching attempts failed. Last error: {e}") # Put current model in cooldown @@ -558,15 +573,15 @@ async def stream( if next_model: # Switch to next model self._switch_to_model(next_model) - logger.warning(f"šŸ”„ IMMEDIATE MODEL SWITCH: {current_model_id} → {next_model}") + logger.warning(f"Immediate model switch: {current_model_id} -> {next_model}") continue # No more models available - logger.error(f"āŒ No alternative models available") + logger.error("No alternative models available") raise Exception(f"No alternative models available. Last error: {e}") except Exception as e: - logger.warning(f"🚨 Non-throttling error with model {self.current_model.get_config()['model_id']}: {e}") + logger.exception(f"Non-throttling error with model {self.current_model.get_config()['model_id']}") raise e raise Exception(f"Stream failed after {self.switches_attempted} model switches") @@ -604,21 +619,21 @@ async def structured_output( while self.switches_attempted <= self.max_switches: try: - logger.info(f"šŸŽÆ Attempting structured output with model: {self.current_model.get_config()['model_id']} (switch attempt {self.switches_attempted})") + logger.info(f"Attempting structured output with model: {self.current_model.get_config()['model_id']} (switch attempt {self.switches_attempted})") result = await self.current_model.structured_output(messages, schema, tool_specs, system_prompt, **kwargs) # Success - structured output completed - logger.info(f"āœ… Structured output completed successfully with model: {self.current_model.get_config()['model_id']}") + logger.info(f"Structured output completed successfully with model: {self.current_model.get_config()['model_id']}") return result except ModelThrottledException as e: self.switches_attempted += 1 current_model_id = self.current_model.get_config()['model_id'] - logger.warning(f"🚨 Model {current_model_id} throttled during structured output (attempt {self.switches_attempted})") + logger.warning(f"Model {current_model_id} throttled during structured output (attempt {self.switches_attempted})") if self.switches_attempted > self.max_switches: - logger.error(f"āŒ Exceeded max model switches ({self.max_switches}) for structured output") + logger.error(f"Exceeded max model switches ({self.max_switches}) for structured output") raise Exception(f"All model switching attempts failed for structured output. Last error: {e}") # Put current model in cooldown @@ -629,15 +644,15 @@ async def structured_output( if next_model: # Switch to next model self._switch_to_model(next_model) - logger.warning(f"šŸ”„ IMMEDIATE MODEL SWITCH for structured output: {current_model_id} → {next_model}") + logger.warning(f"Immediate model switch for structured output: {current_model_id} -> {next_model}") continue # No more models available - logger.error(f"āŒ No alternative models available for structured output") + logger.error("No alternative models available for structured output") raise Exception(f"No alternative models available for structured output. Last error: {e}") except Exception as e: - logger.warning(f"🚨 Non-throttling error during structured output with model {self.current_model.get_config()['model_id']}: {e}") + logger.exception(f"Non-throttling error during structured output with model {self.current_model.get_config()['model_id']}") raise e raise Exception(f"Structured output failed after {self.switches_attempted} model switches") diff --git a/application_src/common/memory/elasticsearch.py b/application_src/common/memory/elasticsearch.py index b2e88a6..7ce8cec 100644 --- a/application_src/common/memory/elasticsearch.py +++ b/application_src/common/memory/elasticsearch.py @@ -6,11 +6,11 @@ """ import logging -from typing import Any +from typing import Any, Dict, List -from strands_tools.elasticsearch_memory import ElasticsearchMemoryTool +from strands_tools.elasticsearch_memory import elasticsearch_memory -from .base import BaseMemory +from .base import BaseMemoryProvider as BaseMemory logger = logging.getLogger(__name__) @@ -23,160 +23,95 @@ class ElasticsearchMemory(BaseMemory): Elasticsearch as the backend storage system. """ - def __init__(self, config: list[dict[str, Any]]): + def __init__(self, config: Dict[str, Any]): """ Initialize Elasticsearch memory provider. Args: - config: List of configuration dictionaries containing: - - cloud_id: Elasticsearch cloud deployment ID - - api_key: API key for authentication - - index_name: Index name for storing memories (optional) - - dimensions: Vector dimensions for embeddings (optional) - - Raises: - ValueError: If required configuration is missing + config: Configuration dictionary for the memory provider """ super().__init__(config) + self.provider_name = "elasticsearch" - # Extract configuration parameters - config_dict = {item['name']: item['config'] for item in config} - - # Validate required parameters - required_params = ['cloud_id', 'api_key'] - for param in required_params: - if param not in config_dict: - raise ValueError(f"Missing required parameter: {param}") - - # Extract configuration with defaults - cloud_id = config_dict['cloud_id'] - api_key = config_dict['api_key'] - index_name = config_dict.get('index_name', 'agent_memory') - dimensions = int(config_dict.get('dimensions', 1024)) - - # Initialize the Elasticsearch memory tool - try: - self.memory_tool = ElasticsearchMemoryTool( - cloud_id=cloud_id, - api_key=api_key, - index_name=index_name, - dimensions=dimensions - ) - logger.info( - f"Initialized Elasticsearch memory with index: {index_name}, " - f"dimensions: {dimensions}" - ) - except Exception as e: - logger.error(f"Failed to initialize Elasticsearch memory: {e}") - raise + logger.info("Initialized Elasticsearch memory provider") - def save(self, session_id: str, human_message: str, ai_message: str) -> None: - """ - Save conversation to Elasticsearch memory. - - Args: - session_id: Unique identifier for the conversation session - human_message: The user's message - ai_message: The AI's response - """ + def initialize(self) -> list: + """Initialize the Elasticsearch memory provider and get the tools.""" try: - # Construct memory entry with context - memory_text = f"User: {human_message}\nAssistant: {ai_message}" + # Get provider configuration + provider_config = self.get_provider_config() - # Metadata for filtering and organization - metadata = { - "session_id": session_id, - "type": "conversation", - "timestamp": self._get_timestamp() - } + if not provider_config: + logger.warning("No Elasticsearch configuration found") + return [] - # Store in Elasticsearch - result = self.memory_tool.add_memory( - memory=memory_text, - metadata=metadata - ) + # Create wrapped elasticsearch_memory tool + from strands import tool - logger.debug( - f"Saved conversation to Elasticsearch for session {session_id}: " - f"{result.get('message', 'Success')}" - ) + @tool + def elasticsearch_memory_tool(action: str, content: str = None, query: str = None, + session_id: str = None) -> Dict[str, Any]: + """ + Elasticsearch memory tool for storing and retrieving information. + + Args: + action: The action to perform ('store', 'retrieve', or 'clear') + content: The content to store (for 'store' action) + query: The query to search for (for 'retrieve' action) + session_id: Session ID for organizing memories + + Returns: + Dictionary with the results of the operation + """ + try: + # Create function call parameters + function_params = { + "action": action, + **provider_config + } + + if action == "store" and content: + function_params.update({ + "memory": content, + "session_id": session_id or "default", + "timestamp": ElasticsearchMemory._get_timestamp() + }) + elif action == "retrieve" and query: + function_params.update({ + "query": query, + "n_results": 5, + "session_id": session_id or "default" + }) + elif action == "clear": + function_params.update({ + "session_id": session_id or "default" + }) + + # Call elasticsearch_memory function + result = elasticsearch_memory(function_params) + + logger.debug(f"Elasticsearch memory operation {action} completed") + return result + + except Exception as e: + error_msg = f"Error in elasticsearch_memory tool: {str(e)}" + logger.error(error_msg) + return {"status": "error", "message": error_msg} - except Exception as e: - logger.error(f"Failed to save memory for session {session_id}: {e}") - raise - - def get_context(self, session_id: str, query: str | None = None) -> str: - """ - Retrieve relevant context from Elasticsearch memory. - - Args: - session_id: Unique identifier for the conversation session - query: Optional search query for semantic retrieval - - Returns: - Formatted string containing relevant conversation history - """ - try: - # Use query if provided, otherwise use session_id for filtering - search_query = query if query else f"session:{session_id}" - - # Retrieve memories from Elasticsearch - result = self.memory_tool.get_memories( - query=search_query, - n_results=5 # Configurable number of results - ) - - memories = result.get('memories', []) - - if not memories: - logger.debug(f"No memories found for session {session_id}") - return "" + self.tools = [elasticsearch_memory_tool] + logger.info(f"Successfully created {len(self.tools)} Elasticsearch memory tools") - # Format memories for context - context_parts = [] - for i, memory in enumerate(memories, 1): - memory_text = memory.get('memory', '') - context_parts.append(f"[Memory {i}]\n{memory_text}") - - context = "\n\n".join(context_parts) - logger.debug( - f"Retrieved {len(memories)} memories for session {session_id}" - ) - - return context + return self.tools except Exception as e: - logger.error( - f"Failed to retrieve context for session {session_id}: {e}" - ) - # Return empty context on error to allow conversation to continue - return "" + logger.error(f"Error initializing Elasticsearch memory provider: {str(e)}") + return [] - def clear(self, session_id: str) -> None: - """ - Clear memory for a specific session. - - Note: The strands-tools Elasticsearch memory tool clears ALL memories. - For session-specific clearing, we would need to implement filtering, - which is not currently supported by the underlying tool. - - Args: - session_id: Unique identifier for the conversation session - """ - try: - logger.warning( - f"Clearing ALL Elasticsearch memories (session-specific " - f"clearing not supported by strands-tools). " - f"Session ID: {session_id}" - ) - - result = self.memory_tool.clear_memories() - - logger.info(f"Cleared Elasticsearch memories: {result.get('message')}") - - except Exception as e: - logger.error(f"Failed to clear memories for session {session_id}: {e}") - raise + def get_tools(self) -> List: + """Get memory tools - following the same pattern as other providers.""" + if not hasattr(self, 'tools') or not self.tools: + return self.initialize() + return self.tools @staticmethod def _get_timestamp() -> str: diff --git a/application_src/common/observability/__init__.py b/application_src/common/observability/__init__.py index 1ec42fd..62024d0 100644 --- a/application_src/common/observability/__init__.py +++ b/application_src/common/observability/__init__.py @@ -3,10 +3,16 @@ This module provides a factory for creating observability providers. """ +import logging + from config import Config from .base import BaseObservabilityProvider from .langfuse import LangfuseObservabilityProvider from .dynatrace import DynatraceObservabilityProvider +from .elastic import ElasticObservabilityProvider + +logger = logging.getLogger(__name__) + class ObservabilityFactory: """Factory for creating observability providers.""" @@ -14,45 +20,49 @@ class ObservabilityFactory: @staticmethod def create(agent_name="qa_agent"): """Create an observability provider based on configuration.""" - print(f"šŸ­ ObservabilityFactory.create() called for agent: {agent_name}") + logger.debug(f"ObservabilityFactory.create() called for agent: {agent_name}") # Create a config instance with the specified agent_name agent_config = Config(agent_name) obs_config = agent_config.get_observability_config() - print(f"šŸ“‹ Observability config for {agent_name}: {obs_config}") + logger.debug(f"Observability config for {agent_name}: enabled={obs_config.get('enabled')}, provider={obs_config.get('provider')}") if not obs_config["enabled"]: - print("āŒ Observability is disabled") + logger.info("Observability is disabled") return None provider = obs_config.get("provider") if not provider: - print("āŒ No observability provider specified, disabling observability") + logger.warning("No observability provider specified, disabling observability") return None provider = provider.lower() - print(f"šŸ”§ Creating observability provider: {provider}") + logger.info(f"Creating observability provider: {provider}") if provider == "langfuse": - print("āœ… Creating Langfuse observability provider") + logger.debug("Creating Langfuse observability provider") return LangfuseObservabilityProvider(obs_config) elif provider == "dynatrace": - print("āœ… Creating Dynatrace observability provider") + logger.debug("Creating Dynatrace observability provider") return DynatraceObservabilityProvider(obs_config) + elif provider == "elastic": + logger.debug("Creating Elastic observability provider") + return ElasticObservabilityProvider(obs_config) else: - print(f"āŒ Unknown observability provider: {provider}") + logger.error(f"Unknown observability provider: {provider}") return None + def get_trace_attributes(agent_name="qa_agent"): """Get trace attributes for use with Strands Agent.""" - print(f"šŸ” Getting trace attributes for agent: {agent_name}...") + logger.debug(f"Getting trace attributes for agent: {agent_name}") obs_provider = ObservabilityFactory.create(agent_name) if obs_provider: trace_attrs = obs_provider.get_trace_attributes() - print(f"āœ… Trace attributes retrieved: {trace_attrs}") + logger.debug(f"Trace attributes retrieved: {trace_attrs}") return trace_attrs else: - print("āŒ No observability provider available") + logger.warning("No observability provider available") return {} diff --git a/application_src/common/observability/elastic.py b/application_src/common/observability/elastic.py new file mode 100644 index 0000000..b173dba --- /dev/null +++ b/application_src/common/observability/elastic.py @@ -0,0 +1,236 @@ +""" +Elastic observability provider for GenAI-In-A-Box agent. +This module provides an observability provider for Elastic with proper OpenTelemetry initialization. +""" + +import logging +import os +import uuid +from typing import Dict, Any +from .base import BaseObservabilityProvider + +logger = logging.getLogger(__name__) + + +class ElasticObservabilityProvider(BaseObservabilityProvider): + """Observability provider for Elastic.""" + + def __init__(self, config: Dict[str, Any]): + """Initialize the Elastic observability provider.""" + super().__init__(config) + self.provider_name = "elastic" + + def initialize(self) -> Dict[str, Any]: + """Initialize the Elastic observability provider and get the trace attributes.""" + try: + provider_config = self.get_provider_config() + + # Get Elastic configuration + api_key = provider_config.get("api_key", "") + otlp_endpoint = provider_config.get("otlp_endpoint", "") + + logger.debug("Elastic credentials check:") + logger.debug(f" API Key: {'Present' if api_key else 'Missing'}") + logger.debug(f" OTLP Endpoint: {'Configured' if otlp_endpoint else 'Missing'}") + + if not api_key: + logger.error("Elastic API key (api_key) is required") + return {} + + if not otlp_endpoint: + logger.error("Elastic OTLP endpoint (otlp_endpoint) is required") + return {} + + # Set up environment variables for Elastic (CRITICAL for Strands integration) + os.environ["ELASTIC_API_KEY"] = api_key + os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = otlp_endpoint + os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=ApiKey {api_key}" + + logger.info("Elastic environment variables configured successfully") + + # CRITICAL: Initialize OpenTelemetry for Elastic + try: + self._initialize_opentelemetry(otlp_endpoint, api_key) + logger.info("OpenTelemetry initialized successfully for Elastic") + except Exception as otel_error: + logger.warning(f"OpenTelemetry initialization failed: {otel_error}") + logger.warning("Traces will not be sent to Elastic") + # Don't return empty dict - still provide trace attributes for debugging + + # Get service name from config or environment + # Priority: agent_name from config > AGENT_NAME env var > SERVICE_NAME env var > default + service_name = ( + self.config.get("agent_name") or + os.environ.get('AGENT_NAME') or + os.environ.get('SERVICE_NAME') or + 'genai-in-a-box' + ) + + # Get service version from config or environment + service_version = ( + self.config.get("agent_version") or + os.environ.get('SERVICE_VERSION') or + '1.0.0' + ) + + # Get optional dataset routing configuration + dataset = provider_config.get("dataset", "generic.otel") + namespace = provider_config.get("namespace", "default") + + self.trace_attributes = { + "session.id": f"{service_name}-session-{uuid.uuid4()}", + "user.id": f"{service_name}-user", + "service.name": service_name, + "service.version": service_version, + "deployment.environment": os.environ.get('ENVIRONMENT', 'production'), + "data_stream.dataset": dataset, + "data_stream.namespace": namespace + } + + logger.info("Elastic observability provider initialized successfully") + logger.debug(f"Trace attributes: {self.trace_attributes}") + return self.trace_attributes + + except Exception as e: + logger.exception("Error initializing Elastic observability provider") + return {} + + def _initialize_opentelemetry(self, otlp_endpoint: str, api_key: str): + """Initialize OpenTelemetry with OTLP exporter for Elastic.""" + try: + # Import OpenTelemetry components + from opentelemetry import trace + from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter + from opentelemetry.sdk.trace import TracerProvider + from opentelemetry.sdk.trace.export import BatchSpanProcessor + from opentelemetry.sdk.resources import Resource + + logger.debug("OpenTelemetry packages imported successfully") + + # Get dataset and namespace from config + provider_config = self.get_provider_config() + dataset = provider_config.get("dataset", "generic.otel") + namespace = provider_config.get("namespace", "default") + + # Get service name and version (same logic as trace_attributes) + service_name = ( + self.config.get("agent_name") or + os.environ.get('AGENT_NAME') or + os.environ.get('SERVICE_NAME') or + 'genai-in-a-box' + ) + service_version = ( + self.config.get("agent_version") or + os.environ.get('SERVICE_VERSION') or + '1.0.0' + ) + + # Create resource with service information + # Include data stream routing attributes for Elastic + resource = Resource.create({ + "service.name": service_name, + "service.version": service_version, + "deployment.environment": os.environ.get('ENVIRONMENT', 'production'), + "data_stream.dataset": dataset, + "data_stream.namespace": namespace + }) + + # Set up tracer provider + tracer_provider = TracerProvider(resource=resource) + trace.set_tracer_provider(tracer_provider) + + logger.debug("TracerProvider configured") + + # Create OTLP exporter with Elastic API Key authentication + headers = {"Authorization": f"ApiKey {api_key}"} + + logger.debug(f"Data Stream: traces-{dataset}-{namespace}") + logger.debug("OTLP Endpoint Configuration:") + logger.debug(f" Base endpoint from config: {otlp_endpoint}") + + # Ensure the endpoint has the correct OTLP traces path + # Elastic OTLP endpoint should end with /v1/traces + if not otlp_endpoint.endswith('/v1/traces'): + if otlp_endpoint.endswith('/'): + otlp_endpoint = otlp_endpoint + 'v1/traces' + else: + otlp_endpoint = otlp_endpoint + '/v1/traces' + + logger.debug(f" Final OTLP traces endpoint: {otlp_endpoint}") + logger.debug(" Headers: Authorization=ApiKey [REDACTED]") + + otlp_exporter = OTLPSpanExporter( + endpoint=otlp_endpoint, + headers=headers + ) + + logger.debug("OTLP Exporter created successfully") + logger.debug(f" Exporter endpoint: {otlp_exporter._endpoint}") + logger.debug(f" Exporter will send traces to: {otlp_endpoint}") + + # Wrap the exporter to add detailed error logging and resilience + class ResilientOTLPSpanExporter: + def __init__(self, wrapped_exporter): + self._wrapped = wrapped_exporter + self._failed_exports = 0 + self._max_failures = 5 # Stop trying after 5 consecutive failures + + def export(self, spans): + # Skip export if we've had too many failures + if self._failed_exports >= self._max_failures: + from opentelemetry.sdk.trace.export import SpanExportResult + logger.warning(f"OTLP Export Skipped: Too many consecutive failures ({self._failed_exports})") + return SpanExportResult.FAILURE + + try: + logger.debug("OTLP Export Debug:") + logger.debug(f" Sending {len(spans)} spans to: {self._wrapped._endpoint}") + logger.debug(f" Request headers: {self._wrapped._headers}") + result = self._wrapped.export(spans) + logger.debug(f" Export result: {result}") + + # Reset failure counter on success + if result.name == 'SUCCESS': + self._failed_exports = 0 + else: + self._failed_exports += 1 + logger.warning(f"Export failed, failure count: {self._failed_exports}") + + return result + except Exception as e: + self._failed_exports += 1 + logger.error(f"OTLP Export Error (failure {self._failed_exports}/{self._max_failures}):") + logger.error(f" Error type: {type(e).__name__}") + logger.error(f" Error message: {str(e)}") + logger.error(f" Endpoint attempted: {self._wrapped._endpoint}") + + # Only log full traceback for first few failures to reduce log spam + if self._failed_exports <= 3: + logger.exception("OTLP Export Exception details:") + + # Return failure instead of raising to prevent crash + from opentelemetry.sdk.trace.export import SpanExportResult + return SpanExportResult.FAILURE + + def shutdown(self): + return self._wrapped.shutdown() + + def force_flush(self, timeout_millis: int = 30000): + return self._wrapped.force_flush(timeout_millis) + + # Wrap the exporter for resilience and debugging + resilient_exporter = ResilientOTLPSpanExporter(otlp_exporter) + + # Add span processor with resilient exporter + span_processor = BatchSpanProcessor(resilient_exporter) + tracer_provider.add_span_processor(span_processor) + + logger.info("OpenTelemetry configured with OTLP exporter for Elastic") + + except ImportError as import_error: + logger.error(f"Missing OpenTelemetry dependencies: {import_error}") + logger.error("Install with: pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp") + raise + except Exception as setup_error: + logger.exception("OpenTelemetry setup failed") + raise diff --git a/application_src/configuration-api/app/api/config.py b/application_src/configuration-api/app/api/config.py index 50c7ffa..bddf9ae 100644 --- a/application_src/configuration-api/app/api/config.py +++ b/application_src/configuration-api/app/api/config.py @@ -165,7 +165,7 @@ async def get_agent_config( result = agent_service.load_agent_configuration(agent_name) # Convert result to dict for manipulation - config_dict = result.dict() if hasattr(result, 'dict') else dict(result) + config_dict = result.model_dump(mode='json') if hasattr(result, 'model_dump') else dict(result) return config_dict @@ -735,6 +735,71 @@ async def create_system_prompt( raise HTTPException(status_code=500, detail="Internal server error occurred") +@config_router.post('/update-deployment/{agent_name}') +async def update_agent_deployment( + agent_name: str, + current_user: UserInfo = Depends(get_current_user), + _: None = Depends(RequirePermission("config:update")) +) -> Dict[str, Any]: + """ + Trigger CloudFormation stack update to force new ECS task deployment. + + This endpoint updates the CloudFormation stack for the agent, which will: + 1. Force ECS to deploy new tasks with latest configuration + 2. Maintain zero-downtime deployment + 3. Pull latest container images if updated + + Args: + agent_name: Name of the agent to update deployment for + + Returns: + Dictionary with update status and stack information + + Raises: + HTTPException: If update fails or agent stack not found + """ + try: + logger.info(f"Triggering deployment update for agent: {agent_name}") + + # Get deployment service + from ..services import DeploymentService + from ..utils.dependencies import get_deployment_service + + deployment_service = get_deployment_service() + + # Update the agent stack to force new deployment + update_result = await deployment_service.update_agent_stack( + agent_name=agent_name, + parameters=None # Use existing parameters, just trigger update + ) + + logger.info(f"Successfully triggered deployment update for agent: {agent_name}") + + return { + "status": "success", + "message": f"Deployment update initiated for agent '{agent_name}'", + "agent_name": agent_name, + "stack_name": update_result.get("stack_name"), + "stack_status": update_result.get("status"), + "details": update_result + } + + except ValueError as e: + # Agent stack not found + log_exception_safely(logger, e, f"Agent stack not found for '{agent_name}'") + raise HTTPException( + status_code=404, + detail=f"No CloudFormation stack found for agent '{agent_name}'" + ) + except Exception as e: + logger.error(f"Error updating deployment for agent '{agent_name}'") + log_exception_safely(logger, e, f"Error updating deployment for '{agent_name}'") + raise HTTPException( + status_code=500, + detail="Internal server error during deployment update" + ) + + @config_router.post('/refresh-agent/{agent_name}') async def refresh_agent_instances( agent_name: str, @@ -819,7 +884,7 @@ async def refresh_agent_instances( # Check if this agent matches our target if current_agent_name == agent_name: matching_agents_found += 1 - logger.info(f"āœ… Found matching agent '{agent_name}' at {agent_url}, triggering refresh") + logger.info(f"Found matching agent '{agent_name}' at {agent_url}, triggering refresh") # Call the agent's /config/load endpoint with its own name load_url = f"{agent_url.rstrip('/')}/config/load" @@ -839,7 +904,7 @@ async def refresh_agent_instances( "timestamp": response_data.get("timestamp", "unknown") } successful_refreshes.append(agent_url) - logger.info(f"āœ… Successfully refreshed agent '{agent_name}' at {agent_url}") + logger.info(f"Successfully refreshed agent '{agent_name}' at {agent_url}") else: refresh_results[agent_url] = { @@ -851,7 +916,7 @@ async def refresh_agent_instances( "timestamp": None } failed_refreshes.append({"url": agent_url, "reason": "refresh failed"}) - logger.error(f"āŒ Agent '{agent_name}' at {agent_url} refresh failed") + logger.error(f"Agent '{agent_name}' at {agent_url} refresh failed") else: # Agent doesn't match, skip it refresh_results[agent_url] = { @@ -861,7 +926,7 @@ async def refresh_agent_instances( "checked": True, "matches_target": False } - logger.info(f"ā­ļø Agent '{current_agent_name}' at {agent_url} does not match target '{agent_name}', skipped") + logger.info(f"Agent '{current_agent_name}' at {agent_url} does not match target '{agent_name}', skipped") except httpx.TimeoutException as e: log_exception_safely(logger, e, f"Timeout checking agent at {agent_url}") @@ -873,7 +938,7 @@ async def refresh_agent_instances( "error": "Request timeout after 30 seconds" } failed_refreshes.append({"url": agent_url, "reason": "timeout"}) - logger.error(f"ā° Agent at {agent_url} timed out") + logger.error(f"Agent at {agent_url} timed out") except httpx.RequestError as e: log_exception_safely(logger, e, f"Request error checking agent at {agent_url}") diff --git a/application_src/configuration-api/app/api/deployment.py b/application_src/configuration-api/app/api/deployment.py index 8d95dd0..30b9993 100644 --- a/application_src/configuration-api/app/api/deployment.py +++ b/application_src/configuration-api/app/api/deployment.py @@ -13,9 +13,9 @@ from fastapi import APIRouter, HTTPException, BackgroundTasks, Depends, Body from pydantic import BaseModel, Field -from app.services.cloudformation_deployment_service import CloudFormationDeploymentService +from app.services.deployment_service import DeploymentService from app.services.agent_config_service import AgentConfigService -from app.utils.dependencies import get_cloudformation_deployment_service, get_agent_config_service +from app.utils.dependencies import get_deployment_service, get_agent_config_service from app.models import AgentConfigRequest # Authentication middleware imports @@ -65,7 +65,7 @@ async def deploy_agent_stack( background_tasks: BackgroundTasks, current_user: UserInfo = Depends(get_current_user), _: None = Depends(RequirePermission("agent:deploy")), - cfn_service: CloudFormationDeploymentService = Depends(get_cloudformation_deployment_service) + deployment_service: DeploymentService = Depends(get_deployment_service) ): """ Deploy a new agent stack using CloudFormation API. @@ -100,11 +100,14 @@ async def deploy_agent_stack( detail="Agent name must contain only letters, numbers, underscores, and hyphens" ) - # Deploy the stack using CloudFormation - result = cfn_service.deploy_agent_stack( - agent_name=request.agent_name, - parameters=request.parameters, - timeout_minutes=request.timeout_minutes + # Deploy the stack using CloudFormation template from S3 + # Use consistent naming pattern throughout all operations + new_stack_name = deployment_service.get_stack_name_from_agent(request.agent_name) + + result = await deployment_service.create_agent_stack( + new_agent_name=request.agent_name, + new_stack_name=new_stack_name, + model_config=None # Will read from SSM ) logger.info(f"Successfully deployed stack: {result['stack_name']}") @@ -136,14 +139,14 @@ async def get_stack_status( agent_name: str, current_user: UserInfo = Depends(get_current_user), _: None = Depends(RequirePermission("agent:read")), - cfn_service: CloudFormationDeploymentService = Depends(get_cloudformation_deployment_service) + deployment_service: DeploymentService = Depends(get_deployment_service) ): """ Get the current status of an agent's CloudFormation stack. Args: agent_name: Name of the agent - cfn_service: Injected CloudFormation deployment service + deployment_service: Injected deployment service Returns: Current stack status and information @@ -154,19 +157,21 @@ async def get_stack_status( try: logger.info(f"Retrieving status for agent: {agent_name}") - status_info = cfn_service.get_stack_info(agent_name) + # Find the stack for this agent first + stack_info = await deployment_service.find_agent_stack_by_name(agent_name) + + if not stack_info: + raise HTTPException(status_code=404, detail="Agent stack not found") + + status_info = await deployment_service.get_stack_status(stack_info['stack_name']) return StackStatusResponse(**status_info) - except RuntimeError as e: + except HTTPException: + raise + except Exception as e: logger.error("Error retrieving stack status") log_exception_safely(logger, "Error retrieving stack status", e) - if 'not found' in str(e).lower(): - raise HTTPException(status_code=404, detail="Agent stack not found") - raise HTTPException(status_code=500, detail="Failed to retrieve stack status") - except Exception as e: - logger.error("Unexpected error retrieving stack status") - log_exception_safely(logger, "Unexpected error retrieving stack status", e) raise HTTPException(status_code=500, detail="Failed to retrieve stack status") @@ -174,13 +179,13 @@ async def get_stack_status( async def list_agent_stacks( current_user: UserInfo = Depends(get_current_user), _: None = Depends(RequirePermission("agent:read")), - cfn_service: CloudFormationDeploymentService = Depends(get_cloudformation_deployment_service) + deployment_service: DeploymentService = Depends(get_deployment_service) ): """ List all agent stacks with their current status. Args: - cfn_service: Injected CloudFormation deployment service + deployment_service: Injected deployment service Returns: List of agent stacks with status information @@ -191,21 +196,17 @@ async def list_agent_stacks( try: logger.info("Retrieving list of agent stacks") - stacks = cfn_service.list_agent_stacks() + stacks = await deployment_service.list_agent_stacks() return { "stacks": stacks, "count": len(stacks) } - except RuntimeError as e: + except Exception as e: logger.error("Error listing agent stacks") log_exception_safely(logger, "Error listing agent stacks", e) raise HTTPException(status_code=500, detail="Failed to list agent stacks") - except Exception as e: - logger.error("Unexpected error listing agent stacks") - log_exception_safely(logger, "Unexpected error listing agent stacks", e) - raise HTTPException(status_code=500, detail="Failed to list stacks") @deployment_router.delete("/stack/{agent_name}") @@ -214,7 +215,7 @@ async def delete_agent_stack( timeout_minutes: int = 30, current_user: UserInfo = Depends(get_current_user), _: None = Depends(RequirePermission("agent:delete")), - cfn_service: CloudFormationDeploymentService = Depends(get_cloudformation_deployment_service) + deployment_service: DeploymentService = Depends(get_deployment_service) ): """ Delete an agent stack using CloudFormation API. @@ -222,7 +223,7 @@ async def delete_agent_stack( Args: agent_name: Name of the agent timeout_minutes: Maximum time to wait for deletion - cfn_service: Injected CloudFormation deployment service + deployment_service: Injected deployment service Returns: Deletion confirmation @@ -233,29 +234,27 @@ async def delete_agent_stack( try: logger.info(f"Deleting stack for agent: {agent_name}") - result = cfn_service.delete_agent_stack( - agent_name=agent_name, - timeout_minutes=timeout_minutes - ) + # Find the stack for this agent first + stack_info = await deployment_service.find_agent_stack_by_name(agent_name) + + if not stack_info: + raise HTTPException(status_code=404, detail="Agent stack not found") + + result = await deployment_service.delete_stack(stack_info['stack_name']) return { "message": f"Stack deletion completed for agent: {agent_name}", "stack_name": result['stack_name'], - "agent_name": result['agent_name'], - "status": result['status'], - "deleted_at": result['deleted_at'] + "agent_name": agent_name, + "status": result['status'] } - except RuntimeError as e: + except HTTPException: + raise + except Exception as e: logger.error("Error deleting stack") log_exception_safely(logger, "Error deleting stack", e) - if 'not found' in str(e).lower(): - raise HTTPException(status_code=404, detail="Agent stack not found") raise HTTPException(status_code=500, detail="Failed to delete agent stack") - except Exception as e: - logger.error("Unexpected error deleting stack") - log_exception_safely(logger, "Unexpected error deleting stack", e) - raise HTTPException(status_code=500, detail="Failed to delete stack") @deployment_router.put("/stack/{agent_name}") @@ -265,7 +264,7 @@ async def update_agent_stack( timeout_minutes: int = 30, current_user: UserInfo = Depends(get_current_user), _: None = Depends(RequirePermission("agent:update")), - cfn_service: CloudFormationDeploymentService = Depends(get_cloudformation_deployment_service) + deployment_service: DeploymentService = Depends(get_deployment_service) ): """ Update an existing agent stack using CloudFormation API. @@ -274,7 +273,7 @@ async def update_agent_stack( agent_name: Name of the agent parameters: CloudFormation parameters for update timeout_minutes: Maximum time to wait for update - cfn_service: Injected CloudFormation deployment service + deployment_service: Injected deployment service Returns: Update confirmation @@ -285,10 +284,9 @@ async def update_agent_stack( try: logger.info(f"Updating stack for agent: {agent_name}") - result = cfn_service.update_agent_stack( + result = await deployment_service.update_agent_stack( agent_name=agent_name, - parameters=parameters, - timeout_minutes=timeout_minutes + parameters=parameters ) return { @@ -298,22 +296,13 @@ async def update_agent_stack( "outputs": result.get('outputs', {}) } - except RuntimeError as e: + except ValueError as e: + log_exception_safely(logger, e, "Agent stack not found") + raise HTTPException(status_code=404, detail="Agent stack not found") + except Exception as e: logger.error("Error updating stack") log_exception_safely(logger, "Error updating stack", e) - if 'not found' in str(e).lower(): - raise HTTPException(status_code=404, detail="Agent stack not found") - if 'No updates' in str(e): - return { - "message": f"No updates needed for agent: {agent_name}", - "stack_name": cfn_service._get_stack_name(agent_name), - "status": "UP_TO_DATE" - } raise HTTPException(status_code=500, detail="Failed to update agent stack") - except Exception as e: - logger.error("Unexpected error updating stack") - log_exception_safely(logger, "Unexpected error updating stack", e) - raise HTTPException(status_code=500, detail="Failed to update stack") @deployment_router.post("/create-agent") @@ -322,12 +311,12 @@ async def create_agent( background_tasks: BackgroundTasks = BackgroundTasks(), current_user: UserInfo = Depends(get_current_user), _: None = Depends(RequirePermission("agent:deploy")), - cfn_service: CloudFormationDeploymentService = Depends(get_cloudformation_deployment_service) + deployment_service: DeploymentService = Depends(get_deployment_service) ): """ Deploy CloudFormation stack for an agent asynchronously. - This endpoint initiates CloudFormation stack creation in the background + This endpoint initiates CloudFormation stack creation using DeploymentService and returns immediately, preventing health check failures. The UI workflow is: @@ -339,7 +328,7 @@ async def create_agent( request: Request body with: - new_agent_name: Name of the agent to deploy background_tasks: FastAPI background tasks - cfn_service: Injected CloudFormation deployment service + deployment_service: Injected deployment service Returns: Immediate response with deployment initiation status @@ -363,30 +352,13 @@ async def create_agent( detail="Agent name must contain only letters, numbers, underscores, and hyphens" ) - # Initiate CloudFormation stack creation (non-blocking) - cfn_parameters = { - "AgentName": agent_name - } + # Use consistent naming pattern throughout all operations + new_stack_name = deployment_service.get_stack_name_from_agent(agent_name) - stack_name = cfn_service._get_stack_name(agent_name) - - # Download template and create stack (fast operations) - template_body = cfn_service._download_template("GenericAgentTemplate.json") - cfn_parameters_list = cfn_service._convert_parameters(cfn_parameters) - - # Create stack (returns immediately, doesn't wait for completion) - cfn_service.cfn_client.create_stack( - StackName=stack_name, - TemplateBody=template_body, - Parameters=cfn_parameters_list, - Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM'], - Tags=[ - {'Key': 'ManagedBy', 'Value': 'ConfigurationAPI'}, - {'Key': 'ProjectName', 'Value': cfn_service.project_name}, - {'Key': 'AgentName', 'Value': agent_name}, - {'Key': 'DeployedAt', 'Value': datetime.now().isoformat()} - ], - TimeoutInMinutes=30 + result = await deployment_service.create_agent_stack( + new_agent_name=agent_name, + new_stack_name=new_stack_name, + model_config=None # Will read from SSM ) logger.info(f"Stack creation initiated for agent: {agent_name}") @@ -395,8 +367,8 @@ async def create_agent( "status": "initiated", "message": f"Infrastructure deployment initiated for agent '{agent_name}'", "agent_name": agent_name, - "stack_name": stack_name, - "deployment_status": "CREATE_IN_PROGRESS", + "stack_name": new_stack_name, + "deployment_status": result['status'], "outputs": {}, "deployed_at": datetime.now().isoformat() } @@ -404,11 +376,7 @@ async def create_agent( except ValueError as e: logger.error("Validation error creating agent") log_exception_safely(logger, "Validation error creating agent", e) - raise HTTPException(status_code=400, detail="Invalid agent creation parameters") - except RuntimeError as e: - logger.error("Runtime error creating agent") - log_exception_safely(logger, "Runtime error creating agent", e) - raise HTTPException(status_code=500, detail="Agent creation failed") + raise HTTPException(status_code=400, detail=str(e)) except Exception as e: logger.error("Error creating agent") log_exception_safely(logger, "Error creating agent", e) diff --git a/application_src/configuration-api/app/models/agent_config.py b/application_src/configuration-api/app/models/agent_config.py index a6dd44b..cb61660 100644 --- a/application_src/configuration-api/app/models/agent_config.py +++ b/application_src/configuration-api/app/models/agent_config.py @@ -71,6 +71,10 @@ class AgentConfigRequest(BaseModel): # Model configuration model_id: str = Field(..., description="Primary model identifier") + model_ids: Optional[List[str]] = Field( + default=None, + description="Multiple model identifiers for dynamic model switching" + ) judge_model_id: str = Field(..., description="Judge model identifier") embedding_model_id: str = Field(..., description="Embedding model identifier") region_name: str = Field(..., description="AWS region name") @@ -162,6 +166,10 @@ class AgentConfigResponse(BaseModel): # Model configuration model_id: str = Field(..., description="Primary model identifier") + model_ids: Optional[List[str]] = Field( + default=None, + description="Multiple model identifiers for dynamic model switching" + ) judge_model_id: str = Field(..., description="Judge model identifier") embedding_model_id: str = Field(..., description="Embedding model identifier") region_name: str = Field(..., description="AWS region name") diff --git a/application_src/configuration-api/app/models/form_schema.py b/application_src/configuration-api/app/models/form_schema.py index 619a589..f6fdb5a 100644 --- a/application_src/configuration-api/app/models/form_schema.py +++ b/application_src/configuration-api/app/models/form_schema.py @@ -722,25 +722,13 @@ def get_observability_schemas() -> Dict[str, ProviderFormSchema]: provider_label="Langfuse", description="Open-source LLM observability and analytics platform", fields=[ - FormField( - name="enabled", - type=FieldType.CHECKBOX, - label="Enable Observability Integration", - help_text="Enable Langfuse observability and analytics", - required=False, - default_value=False - ), FormField( name="public_key", type=FieldType.TEXT, label="Langfuse Public Key", placeholder="Enter Langfuse public key", help_text="Your Langfuse project public key", - required=True, - conditional={ - "field": "enabled", - "value": True - } + required=True ), FormField( name="secret_key", @@ -749,11 +737,7 @@ def get_observability_schemas() -> Dict[str, ProviderFormSchema]: placeholder="Enter Langfuse secret key", help_text="Your Langfuse project secret key", required=True, - secure=True, - conditional={ - "field": "enabled", - "value": True - } + secure=True ), FormField( name="host", @@ -762,11 +746,7 @@ def get_observability_schemas() -> Dict[str, ProviderFormSchema]: placeholder="https://cloud.langfuse.com", help_text="Langfuse host URL (defaults to cloud.langfuse.com)", required=False, - default_value="https://cloud.langfuse.com", - conditional={ - "field": "enabled", - "value": True - } + default_value="https://cloud.langfuse.com" ) ] ), @@ -776,25 +756,13 @@ def get_observability_schemas() -> Dict[str, ProviderFormSchema]: provider_label="Dynatrace", description="Full-stack observability platform", fields=[ - FormField( - name="enabled", - type=FieldType.CHECKBOX, - label="Enable Observability Integration", - help_text="Enable Dynatrace observability and monitoring", - required=False, - default_value=False - ), FormField( name="environment_url", type=FieldType.URL, label="Dynatrace Environment URL", placeholder="https://your-environment.live.dynatrace.com", help_text="Your Dynatrace environment URL", - required=True, - conditional={ - "field": "enabled", - "value": True - } + required=True ), FormField( name="api_token", @@ -803,11 +771,50 @@ def get_observability_schemas() -> Dict[str, ProviderFormSchema]: placeholder="Enter Dynatrace API token", help_text="API token with appropriate permissions", required=True, - secure=True, - conditional={ - "field": "enabled", - "value": True - } + secure=True + ) + ] + ), + + "elastic": ProviderFormSchema( + provider_name="elastic", + provider_label="Elastic Observability", + description="Elastic Cloud Managed OTLP Endpoint for OpenTelemetry-based observability", + fields=[ + FormField( + name="otlp_endpoint", + type=FieldType.URL, + label="Elastic OTLP Endpoint", + placeholder="https://your-cluster.elastic-cloud.com:443", + help_text="Your Elastic Cloud Managed OTLP endpoint URL (found in Elastic Cloud console)", + required=True + ), + FormField( + name="api_key", + type=FieldType.PASSWORD, + label="Elastic API Key", + placeholder="Enter Elastic API key", + help_text="API key for authentication with Elastic Cloud", + required=True, + secure=True + ), + FormField( + name="dataset", + type=FieldType.TEXT, + label="Data Stream Dataset (Optional)", + placeholder="generic.otel", + help_text="Dataset name for routing logs to dedicated data streams (default: generic.otel)", + required=False, + default_value="generic.otel" + ), + FormField( + name="namespace", + type=FieldType.TEXT, + label="Data Stream Namespace (Optional)", + placeholder="default", + help_text="Namespace for data stream organization (default: default)", + required=False, + default_value="default" ) ] ) diff --git a/application_src/configuration-api/app/models/ssm_data_models.py b/application_src/configuration-api/app/models/ssm_data_models.py index 80b54fd..b4b32eb 100644 --- a/application_src/configuration-api/app/models/ssm_data_models.py +++ b/application_src/configuration-api/app/models/ssm_data_models.py @@ -34,6 +34,17 @@ class ProviderType(str, Enum): YES = "Yes" DEFAULT = "default" CUSTOM = "custom" + # Memory providers + MEM0 = "mem0" + ELASTICSEARCH = "elasticsearch" + BEDROCK_AGENTCORE = "bedrock_agentcore" + OPENSEARCH = "opensearch" + # Knowledge base providers + BEDROCK = "bedrock" + BEDROCK_KB = "bedrock_kb" + CUSTOM_KB = "custom_kb" + # Guardrail providers + BEDROCK_GUARDRAIL = "bedrock" class StreamingType(str, Enum): @@ -94,6 +105,10 @@ class SSMAgentConfiguration(BaseModel): ..., description="Primary model identifier with region prefix (e.g. us.anthropic.claude-3-5-sonnet-20241022-v2:0)" ) + model_ids: Optional[List[str]] = Field( + default=None, + description="Multiple model identifiers for dynamic model switching (optional, for multi-model support)" + ) judge_model_id: str = Field( ..., description="Judge model identifier with region prefix for evaluation tasks" @@ -506,7 +521,7 @@ def validate_agent_configuration(config_data: Dict[str, Any]) -> Dict[str, Any]: return { "valid": True, - "model": validated_config.dict(), + "model": validated_config.model_dump(mode='json'), "errors": [], "warnings": [] } diff --git a/application_src/configuration-api/app/services/agent_config_service.py b/application_src/configuration-api/app/services/agent_config_service.py index 6911820..7e70931 100644 --- a/application_src/configuration-api/app/services/agent_config_service.py +++ b/application_src/configuration-api/app/services/agent_config_service.py @@ -37,6 +37,10 @@ def save_agent_configuration(self, config_request: AgentConfigRequest) -> Dict[s """ Save agent configuration including system prompt and settings. + CRITICAL: This method preserves existing configuration data when editing. + When updating an existing agent, it merges new data with existing data to prevent + loss of configurations that weren't modified in the current request. + Args: config_request: Agent configuration request data @@ -65,9 +69,109 @@ def save_agent_configuration(self, config_request: AgentConfigRequest) -> Dict[s if not success: raise Exception("Failed to store system prompt") - # Prepare configuration data (excluding system prompt content) - config_data = config_request.dict() - config_data.pop('system_prompt', None) # Remove prompt content from config + # CRITICAL FIX: Load existing configuration to preserve data during edits + existing_config = self._get_agent_config(agent_name) + + # Prepare new configuration data (excluding system prompt content) + # Use model_dump() for Pydantic v2 with proper nested model serialization + new_config_data = config_request.model_dump(mode='json', exclude_none=True) + new_config_data.pop('system_prompt', None) # Remove prompt content from config + + # Normalize cache_prompt and cache_tools values to valid enum values + # UI may send 'default' which needs to be converted to 'False' + if new_config_data.get('cache_prompt') == 'default': + new_config_data['cache_prompt'] = 'False' + logger.info(f"Normalized cache_prompt from 'default' to 'False'") + + if new_config_data.get('cache_tools') == 'default': + new_config_data['cache_tools'] = 'False' + logger.info(f"Normalized cache_tools from 'default' to 'False'") + + # CRITICAL FIX: Merge new data with existing data to preserve unmodified fields + if existing_config: + logger.info(f"Merging with existing configuration for agent: {agent_name}") + + # Start with existing config as base + merged_config = existing_config.copy() + + # Component types that have provider_details pattern + component_types = ['memory', 'knowledge_base', 'observability', 'guardrail'] + + # Track which components have existing configurations + components_to_preserve = {} + for component_type in component_types: + details_key = f"{component_type}_details" if component_type == 'knowledge_base' else f"{component_type}_provider_details" + existing_details = existing_config.get(details_key, []) + + # Check if this component has meaningful existing configuration + if isinstance(existing_details, list) and len(existing_details) > 0: + # Check if any provider has non-empty config + has_config = any( + isinstance(provider, dict) and + isinstance(provider.get('config', {}), dict) and + len(provider.get('config', {})) > 0 + for provider in existing_details + ) + + if has_config: + components_to_preserve[component_type] = { + 'details_key': details_key, + 'enabled_key': component_type, + 'provider_key': f"{component_type}_provider", + 'existing_details': existing_details, + 'existing_enabled': existing_config.get(component_type, 'False'), + 'existing_provider': existing_config.get(f"{component_type}_provider", 'default') + } + logger.info(f"Component '{component_type}' has existing configuration to potentially preserve") + + # Update with new values + for key, value in new_config_data.items(): + # Check if this is a component field that should be preserved + should_preserve = False + + for component_type, preserve_info in components_to_preserve.items(): + # Check if this key belongs to a component with existing config + if key == preserve_info['details_key']: + # If new value is empty list, preserve existing details + if isinstance(value, list) and len(value) == 0: + logger.info(f"Preserving existing {key} (has {len(preserve_info['existing_details'])} items)") + should_preserve = True + break + + # Also preserve the enabled flag if details are being preserved + elif key == preserve_info['enabled_key']: + new_details = new_config_data.get(preserve_info['details_key'], []) + if isinstance(new_details, list) and len(new_details) == 0: + # New request has empty details, check if we should preserve + if value in ['False', 'false', False, 'No', 'default']: + logger.info(f"Preserving existing {key} enabled status: {preserve_info['existing_enabled']}") + merged_config[key] = preserve_info['existing_enabled'] + should_preserve = True + break + + # Also preserve the provider name if details are being preserved + elif key == preserve_info['provider_key']: + new_details = new_config_data.get(preserve_info['details_key'], []) + if isinstance(new_details, list) and len(new_details) == 0: + if value in ['default', 'No', 'no']: + logger.info(f"Preserving existing {key}: {preserve_info['existing_provider']}") + merged_config[key] = preserve_info['existing_provider'] + should_preserve = True + break + + # If we should preserve this field, skip the update + if should_preserve: + continue + + # Otherwise, update the field with new value + merged_config[key] = value + + config_data = merged_config + logger.info(f"Configuration merged successfully, preserved {len(components_to_preserve)} component configurations") + else: + # New agent - use new config as-is + config_data = new_config_data + logger.info(f"Creating new configuration for agent: {agent_name}") # Store main configuration success = self._store_agent_config(agent_name, config_data) @@ -112,10 +216,8 @@ def load_agent_configuration(self, agent_name: str) -> AgentConfigResponse: if config_data is None: raise Exception(f"Agent '{agent_name}' not found") - logger.info(f"DEBUG CONFIG LOAD: Raw config data from SSM: {config_data}") - logger.info(f"DEBUG CONFIG LOAD: Data types in SSM config:") - for key, value in config_data.items(): - logger.info(f" {key}: {type(value).__name__} = {value}") + logger.debug("Raw config data loaded from SSM") + logger.debug(f"Data types in SSM config: {list(config_data.keys())}") # Retrieve system prompt if specified system_prompt_name = config_data.get('system_prompt_name', '') @@ -136,9 +238,116 @@ def load_agent_configuration(self, agent_name: str) -> AgentConfigResponse: if 'mcp_servers' not in config_data: config_data['mcp_servers'] = "" - logger.info(f"DEBUG CONFIG LOAD: Final config data before Pydantic model:") - for key, value in config_data.items(): - logger.info(f" {key}: {type(value).__name__} = {value}") + # Convert nested dictionaries to Pydantic models + # This ensures proper deserialization of complex nested structures + from ..models.agent_config import ThinkingConfig, ProviderConfig, ToolConfig + + # Helper function to safely convert nested structures + def convert_to_provider_config(item): + """Safely convert item to ProviderConfig, handling various input types.""" + if isinstance(item, dict): + return ProviderConfig(**item) + elif isinstance(item, ProviderConfig): + return item + else: + logger.warning(f"Unexpected provider config type: {type(item)}, item: {item}") + # Try to handle string or other types gracefully + if isinstance(item, str): + return ProviderConfig(name=item, config={}) + return item + + def convert_to_tool_config(item): + """Safely convert item to ToolConfig, handling various input types.""" + if isinstance(item, dict): + return ToolConfig(**item) + elif isinstance(item, ToolConfig): + return item + else: + logger.warning(f"Unexpected tool config type: {type(item)}, item: {item}") + if isinstance(item, str): + return ToolConfig(name=item, config={}) + return item + + # Convert thinking config - handle missing or invalid data + if 'thinking' in config_data: + if isinstance(config_data['thinking'], dict): + config_data['thinking'] = ThinkingConfig(**config_data['thinking']) + elif not isinstance(config_data['thinking'], ThinkingConfig): + logger.warning(f"Invalid thinking config type: {type(config_data['thinking'])}, using default") + config_data['thinking'] = ThinkingConfig(type="standard", budget_tokens=100000) + else: + config_data['thinking'] = ThinkingConfig(type="standard", budget_tokens=100000) + + # Convert tools list - handle empty, None, or invalid data + if 'tools' in config_data: + if isinstance(config_data['tools'], list): + config_data['tools'] = [convert_to_tool_config(tool) for tool in config_data['tools']] + elif config_data['tools'] is None: + config_data['tools'] = [] + else: + logger.warning(f"Invalid tools type: {type(config_data['tools'])}, using empty list") + config_data['tools'] = [] + else: + config_data['tools'] = [] + + # Convert memory provider details - handle all cases + if 'memory_provider_details' in config_data: + if isinstance(config_data['memory_provider_details'], list): + config_data['memory_provider_details'] = [ + convert_to_provider_config(provider) for provider in config_data['memory_provider_details'] + ] + elif config_data['memory_provider_details'] is None: + config_data['memory_provider_details'] = [] + else: + logger.warning(f"Invalid memory_provider_details type: {type(config_data['memory_provider_details'])}") + config_data['memory_provider_details'] = [] + else: + config_data['memory_provider_details'] = [] + + # Convert knowledge base details - handle all cases + if 'knowledge_base_details' in config_data: + if isinstance(config_data['knowledge_base_details'], list): + config_data['knowledge_base_details'] = [ + convert_to_provider_config(provider) for provider in config_data['knowledge_base_details'] + ] + elif config_data['knowledge_base_details'] is None: + config_data['knowledge_base_details'] = [] + else: + logger.warning(f"Invalid knowledge_base_details type: {type(config_data['knowledge_base_details'])}") + config_data['knowledge_base_details'] = [] + else: + config_data['knowledge_base_details'] = [] + + # Convert observability provider details - handle all cases + if 'observability_provider_details' in config_data: + if isinstance(config_data['observability_provider_details'], list): + config_data['observability_provider_details'] = [ + convert_to_provider_config(provider) for provider in config_data['observability_provider_details'] + ] + elif config_data['observability_provider_details'] is None: + config_data['observability_provider_details'] = [] + else: + logger.warning(f"Invalid observability_provider_details type: {type(config_data['observability_provider_details'])}") + config_data['observability_provider_details'] = [] + else: + config_data['observability_provider_details'] = [] + + # Convert guardrail provider details - handle all cases + if 'guardrail_provider_details' in config_data: + if isinstance(config_data['guardrail_provider_details'], list): + config_data['guardrail_provider_details'] = [ + convert_to_provider_config(provider) for provider in config_data['guardrail_provider_details'] + ] + elif config_data['guardrail_provider_details'] is None: + config_data['guardrail_provider_details'] = [] + else: + logger.warning(f"Invalid guardrail_provider_details type: {type(config_data['guardrail_provider_details'])}") + config_data['guardrail_provider_details'] = [] + else: + config_data['guardrail_provider_details'] = [] + + logger.debug("Final config data converted to Pydantic models") + logger.debug(f"Available config fields: {list(config_data.keys())}") logger.info(f"Successfully loaded configuration for agent: {agent_name}") return AgentConfigResponse(**config_data) @@ -461,7 +670,7 @@ def _store_agent_config(self, agent_name: str, config_data: Dict) -> bool: # Don't fail silently - log the error but still attempt to store for backward compatibility logger.warning(f"Storing potentially incomplete configuration for {agent_name}") else: - logger.info(f"āœ… Configuration validation passed for {agent_name} - conforms to SSM data model") + logger.info(f"Configuration validation passed for {agent_name} - conforms to SSM data model") # Use standardized path from SSM parameter paths config_path = SSMParameterPaths.agent_config(agent_name) @@ -877,7 +1086,8 @@ def update_agent_tools(self, tools_request: AgentToolsUpdateRequest) -> Dict[str raise Exception(f"Agent '{agent_name}' configuration not found") # Update only the tools field - current_config['tools'] = [tool.dict() for tool in tools_request.tools] + # Use model_dump() for Pydantic v2 with proper nested model serialization + current_config['tools'] = [tool.model_dump(mode='json') for tool in tools_request.tools] # Store updated configuration success = self._store_agent_config(agent_name, current_config) diff --git a/application_src/configuration-api/app/services/cloudformation_deployment_service.py b/application_src/configuration-api/app/services/cloudformation_deployment_service.py deleted file mode 100644 index e08d15f..0000000 --- a/application_src/configuration-api/app/services/cloudformation_deployment_service.py +++ /dev/null @@ -1,514 +0,0 @@ -""" -CloudFormation Deployment Service - -Manages dynamic agent stack deployment using CloudFormation API directly. -Replaces subprocess-based CDK deployment with native CloudFormation operations. - -Key Features: -- Downloads CloudFormation templates from S3 -- Creates/updates/deletes agent stacks with proper tagging -- Monitors deployment status with comprehensive error handling -- Provides deployment metadata and outputs -""" - -import logging -import time -import json -from typing import Any -from datetime import datetime, timezone - -import sys -import os -sys.path.append(os.path.join(os.path.dirname(__file__), '../../../../')) -from common.secure_logging_utils import log_exception_safely - -import boto3 -from botocore.exceptions import ClientError - -logger = logging.getLogger(__name__) - - -class CloudFormationDeploymentService: - """Service for managing CloudFormation stack deployments.""" - - def __init__( - self, - region: str, - project_name: str, - template_bucket: str - ): - """ - Initialize CloudFormation deployment service. - - Args: - region: AWS region for deployments - project_name: Project name for stack naming - template_bucket: S3 bucket containing CloudFormation templates - """ - self.region = region - self.project_name = project_name - self.template_bucket = template_bucket - - # Initialize AWS clients - self.cfn_client = boto3.client('cloudformation', region_name=region) - self.s3_client = boto3.client('s3', region_name=region) - - logger.info( - f"Initialized CloudFormation service: region={region}, " - f"project={project_name}, bucket={template_bucket}" - ) - - def deploy_agent_stack( - self, - agent_name: str, - parameters: dict[str, Any], - timeout_minutes: int = 30 - ) -> dict[str, Any]: - """ - Deploy a new agent stack using CloudFormation. - - Args: - agent_name: Name of the agent - parameters: CloudFormation parameters for the stack - timeout_minutes: Maximum time to wait for deployment - - Returns: - Deployment result with stack outputs and metadata - - Raises: - RuntimeError: If deployment fails - """ - stack_name = self._get_stack_name(agent_name) - template_key = "GenericAgentTemplate.json" - - logger.info(f"Starting deployment of agent stack: {stack_name}") - - try: - # Download template from S3 - template_body = self._download_template(template_key) - - # Prepare CloudFormation parameters - cfn_parameters = self._convert_parameters(parameters) - - # Create stack with proper tags - self.cfn_client.create_stack( - StackName=stack_name, - TemplateBody=template_body, - Parameters=cfn_parameters, - Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM'], - Tags=[ - {'Key': 'ManagedBy', 'Value': 'ConfigurationAPI'}, - {'Key': 'ProjectName', 'Value': self.project_name}, - {'Key': 'AgentName', 'Value': agent_name}, - {'Key': 'DeployedAt', 'Value': datetime.now(timezone.utc).isoformat()} - ], - TimeoutInMinutes=timeout_minutes - ) - - logger.info(f"CloudFormation stack creation initiated: {stack_name}") - - # Wait for stack creation to complete - result = self._wait_for_stack_complete(stack_name, 'CREATE_COMPLETE', timeout_minutes) - - logger.info(f"Agent stack deployed successfully: {stack_name}") - return result - - except ClientError as e: - log_exception_safely(logger, e, f"Failed to deploy agent stack {stack_name}") - raise RuntimeError(f"Failed to deploy agent stack {stack_name}") from e - - def update_agent_stack( - self, - agent_name: str, - parameters: dict[str, Any], - timeout_minutes: int = 30 - ) -> dict[str, Any]: - """ - Update an existing agent stack. - - Args: - agent_name: Name of the agent - parameters: CloudFormation parameters for the stack - timeout_minutes: Maximum time to wait for update - - Returns: - Update result with stack outputs and metadata - - Raises: - RuntimeError: If update fails - """ - stack_name = self._get_stack_name(agent_name) - template_key = "GenericAgentTemplate.json" - - logger.info(f"Starting update of agent stack: {stack_name}") - - try: - # Download template from S3 - template_body = self._download_template(template_key) - - # Prepare CloudFormation parameters - cfn_parameters = self._convert_parameters(parameters) - - # Update stack - self.cfn_client.update_stack( - StackName=stack_name, - TemplateBody=template_body, - Parameters=cfn_parameters, - Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM'] - ) - - logger.info(f"CloudFormation stack update initiated: {stack_name}") - - # Wait for stack update to complete - result = self._wait_for_stack_complete(stack_name, 'UPDATE_COMPLETE', timeout_minutes) - - logger.info(f"Agent stack updated successfully: {stack_name}") - return result - - except ClientError as e: - if 'No updates are to be performed' in str(e): - logger.info(f"No updates needed for stack: {stack_name}") - return self.get_stack_info(agent_name) - - log_exception_safely(logger, e, f"Failed to update agent stack {stack_name}") - raise RuntimeError(f"Failed to update agent stack {stack_name}") from e - - def delete_agent_stack( - self, - agent_name: str, - timeout_minutes: int = 30 - ) -> dict[str, Any]: - """ - Delete an agent stack. - - Args: - agent_name: Name of the agent - timeout_minutes: Maximum time to wait for deletion - - Returns: - Deletion result with metadata - - Raises: - RuntimeError: If deletion fails - """ - stack_name = self._get_stack_name(agent_name) - - logger.info(f"Starting deletion of agent stack: {stack_name}") - - try: - # Delete stack - self.cfn_client.delete_stack(StackName=stack_name) - - logger.info(f"CloudFormation stack deletion initiated: {stack_name}") - - # Wait for stack deletion to complete - self._wait_for_stack_delete(stack_name, timeout_minutes) - - logger.info(f"Agent stack deleted successfully: {stack_name}") - - return { - 'stack_name': stack_name, - 'agent_name': agent_name, - 'status': 'DELETE_COMPLETE', - 'deleted_at': datetime.now(timezone.utc).isoformat() - } - - except ClientError as e: - log_exception_safely(logger, e, f"Failed to delete agent stack {stack_name}") - raise RuntimeError(f"Failed to delete agent stack {stack_name}") from e - - def get_stack_info(self, agent_name: str) -> dict[str, Any]: - """ - Get information about an agent stack. - - Args: - agent_name: Name of the agent - - Returns: - Stack information with outputs and metadata - - Raises: - RuntimeError: If stack not found or error occurs - """ - stack_name = self._get_stack_name(agent_name) - - try: - response = self.cfn_client.describe_stacks(StackName=stack_name) - - if not response.get('Stacks'): - raise RuntimeError(f"Stack not found: {stack_name}") - - stack = response['Stacks'][0] - - return { - 'stack_name': stack_name, - 'agent_name': agent_name, - 'stack_id': stack.get('StackId'), - 'status': stack.get('StackStatus'), - 'creation_time': stack.get('CreationTime').isoformat() if stack.get('CreationTime') else None, - 'last_updated_time': stack.get('LastUpdatedTime').isoformat() if stack.get('LastUpdatedTime') else None, - 'outputs': self._parse_stack_outputs(stack.get('Outputs', [])) - } - - except ClientError as e: - if e.response['Error']['Code'] == 'ValidationError': - raise RuntimeError(f"Stack not found: {stack_name}") from e - - log_exception_safely(logger, e, f"Failed to get stack info for {stack_name}") - raise RuntimeError(f"Failed to get stack info for {stack_name}") from e - - def list_agent_stacks(self) -> list[dict[str, Any]]: - """ - List all agent stacks managed by this service. - - Returns: - List of stack summaries - """ - try: - stacks = [] - paginator = self.cfn_client.get_paginator('list_stacks') - - for page in paginator.paginate( - StackStatusFilter=[ - 'CREATE_COMPLETE', - 'UPDATE_COMPLETE', - 'UPDATE_ROLLBACK_COMPLETE' - ] - ): - for stack in page['StackSummaries']: - stack_name = stack['StackName'] - - # Only include stacks managed by this project - if stack_name.startswith(f"{self.project_name}-agent-"): - stacks.append({ - 'stack_name': stack_name, - 'agent_name': self._extract_agent_name(stack_name), - 'status': stack['StackStatus'], - 'creation_time': stack['CreationTime'].isoformat() if stack.get('CreationTime') else None, - 'last_updated_time': stack.get('LastUpdatedTime').isoformat() if stack.get('LastUpdatedTime') else None - }) - - return stacks - - except ClientError as e: - log_exception_safely(logger, e, "Failed to list agent stacks") - raise RuntimeError("Failed to list agent stacks") from e - - def _get_stack_name(self, agent_name: str) -> str: - """Generate CloudFormation stack name for agent.""" - return f"{self.project_name}-agent-{agent_name}" - - def _extract_agent_name(self, stack_name: str) -> str: - """Extract agent name from stack name.""" - prefix = f"{self.project_name}-agent-" - if stack_name.startswith(prefix): - return stack_name[len(prefix):] - return stack_name - - def _download_template(self, template_key: str) -> str: - """ - Download CloudFormation template from S3. - - Args: - template_key: S3 object key for template - - Returns: - Template body as string - """ - try: - response = self.s3_client.get_object( - Bucket=self.template_bucket, - Key=template_key - ) - - template_body = response['Body'].read().decode('utf-8') - logger.info(f"Downloaded template: {template_key}") - - return template_body - - except ClientError as e: - log_exception_safely(logger, e, f"Failed to download template {template_key}") - raise RuntimeError(f"Failed to download template {template_key}") from e - - def _convert_parameters(self, parameters: dict[str, Any]) -> list[dict[str, str]]: - """ - Convert dictionary parameters to CloudFormation format. - - Args: - parameters: Dictionary of parameter name -> value - - Returns: - List of CloudFormation parameter dicts - """ - cfn_parameters = [] - - for key, value in parameters.items(): - cfn_parameters.append({ - 'ParameterKey': key, - 'ParameterValue': str(value) - }) - - return cfn_parameters - - def _parse_stack_outputs(self, outputs: list[dict[str, str]]) -> dict[str, str]: - """ - Parse CloudFormation stack outputs into a dictionary. - - Args: - outputs: List of output dicts from CloudFormation - - Returns: - Dictionary of output key -> value - """ - return { - output['OutputKey']: output['OutputValue'] - for output in outputs - } - - def _wait_for_stack_complete( - self, - stack_name: str, - expected_status: str, - timeout_minutes: int - ) -> dict[str, Any]: - """ - Wait for stack operation to complete. - - Args: - stack_name: Name of the stack - expected_status: Expected completion status - timeout_minutes: Maximum time to wait - - Returns: - Stack information after completion - - Raises: - RuntimeError: If operation fails or times out - """ - start_time = time.time() - timeout_seconds = timeout_minutes * 60 - check_interval = 10 # seconds - - while True: - elapsed = time.time() - start_time - - if elapsed > timeout_seconds: - raise RuntimeError( - f"Stack operation timed out after {timeout_minutes} minutes: {stack_name}" - ) - - try: - response = self.cfn_client.describe_stacks(StackName=stack_name) - - if not response.get('Stacks'): - raise RuntimeError(f"Stack not found: {stack_name}") - - stack = response['Stacks'][0] - status = stack['StackStatus'] - - logger.info(f"Stack {stack_name} status: {status}") - - # Check if completed successfully - if status == expected_status: - return { - 'stack_name': stack_name, - 'agent_name': self._extract_agent_name(stack_name), - 'status': status, - 'outputs': self._parse_stack_outputs(stack.get('Outputs', [])) - } - - # Check for failure states (including ROLLBACK_COMPLETE) - if (status.endswith('_FAILED') or - status == 'ROLLBACK_COMPLETE' or - status == 'ROLLBACK_FAILED' or - status == 'DELETE_FAILED'): - error_msg = self._get_stack_error_reason(stack_name) - logger.error(f"Stack {stack_name} failed with status {status}: {error_msg}") - raise RuntimeError( - f"Stack operation failed with status {status}: {error_msg}" - ) - - # Still in progress, wait before checking again - time.sleep(check_interval) - - except ClientError as e: - log_exception_safely(logger, e, "Error checking stack status") - raise RuntimeError("Error checking stack status") from e - - def _wait_for_stack_delete(self, stack_name: str, timeout_minutes: int) -> None: - """ - Wait for stack deletion to complete. - - Args: - stack_name: Name of the stack - timeout_minutes: Maximum time to wait - - Raises: - RuntimeError: If deletion fails or times out - """ - start_time = time.time() - timeout_seconds = timeout_minutes * 60 - check_interval = 10 # seconds - - while True: - elapsed = time.time() - start_time - - if elapsed > timeout_seconds: - raise RuntimeError( - f"Stack deletion timed out after {timeout_minutes} minutes: {stack_name}" - ) - - try: - response = self.cfn_client.describe_stacks(StackName=stack_name) - - if not response.get('Stacks'): - # Stack no longer exists - deletion complete - logger.info(f"Stack deleted successfully: {stack_name}") - return - - stack = response['Stacks'][0] - status = stack['StackStatus'] - - logger.info(f"Stack {stack_name} deletion status: {status}") - - # Check for failure states - if status == 'DELETE_FAILED': - error_msg = self._get_stack_error_reason(stack_name) - raise RuntimeError( - f"Stack deletion failed: {error_msg}" - ) - - # Still deleting, wait before checking again - time.sleep(check_interval) - - except ClientError as e: - if e.response['Error']['Code'] == 'ValidationError': - # Stack no longer exists - deletion complete - logger.info(f"Stack deleted successfully: {stack_name}") - return - - log_exception_safely(logger, e, "Error checking stack deletion status") - raise RuntimeError("Error checking stack deletion status") from e - - def _get_stack_error_reason(self, stack_name: str) -> str: - """ - Get detailed error reason for stack operation failure. - - Args: - stack_name: Name of the stack - - Returns: - Error message describing the failure - """ - try: - response = self.cfn_client.describe_stack_events(StackName=stack_name) - - # Find failed events - for event in response['StackEvents']: - if event['ResourceStatus'].endswith('_FAILED'): - return event.get('ResourceStatusReason', 'Unknown error') - - return "No detailed error information available" - - except ClientError: - return "Failed to retrieve error details" diff --git a/application_src/configuration-api/app/services/deployment_service.py b/application_src/configuration-api/app/services/deployment_service.py index d694e81..9eb3bd3 100644 --- a/application_src/configuration-api/app/services/deployment_service.py +++ b/application_src/configuration-api/app/services/deployment_service.py @@ -39,9 +39,16 @@ def __init__(self): # Get region from environment variable with fallback self.region = os.environ.get('AWS_REGION', os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')) self.cloudformation = boto3.client('cloudformation', region_name=self.region) - # Get project name from environment or use default - self.project_name = os.environ.get('PROJECT_NAME', 'genai-box') + self.s3 = boto3.client('s3', region_name=self.region) + + # Get project name and account for S3 bucket construction + self.project_name = os.environ.get('PROJECT_NAME', 'ai-platform') + sts = boto3.client('sts') + self.account_id = sts.get_caller_identity()['Account'] + self.template_bucket_name = f"{self.project_name}-templates-{self.account_id}-{self.region}" + logger.info(f"DeploymentService initialized for region: {self.region}, project: {self.project_name}") + logger.info(f"Template bucket: {self.template_bucket_name}") except NoCredentialsError: logger.error("AWS credentials not found") raise ValueError("AWS credentials not configured") @@ -60,16 +67,14 @@ async def get_project_name(self) -> str: async def create_agent_stack( self, - source_stack_name: str, new_agent_name: str, new_stack_name: str, model_config: Optional[Dict[str, Any]] = None ) -> Dict[str, Any]: """ - Create a new agent stack from an existing template. + Create a new agent stack using the template from S3. Args: - source_stack_name: Name of the existing stack to copy template from new_agent_name: Name for the new agent (AgentName parameter) new_stack_name: Name for the new CloudFormation stack model_config: Model configuration (if not provided, will read from SSM) @@ -78,58 +83,119 @@ async def create_agent_stack( Dictionary containing stack creation information Raises: - ValueError: If source stack not found or validation fails + ValueError: If template not found or validation fails Exception: If stack creation fails """ try: - logger.info(f"Creating agent stack '{new_stack_name}' from source '{source_stack_name}'") + logger.info(f"Creating agent stack '{new_stack_name}' for agent '{new_agent_name}'") # Simplified approach: Agents read all configuration from SSM # No need to inject environment variables - cleaner and more reliable - # Get the template from the source stack - template_body = await self._get_stack_template(source_stack_name) + # Get the template from S3 (deployed by CDK template-storage stack) + template_body = await self._get_template_from_s3("GenericAgentTemplate.json") - # Only modify the AgentName parameter - agents read everything else from SSM - modified_template = self._modify_agent_name_parameter(template_body, new_agent_name) + # No template modification needed - parameters are explicitly required + # AgentName and ImageTag must be provided as CloudFormation parameters logger.info(f"Agent {new_agent_name} will read all configuration from SSM parameter: /agent/{new_agent_name}/config") - # Get the original stack's tags and capabilities - source_stack_info = await self._get_stack_info(source_stack_name) - # Prepare parameters for the new stack + logger.debug(f"Building CloudFormation parameters for agent: {new_agent_name}") + parameters = [ { 'ParameterKey': 'AgentName', 'ParameterValue': new_agent_name } ] + logger.debug(f"Set AgentName parameter: {new_agent_name}") + + # CRITICAL: Retrieve and set ImageTag from SSM to ensure correct image version + # ImageTag parameter is required - no default value in template + logger.info("IMAGE TAG RETRIEVAL FOR AGENT CREATION") + + image_uri = None + try: + logger.info(f"Retrieving image URI from SSM Parameter Store: /{self.project_name}/agent/image-uri") + + image_uri = self._get_image_uri_from_ssm() + + logger.info(f"Successfully retrieved image URI from SSM: {image_uri}") + + # Extract and log the tag portion + if ':' in image_uri: + tag_portion = image_uri.split(':')[-1] + logger.debug(f"Extracted tag: {tag_portion}") + + # Validate that we don't have "latest" in the image URI + if tag_portion.lower() == 'latest': + logger.error("SSM parameter contains 'latest' tag!") + logger.error("This indicates the ECR image was not properly tagged with SHA256") + logger.error("The CDK deployment may not have completed successfully") + raise ValueError("SSM parameter contains 'latest' tag instead of SHA256 hash") + else: + logger.error("No ':' found in image URI from SSM") + logger.error(f"Invalid image URI format: {image_uri}") + raise ValueError("Invalid image URI format - missing tag separator") + + # IMPORTANT: Pass the FULL image URI, not just the tag + # The CloudFormation template expects the complete URI with repository and SHA256 tag + parameters.append({ + 'ParameterKey': 'ImageTag', + 'ParameterValue': image_uri + }) + + logger.info("Successfully added ImageTag to CloudFormation parameters") + logger.debug(f"CloudFormation ImageTag parameter value: {image_uri}") + + except Exception as e: + logger.error(f"Failed to retrieve ImageTag from SSM: {e}") + logger.error(f"Error type: {type(e).__name__}") + + # CRITICAL: Do not proceed without a valid ImageTag - this would cause "latest" to be used + if image_uri and 'latest' in image_uri.lower(): + logger.error("Refusing to create agent with 'latest' image tag!") + logger.error("This would create an unstable deployment") + raise ValueError("Cannot create agent with 'latest' image tag - please ensure CDK deployment completed successfully") + + logger.error("Refusing to create agent WITHOUT ImageTag parameter") + logger.error("ImageTag parameter is required - no default value in template") + logger.error("Agent creation aborted to prevent deployment failure!") + + # Re-raise the exception to prevent agent creation with wrong image + raise ValueError(f"ImageTag retrieval failed: {e}. Cannot create agent without proper image tag.") + + # Log final parameters before CloudFormation call + logger.info("Final CloudFormation parameters for create_stack") + logger.info(f"Stack Name: {new_stack_name}") + logger.info(f"Total parameters: {len(parameters)}") + for i, param in enumerate(parameters, 1): + logger.debug(f" {i}. {param['ParameterKey']} = {param['ParameterValue'][:100]}...") # Truncate long values # Create the new stack create_params = { 'StackName': new_stack_name, - 'TemplateBody': json.dumps(modified_template), + 'TemplateBody': json.dumps(template_body), 'Parameters': parameters, 'Capabilities': ['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM'], 'Tags': [ {'Key': 'ManagedBy', 'Value': 'ConfigurationAPI'}, {'Key': 'AgentName', 'Value': new_agent_name}, - {'Key': 'SourceStack', 'Value': source_stack_name}, + {'Key': 'TemplateSource', 'Value': 'S3'}, {'Key': 'CreatedAt', 'Value': datetime.utcnow().isoformat()} ] } - # Add original tags (excluding system tags) - if source_stack_info.get('tags'): - for tag in source_stack_info['tags']: - if not tag['Key'].startswith('aws:'): - create_params['Tags'].append(tag) - + logger.info("Calling CloudFormation CreateStack API...") response = self.cloudformation.create_stack(**create_params) stack_id = response['StackId'] - logger.info(f"Successfully created stack '{new_stack_name}' with ID: {stack_id}") + logger.info("Stack creation initiated successfully") + logger.info(f"Stack Name: {new_stack_name}") + logger.info(f"Stack ID: {stack_id}") + logger.info(f"Agent Name: {new_agent_name}") + logger.info(f"Status: CREATE_IN_PROGRESS") return { 'stack_name': new_stack_name, @@ -183,33 +249,48 @@ async def _get_stack_template(self, stack_name: str) -> Dict[str, Any]: log_exception_safely(logger, e, "Error getting stack template") raise - def _modify_agent_name_parameter(self, template: Dict[str, Any], new_agent_name: str) -> Dict[str, Any]: + async def _get_template_from_s3(self, template_key: str = "GenericAgentTemplate.json") -> Dict[str, Any]: """ - Modify the AgentName parameter default value in the template. + Get the latest CloudFormation template from S3. + + This fetches the template that was deployed by CDK to the template storage bucket. + This ensures stack updates pick up the latest CDK-generated template changes. Args: - template: Original CloudFormation template - new_agent_name: New agent name to set as default + template_key: S3 key for the template file Returns: - Modified CloudFormation template + CloudFormation template as a dictionary + + Raises: + ValueError: If template not found in S3 + Exception: If unable to retrieve template """ try: - # Create a copy of the template to avoid modifying the original - modified_template = json.loads(json.dumps(template)) + logger.info(f"Retrieving template from S3: s3://{self.template_bucket_name}/{template_key}") - # Update the AgentName parameter default value - if 'Parameters' in modified_template and 'AgentName' in modified_template['Parameters']: - modified_template['Parameters']['AgentName']['Default'] = new_agent_name - logger.info(f"Updated AgentName parameter default to: {new_agent_name}") - else: - logger.warning("AgentName parameter not found in template") + response = self.s3.get_object( + Bucket=self.template_bucket_name, + Key=template_key + ) - return modified_template + template_body = response['Body'].read().decode('utf-8') + template = json.loads(template_body) + logger.info(f"Successfully retrieved template from S3: {template_key}") + return template + + except ClientError as e: + error_code = e.response['Error']['Code'] + if error_code in ['NoSuchKey', 'NoSuchBucket']: + raise ValueError(f"Template '{template_key}' not found in S3 bucket '{self.template_bucket_name}'") + else: + logger.error(f"S3 error retrieving template: {error_code}") + raise Exception(f"Failed to retrieve template from S3: {e.response['Error']['Message']}") except Exception as e: - log_exception_safely(logger, e, "Error modifying template") - raise RuntimeError("Failed to modify template") + log_exception_safely(logger, e, "Error getting template from S3") + raise + def _inject_model_environment_variables( self, @@ -354,14 +435,22 @@ async def get_stack_status(self, stack_name: str) -> Dict[str, Any]: for output in stack_info.get('outputs', []): outputs[output['OutputKey']] = output['OutputValue'] + # Convert datetime objects to ISO format strings for JSON serialization + creation_time = None + if stack_info.get('creation_time'): + creation_time = stack_info['creation_time'].isoformat() if hasattr(stack_info['creation_time'], 'isoformat') else str(stack_info['creation_time']) + + last_updated_time = None + if stack_info.get('last_updated_time'): + last_updated_time = stack_info['last_updated_time'].isoformat() if hasattr(stack_info['last_updated_time'], 'isoformat') else str(stack_info['last_updated_time']) + return { 'stack_name': stack_info['stack_name'], 'stack_id': stack_info['stack_id'], 'status': stack_info['status'], - 'agent_name': agent_name, - 'created_at': stack_info.get('creation_time'), - 'updated_at': stack_info.get('last_updated_time'), - 'outputs': outputs if outputs else None + 'creation_time': creation_time, + 'last_updated_time': last_updated_time, + 'outputs': outputs if outputs else {} } except Exception as e: @@ -451,14 +540,25 @@ async def list_agent_stacks(self) -> List[Dict[str, Any]]: log_exception_safely(logger, e, "Error listing agent stacks") raise - async def find_agent_stack_by_name(self, agent_name: str) -> Optional[Dict[str, Any]]: + def get_stack_name_from_agent(self, agent_name: str) -> str: """ - Find a CloudFormation stack for a specific agent using multiple naming patterns. + Get CloudFormation stack name from agent name using consistent naming pattern. - This method checks multiple possible naming patterns: - 1. ai-platform-agent-{agent-name} (actual pattern used by the system) - 2. ai-platform-{agent-name}-stack (legacy pattern) - 3. {project-name}-{agent-name}-stack (fallback pattern) + Standard pattern: {project_name}-agent-{agent_name} + + Args: + agent_name: Name of the agent + + Returns: + CloudFormation stack name + """ + # Convert underscores to hyphens for stack naming consistency + stack_agent_name = agent_name.replace('_', '-') + return f"{self.project_name}-agent-{stack_agent_name}" + + async def find_agent_stack_by_name(self, agent_name: str) -> Optional[Dict[str, Any]]: + """ + Find a CloudFormation stack for a specific agent using standard naming pattern. Args: agent_name: Name of the agent to find stack for @@ -467,70 +567,272 @@ async def find_agent_stack_by_name(self, agent_name: str) -> Optional[Dict[str, Stack information if found, None otherwise """ try: - # Convert agent name to the expected stack format - stack_agent_name = agent_name.replace('_', '-') - - # Try multiple naming patterns - possible_stack_names = [ - f"ai-platform-agent-{stack_agent_name}", # Actual pattern from AWS CLI output - f"{self.project_name}-agent-{stack_agent_name}", # Project-based agent pattern - f"ai-platform-{stack_agent_name}-stack", # Original expected pattern - f"{self.project_name}-{stack_agent_name}-stack" # Project-based legacy pattern - ] + # Use consistent naming pattern + expected_stack_name = self.get_stack_name_from_agent(agent_name) - logger.info(f"Looking for agent stack using multiple patterns: {possible_stack_names}") + logger.info(f"Looking for agent stack: {expected_stack_name}") - for expected_stack_name in possible_stack_names: - try: - logger.info(f"Trying stack name pattern: {expected_stack_name}") - - # Try to get the stack directly using this pattern - stack_info = await self._get_stack_info(expected_stack_name) + try: + # Get the stack directly using standard pattern + stack_info = await self._get_stack_info(expected_stack_name) + + # Verify this stack has the correct AgentName parameter + stack_agent_name_param = None + for param in stack_info.get('parameters', []): + if param['ParameterKey'] == 'AgentName': + stack_agent_name_param = param['ParameterValue'] + break + + if stack_agent_name_param == agent_name: + logger.info(f"Found agent stack: {expected_stack_name} with AgentName={agent_name}") - # Verify this stack has the correct AgentName parameter - stack_agent_name_param = None - for param in stack_info.get('parameters', []): - if param['ParameterKey'] == 'AgentName': - stack_agent_name_param = param['ParameterValue'] + # Check if it's managed by our API + managed_by_api = False + for tag in stack_info.get('tags', []): + if tag['Key'] == 'ManagedBy' and tag['Value'] == 'ConfigurationAPI': + managed_by_api = True break - if stack_agent_name_param == agent_name: - logger.info(f"āœ… Found exact match: {expected_stack_name} with AgentName={agent_name}") - - # Check if it's managed by our API - managed_by_api = False - for tag in stack_info.get('tags', []): - if tag['Key'] == 'ManagedBy' and tag['Value'] == 'ConfigurationAPI': - managed_by_api = True - break - - return { - 'stack_name': stack_info['stack_name'], - 'stack_id': stack_info['stack_id'], - 'status': stack_info['status'], - 'agent_name': stack_agent_name_param, - 'managed_by_api': managed_by_api, - 'parameters': stack_info.get('parameters', []), - 'created_at': stack_info.get('creation_time'), - 'updated_at': stack_info.get('last_updated_time') - } - else: - logger.warning(f"Stack {expected_stack_name} found but AgentName parameter mismatch: expected '{agent_name}', got '{stack_agent_name_param}'") - continue - - except ValueError: - # Stack not found with this pattern, try next pattern - logger.debug(f"No stack found with pattern: {expected_stack_name}") - continue - - # If we get here, no stack was found with any pattern - logger.info(f"No stack found for agent '{agent_name}' using any naming pattern") - return None + return { + 'stack_name': stack_info['stack_name'], + 'stack_id': stack_info['stack_id'], + 'status': stack_info['status'], + 'agent_name': stack_agent_name_param, + 'managed_by_api': managed_by_api, + 'parameters': stack_info.get('parameters', []), + 'created_at': stack_info.get('creation_time'), + 'updated_at': stack_info.get('last_updated_time') + } + else: + logger.warning(f"Stack {expected_stack_name} found but AgentName parameter mismatch: expected '{agent_name}', got '{stack_agent_name_param}'") + return None + + except ValueError: + # Stack not found with standard pattern + logger.info(f"No stack found for agent '{agent_name}' using pattern: {expected_stack_name}") + return None except Exception as e: log_exception_safely(logger, e, f"Error finding agent stack for '{agent_name}'") return None + async def update_agent_stack( + self, + agent_name: str, + parameters: Optional[Dict[str, Any]] = None + ) -> Dict[str, Any]: + """ + Update an existing agent stack with new parameters or template changes. + + This method: + 1. Finds the CloudFormation stack for the agent + 2. Retrieves the current template + 3. Updates the stack with new parameters (if provided) + + Args: + agent_name: Name of the agent whose stack to update + parameters: Optional dictionary of CloudFormation parameters to update + + Returns: + Dictionary containing update information + + Raises: + ValueError: If agent stack not found + Exception: If stack update fails + """ + try: + logger.info(f"Updating agent stack for: {agent_name}") + + # Find the stack for this agent + stack_info = await self.find_agent_stack_by_name(agent_name) + + if not stack_info: + raise ValueError(f"No CloudFormation stack found for agent '{agent_name}'") + + stack_name = stack_info['stack_name'] + logger.info(f"Found stack '{stack_name}' for agent '{agent_name}'") + + # Fetch the latest template from S3 to pick up any CDK template changes + # This ensures updates include latest infrastructure improvements + logger.info("Fetching latest template from S3 for stack update") + template_from_s3 = await self._get_template_from_s3("GenericAgentTemplate.json") + + # No template modification - parameters must be explicitly provided + # AgentName and ImageTag are required parameters with no defaults + logger.info("Using template as-is - no defaults to modify") + + # Prepare update parameters with unmodified template + update_params = { + 'StackName': stack_name, + 'TemplateBody': json.dumps(template_from_s3), + 'Capabilities': ['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM'] + } + + # Build parameters list, preserving existing parameters and merging with new ones + # CRITICAL: AgentName must ALWAYS be explicitly set to prevent CloudFormation from using template default + + # Get existing parameters from stack + existing_params = { + param['ParameterKey']: param['ParameterValue'] + for param in stack_info.get('parameters', []) + } + + logger.info(f"Existing parameters from stack: {existing_params}") + + # If new parameters provided, merge them with existing ones + if parameters: + existing_params.update(parameters) + logger.info(f"Merged {len(parameters)} new parameters with existing parameters") + + # CRITICAL: Always ensure AgentName is set to the correct agent name + # This is the most important parameter and must never revert to template default + existing_params['AgentName'] = agent_name + logger.info(f"Explicitly set AgentName parameter to: {agent_name}") + + # CRITICAL: Retrieve and set ImageTag from SSM to ensure ECS updates + # The existing parameter may contain a CDK token like ${Token[TOKEN.262]} + # which must be replaced with the actual full image URI with SHA256 tag from SSM + logger.debug(f"Current ImageTag parameter value before SSM retrieval: {existing_params.get('ImageTag', 'NOT SET')}") + try: + logger.info("Retrieving image URI from SSM for ImageTag parameter...") + image_uri = self._get_image_uri_from_ssm() + logger.debug(f"Full image URI from SSM: {image_uri}") + + # IMPORTANT: Pass the FULL image URI, not just the tag + # The CloudFormation template expects the complete URI with repository and SHA256 tag + logger.debug(f"Replacing ImageTag parameter: '{existing_params.get('ImageTag')}' -> '{image_uri}'") + existing_params['ImageTag'] = image_uri + logger.info(f"Successfully updated ImageTag parameter from SSM: {image_uri}") + except Exception as e: + logger.error(f"Failed to retrieve ImageTag from SSM: {e}") + logger.warning(f"Proceeding with existing ImageTag value: {existing_params.get('ImageTag', 'NOT SET')}") + logger.warning("ECS may not update if ImageTag hasn't changed") + + # Build CloudFormation parameter list from merged parameters + # Put AgentName FIRST to emphasize its importance + cfn_parameters = [ + { + 'ParameterKey': 'AgentName', + 'ParameterValue': agent_name + } + ] + + # Add all other parameters + for key, value in existing_params.items(): + if key != 'AgentName': # Skip AgentName since we already added it first + cfn_parameters.append({ + 'ParameterKey': key, + 'ParameterValue': str(value) + }) + + update_params['Parameters'] = cfn_parameters + logger.info(f"Final parameters for CloudFormation update (AgentName={agent_name} is first): {[p['ParameterKey'] for p in cfn_parameters]}") + + # Add update tags + existing_tags = stack_info.get('tags', []) + update_tags = [tag for tag in existing_tags if not tag['Key'].startswith('aws:')] + update_tags.append({ + 'Key': 'LastUpdatedBy', + 'Value': 'ConfigurationAPI' + }) + update_tags.append({ + 'Key': 'LastUpdatedAt', + 'Value': datetime.utcnow().isoformat() + }) + update_params['Tags'] = update_tags + + # Execute stack update + # Note: EnableTerminationProtection is only valid for create_stack, not update_stack + try: + response = self.cloudformation.update_stack(**update_params) + stack_id = response['StackId'] + + logger.info(f"Successfully initiated stack update for '{stack_name}' (with ForceNewDeployment)") + + return { + 'stack_name': stack_name, + 'stack_id': stack_id, + 'status': 'UPDATE_IN_PROGRESS', + 'agent_name': agent_name, + 'message': f'Stack update initiated for agent {agent_name} - ECS service will force new deployment' + } + + except ClientError as e: + error_code = e.response['Error']['Code'] + error_message = e.response['Error']['Message'] + + # Handle "No updates are to be performed" case gracefully + if error_code == 'ValidationError' and 'No updates are to be performed' in error_message: + logger.info(f"No updates needed for stack '{stack_name}'") + return { + 'stack_name': stack_name, + 'stack_id': stack_info['stack_id'], + 'status': stack_info['status'], + 'agent_name': agent_name, + 'message': 'Stack is already up to date - no changes needed' + } + else: + logger.error(f"CloudFormation error: {error_code} - {error_message}") + raise Exception(f"Failed to update stack: {error_message}") + + except ValueError: + # Re-raise ValueError for agent not found + raise + except Exception as e: + log_exception_safely(logger, e, f"Error updating agent stack for '{agent_name}'") + raise + + def _get_image_uri_from_ssm(self) -> str: + """ + Retrieve the agent image URI from SSM Parameter Store. + + This retrieves the SHA256-tagged image URI that was stored during + the CDK deployment, ensuring ECS tasks always pull the correct image version. + + Returns: + Image URI with SHA256 tag (e.g., "123456789.dkr.ecr.us-east-1.amazonaws.com/repo:sha256tag") + + Raises: + Exception: If parameter not found or retrieval fails + """ + try: + # Get SSM client + ssm = boto3.client('ssm', region_name=self.region) + + # Parameter name where CDK stores the image URI + # This matches the actual parameter created by template_storage stack: + # ssm.StringParameter(parameter_name=f"/{project_name}/agent/image-uri", ...) + parameter_name = f"/{self.project_name}/agent/image-uri" + + logger.debug(f"Fetching SSM parameter: {parameter_name}") + response = ssm.get_parameter(Name=parameter_name) + image_uri = response['Parameter']['Value'] + + logger.info(f"Successfully retrieved image URI from SSM: {image_uri}") + + # Log the expected tag extraction for debugging + if ':' in image_uri: + expected_tag = image_uri.split(':')[-1] + logger.debug(f"Expected tag after extraction: {expected_tag}") + else: + logger.warning("Image URI doesn't contain ':' separator - will default to 'latest'") + + return image_uri + + except ClientError as e: + error_code = e.response['Error']['Code'] + if error_code == 'ParameterNotFound': + logger.error(f"SSM parameter not found: {parameter_name}") + logger.error("Ensure CDK deployment completed successfully and created this parameter") + logger.error(f"Check if parameter exists with: aws ssm get-parameter --name {parameter_name}") + else: + logger.error(f"AWS error retrieving SSM parameter: {error_code}") + raise Exception(f"Failed to retrieve image URI from SSM: {e.response['Error']['Message']}") + except Exception as e: + logger.error("Unexpected error retrieving image URI from SSM") + log_exception_safely(logger, e, "Error retrieving image URI from SSM") + raise + async def delete_stack(self, stack_name: str) -> Dict[str, Any]: """ Delete a CloudFormation stack. diff --git a/application_src/configuration-api/app/services/parameter_initialization.py b/application_src/configuration-api/app/services/parameter_initialization.py index 754c6e5..a88bfb6 100644 --- a/application_src/configuration-api/app/services/parameter_initialization.py +++ b/application_src/configuration-api/app/services/parameter_initialization.py @@ -259,13 +259,13 @@ def _get_default_agent_config(self, agent_name: str) -> Dict[str, Any]: ) # Validate the configuration before returning - validation_result = SSMDataValidator.validate_agent_configuration(config_model.dict()) + validation_result = SSMDataValidator.validate_agent_configuration(config_model.model_dump(mode='json')) if not validation_result["valid"]: logger.error(f"Configuration from development.yaml is invalid: {validation_result['errors']}") raise ValueError(f"Invalid configuration from development.yaml: {validation_result['errors']}") logger.info(f"āœ… Generated valid configuration for {agent_name} using ONLY development.yaml values") - return config_model.dict() + return config_model.model_dump(mode='json') def _get_default_supervisor_config(self, supervisor_name: str) -> Dict[str, Any]: """Get default supervisor agent configuration using ONLY configuration values from development.yaml.""" diff --git a/application_src/configuration-api/app/utils/dependencies.py b/application_src/configuration-api/app/utils/dependencies.py index fdb9f74..421ed4e 100644 --- a/application_src/configuration-api/app/utils/dependencies.py +++ b/application_src/configuration-api/app/utils/dependencies.py @@ -10,7 +10,6 @@ from ..services import SSMService, DiscoveryService, AgentConfigService from ..services.deployment_service import DeploymentService -from ..services.cloudformation_deployment_service import CloudFormationDeploymentService @lru_cache() @@ -67,26 +66,3 @@ def get_deployment_service() -> DeploymentService: Configured Deployment service instance """ return DeploymentService() - - -@lru_cache() -def get_cloudformation_deployment_service() -> CloudFormationDeploymentService: - """ - Get CloudFormation Deployment service instance. - - Returns: - Configured CloudFormation Deployment service instance - """ - region = os.environ.get('AWS_REGION', 'us-east-1') - # Use ai-platform as project name to match CDK stack naming convention - project_name = os.environ.get('PROJECT_NAME', 'ai-platform') - template_bucket = os.environ.get('S3_TEMPLATE_BUCKET') - - if not template_bucket: - raise ValueError("S3_TEMPLATE_BUCKET environment variable is required") - - return CloudFormationDeploymentService( - region=region, - project_name=project_name, - template_bucket=template_bucket - ) diff --git a/application_src/ui-react-cloudscape/package-lock.json b/application_src/ui-react-cloudscape/package-lock.json index 940058b..6e83ffd 100644 --- a/application_src/ui-react-cloudscape/package-lock.json +++ b/application_src/ui-react-cloudscape/package-lock.json @@ -17,10 +17,12 @@ "axios": "^1.12.2", "concurrently": "7.6.0", "cors": "^2.8.5", + "dompurify": "^3.3.0", "express": "^4.21.2", "express-session": "^1.18.2", "helmet": "^8.1.0", "i18next": "23.7.16", + "marked": "^16.4.1", "react": "18.2.0", "react-dom": "18.2.0", "react-i18next": "14.0.0", @@ -11697,6 +11699,14 @@ "url": "https://github.com/fb55/domhandler?sponsor=1" } }, + "node_modules/dompurify": { + "version": "3.3.0", + "resolved": "https://registry.npmjs.org/dompurify/-/dompurify-3.3.0.tgz", + "integrity": "sha512-r+f6MYR1gGN1eJv0TVQbhA7if/U7P87cdPl3HN5rikqaBSBxLiCb/b9O+2eG0cxz0ghyU+mU1QkbsOwERMYlWQ==", + "optionalDependencies": { + "@types/trusted-types": "^2.0.7" + } + }, "node_modules/domutils": { "version": "2.8.0", "resolved": "https://registry.npmjs.org/domutils/-/domutils-2.8.0.tgz", @@ -16170,6 +16180,17 @@ "tmpl": "1.0.5" } }, + "node_modules/marked": { + "version": "16.4.1", + "resolved": "https://registry.npmjs.org/marked/-/marked-16.4.1.tgz", + "integrity": "sha512-ntROs7RaN3EvWfy3EZi14H4YxmT6A5YvywfhO+0pm+cH/dnSQRmdAmoFIc3B9aiwTehyk7pESH4ofyBY+V5hZg==", + "bin": { + "marked": "bin/marked.js" + }, + "engines": { + "node": ">= 20" + } + }, "node_modules/math-intrinsics": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/math-intrinsics/-/math-intrinsics-1.1.0.tgz", diff --git a/application_src/ui-react-cloudscape/package.json b/application_src/ui-react-cloudscape/package.json index e7329e2..0dc1b5f 100644 --- a/application_src/ui-react-cloudscape/package.json +++ b/application_src/ui-react-cloudscape/package.json @@ -12,10 +12,12 @@ "axios": "^1.12.2", "concurrently": "7.6.0", "cors": "^2.8.5", + "dompurify": "^3.3.0", "express": "^4.21.2", "express-session": "^1.18.2", "helmet": "^8.1.0", "i18next": "23.7.16", + "marked": "^16.4.1", "react": "18.2.0", "react-dom": "18.2.0", "react-i18next": "14.0.0", diff --git a/application_src/ui-react-cloudscape/server.js b/application_src/ui-react-cloudscape/server.js index 36faccb..2bab564 100644 --- a/application_src/ui-react-cloudscape/server.js +++ b/application_src/ui-react-cloudscape/server.js @@ -113,6 +113,32 @@ const safeUrlEncode = (str) => { .substring(0, 500); // Limit length to prevent log injection }; +// Safe error serialization helper to avoid circular reference issues +const safeErrorSerialize = (error) => { + if (!error) return 'Unknown error'; + + try { + // Only serialize safe parts of error object + const safeError = { + message: error.message, + name: error.name, + status: error.response?.status, + statusText: error.response?.statusText, + data: error.response?.data, + config: error.config ? { + method: error.config.method, + url: error.config.url, + timeout: error.config.timeout + } : undefined + }; + + return JSON.stringify(safeError, null, 2); + } catch (serializeError) { + // Fallback if even safe serialization fails + return `Error: ${error.message || 'Unknown'}, Status: ${error.response?.status || 'Unknown'}`; + } +}; + // Enhanced security: Validate agent name parameter const validateAgentName = (agentName) => { if (!agentName || typeof agentName !== 'string') { @@ -736,7 +762,7 @@ app.post('/api/config/system-prompts/create/:agentName', async (req, res) => { } console.log(`[PROXY] POST ${CONFIGURATION_API_ENDPOINT}/config/system-prompts/create/${safeUrlEncode(agentName)}`); - console.log('[PROXY] šŸ” Forwarding Authorization header for system prompt creation'); + console.log('[PROXY] Forwarding Authorization header for system prompt creation'); const response = await axios.post(`${CONFIGURATION_API_ENDPOINT}/config/system-prompts/create/${safeUrlEncode(agentName)}`, req.body, { headers: getAuthHeaders(req) // ← FIX: Forward auth headers for system prompt creation }); @@ -757,7 +783,7 @@ app.get('/api/config/discover', async (req, res) => { res.json(response.data); } catch (error) { console.error('[PROXY ERROR] Failed to discover services:', safeUrlEncode(error.message)); - console.error('[PROXY ERROR] Error details:', safeUrlEncode(JSON.stringify(error.response?.data) || error.message || 'Unknown error')); + console.error('[PROXY ERROR] Error details:', safeUrlEncode(safeErrorSerialize(error))); handleAuthError(error, res, 'discover services'); } }); @@ -814,7 +840,7 @@ app.post('/api/config/refresh-agent/:agentName', async (req, res) => { } console.log('[PROXY] POST /api/config/refresh-agent/', agentName, '->', `${CONFIGURATION_API_ENDPOINT}/config/refresh-agent/${agentName}`); - console.log('[PROXY] šŸ” Forwarding Authorization header for agent refresh'); + console.log('[PROXY] Forwarding Authorization header for agent refresh'); const response = await axios.post(`${CONFIGURATION_API_ENDPOINT}/config/refresh-agent/${safeUrlEncode(agentName)}`, {}, { headers: getAuthHeaders(req), // ← FIX: Forward auth headers for agent refresh @@ -846,7 +872,7 @@ app.post('/api/config/agent/:agentName/reload', async (req, res) => { } console.log(`[PROXY] POST /api/config/agent/${agentName}/reload`); - console.log('[PROXY] šŸ” Forwarding Authorization header for agent reload'); + console.log('[PROXY] Forwarding Authorization header for agent reload'); // Step 1: Get agent mapping to find agent endpoint (with auth) const mappingResponse = await axios.get(`${CONFIGURATION_API_ENDPOINT}/agent-mapping`, { @@ -945,7 +971,7 @@ app.post('/api/config/agent/:agentName/reload', async (req, res) => { app.post('/api/deployment/create-agent', async (req, res) => { try { console.log('[PROXY] POST /api/deployment/create-agent'); - console.log('[PROXY] šŸ” Forwarding Authorization header for deployment API'); + console.log('[PROXY] Forwarding Authorization header for deployment API'); console.log('[PROXY] Request body:', req.body); // Forward the request directly to the Config API's create-agent endpoint @@ -973,7 +999,7 @@ app.get('/api/deployment/stack-status/:agentName', async (req, res) => { } console.log(`[PROXY] GET /api/deployment/stack-status/${agentName}`); - console.log('[PROXY] šŸ” Forwarding Authorization header for stack status check'); + console.log('[PROXY] Forwarding Authorization header for stack status check'); const response = await axios.get(`${CONFIGURATION_API_ENDPOINT}/api/deployment/stack-status/${safeUrlEncode(agentName)}`, { headers: getAuthHeaders(req), @@ -991,7 +1017,7 @@ app.get('/api/deployment/stack-status/:agentName', async (req, res) => { app.post('/api/deployment/refresh-agent-urls', async (req, res) => { try { console.log(`[PROXY] POST ${SUPERVISOR_AGENT_ENDPOINT}/refresh-agent-urls`); - console.log('[PROXY] šŸ” Forwarding Authorization header to Supervisor Agent for refresh'); + console.log('[PROXY] Forwarding Authorization header to Supervisor Agent for refresh'); const response = await axios.post(`${SUPERVISOR_AGENT_ENDPOINT}/refresh-agent-urls`, {}, { headers: getAuthHeaders(req), // ← FIX: Forward auth headers for refresh @@ -1007,11 +1033,38 @@ app.post('/api/deployment/refresh-agent-urls', async (req, res) => { } }); +// Stack Update proxy route (for updating agent CloudFormation stacks) +app.put('/api/deployment/stack/:agentName', async (req, res) => { + const { agentName } = req.params; + + try { + // Security validation + if (!validateAgentName(agentName)) { + return res.status(400).json({ error: 'Invalid agent name format' }); + } + + console.log(`[PROXY] PUT /api/deployment/stack/${agentName}`); + console.log('[PROXY] Forwarding Authorization header for stack update'); + + const response = await axios.put(`${CONFIGURATION_API_ENDPOINT}/api/deployment/stack/${safeUrlEncode(agentName)}`, req.body, { + headers: getAuthHeaders(req), + timeout: 600000 // 10 minutes timeout for update operations + }); + + console.log(`[PROXY] Stack update successful for ${safeUrlEncode(agentName)}`); + res.json(response.data); + } catch (error) { + console.error('[PROXY ERROR] Failed to update stack:', safeUrlEncode(agentName), safeUrlEncode(error.message || 'Unknown error')); + console.error('[PROXY ERROR] Error details:', safeUrlEncode(JSON.stringify(error.response?.data || {}) || error.message || 'Unknown error')); + handleAuthError(error, res, 'update agent stack'); + } +}); + // STREAMING ONLY Proxy routes for Supervisor Agent (UX Optimized) with OAuth forwarding app.post('/api/agent/chat', async (req, res) => { try { - console.log('[PROXY] 🌊 STREAMING-ONLY: POST /api/agent/chat -> Supervisor Agent Streaming'); - console.log('[PROXY] šŸ” Forwarding Authorization header to Supervisor Agent'); + console.log('[PROXY] STREAMING-ONLY: POST /api/agent/chat -> Supervisor Agent Streaming'); + console.log('[PROXY] Forwarding Authorization header to Supervisor Agent'); const response = await axios.post(`${SUPERVISOR_AGENT_ENDPOINT}/agent-streaming`, req.body, { headers: getAuthHeaders(req), // ← FIX: Forward auth headers @@ -1034,9 +1087,9 @@ app.post('/api/agent/chat', async (req, res) => { // DEPRECATED: Redirect sync calls to streaming for consistency with OAuth forwarding app.post('/api/agent/chat-sync', async (req, res) => { - console.log('[PROXY] 🚨 DEPRECATED: chat-sync called - redirecting to streaming for optimal UX'); - console.log('[PROXY] šŸ’” STREAMING ENFORCED: All UI communication uses streaming'); - console.log('[PROXY] šŸ” Forwarding Authorization header to Supervisor Agent'); + console.log('[PROXY] DEPRECATED: chat-sync called - redirecting to streaming for optimal UX'); + console.log('[PROXY] STREAMING ENFORCED: All UI communication uses streaming'); + console.log('[PROXY] Forwarding Authorization header to Supervisor Agent'); try { // Redirect to streaming endpoint with proper auth headers @@ -1054,7 +1107,7 @@ app.post('/api/agent/chat-sync', async (req, res) => { }); response.data.on('end', () => { - console.log('[PROXY] šŸ’” Streaming->Sync conversion complete'); + console.log('[PROXY] Streaming->Sync conversion complete'); res.json({ response: completeResponse }); resolve(); }); @@ -1083,7 +1136,7 @@ app.delete('/api/config/delete/:agentName', async (req, res) => { } console.log(`[PROXY] DELETE /api/config/delete/${agentName} -> ${CONFIGURATION_API_ENDPOINT}/config/delete/${agentName}`); - console.log('[PROXY] šŸ” Forwarding Authorization header for agent deletion'); + console.log('[PROXY] Forwarding Authorization header for agent deletion'); const response = await axios.delete(`${CONFIGURATION_API_ENDPOINT}/config/delete/${safeUrlEncode(agentName)}`, { headers: getAuthHeaders(req) // ← FIX: Forward auth headers for deletion @@ -1111,7 +1164,7 @@ app.delete('/api/config/delete-complete/:agentName', async (req, res) => { console.log(`[PROXY] DELETE /api/config/delete-complete/${agentName} -> ${CONFIGURATION_API_ENDPOINT}/config/delete-complete/${agentName}`); console.log(`[PROXY] Include infrastructure: ${includeInfrastructure}`); - console.log('[PROXY] šŸ” Forwarding Authorization header for complete agent deletion'); + console.log('[PROXY] Forwarding Authorization header for complete agent deletion'); const response = await axios.delete(`${CONFIGURATION_API_ENDPOINT}/config/delete-complete/${safeUrlEncode(agentName)}`, { params: { @@ -1274,18 +1327,18 @@ app.get('*', (req, res) => { const HOST = '0.0.0.0'; app.listen(PORT, HOST, () => { - console.log(`šŸš€ React UI Backend Server running on ${HOST}:${PORT}`); - console.log(`šŸ“” Configuration API: ${CONFIGURATION_API_ENDPOINT}`); - console.log(`šŸ¤– Supervisor Agent: ${SUPERVISOR_AGENT_ENDPOINT}`); - console.log(`šŸ”§ NODE_ENV: ${process.env.NODE_ENV || 'undefined'}`); + console.log(`React UI Backend Server running on ${HOST}:${PORT}`); + console.log(`Configuration API: ${CONFIGURATION_API_ENDPOINT}`); + console.log(`Supervisor Agent: ${SUPERVISOR_AGENT_ENDPOINT}`); + console.log(`NODE_ENV: ${process.env.NODE_ENV || 'undefined'}`); console.log(` Current directory: ${__dirname}`); const fs = require('fs'); try { const buildExists = fs.existsSync(path.join(__dirname, 'build')); - console.log(`šŸ“¦ Build directory exists: ${buildExists}`); + console.log(`Build directory exists: ${buildExists}`); } catch (e) { - console.log(`šŸ“¦ Error checking build directory: ${e.message}`); + console.log(`Error checking build directory: ${e.message}`); } }); diff --git a/application_src/ui-react-cloudscape/src/components/AgentMapping.js b/application_src/ui-react-cloudscape/src/components/AgentMapping.js index d916440..403e6f5 100644 --- a/application_src/ui-react-cloudscape/src/components/AgentMapping.js +++ b/application_src/ui-react-cloudscape/src/components/AgentMapping.js @@ -46,6 +46,10 @@ const AgentMapping = ({ const [showDeleteConfirmation, setShowDeleteConfirmation] = useState(false); const [selectedSkillDetails, setSelectedSkillDetails] = useState(null); const [showSkillDetails, setShowSkillDetails] = useState(false); + + // Update agent state + const [updatingAgent, setUpdatingAgent] = useState(null); + const [updateSuccess, setUpdateSuccess] = useState(null); useEffect(() => { if (isOpen) { @@ -444,6 +448,43 @@ const AgentMapping = ({ setSelectedSkillDetails(null); }; + // Handle CloudFormation stack update + const handleUpdateAgent = async (agentName) => { + if (agentName === 'supervisor-agent') { + setError('Cannot update supervisor agent stack - it is managed by the main CDK deployment'); + return; + } + + setUpdatingAgent(agentName); + setError(null); + setUpdateSuccess(null); + + try { + const configService = await import('../services/configuration'); + + // Trigger CloudFormation stack update + const result = await configService.default.updateAgentStack(agentName); + + setUpdateSuccess(`Successfully updated stack for agent: ${agentName}`); + + // Refresh agent mapping after successful update + setTimeout(async () => { + await loadAgentMappings(); + setUpdateSuccess(null); + }, 3000); + + } catch (error) { + if (error.message.includes('No updates needed')) { + setUpdateSuccess(`Agent ${agentName} is already up to date`); + setTimeout(() => setUpdateSuccess(null), 3000); + } else { + setError(`Failed to update agent ${agentName}: ${error.message}`); + } + } finally { + setUpdatingAgent(null); + } + }; + return ( )} + {updateSuccess && ( + setUpdateSuccess(null)} + statusIconAriaLabel="Success" + > + {updateSuccess} + + )} + {/* AWS Foundation Visual Context - Network Overview */} {agentMappingData?.summary && ( ( /* AWS Foundation Visual Context - Agent Actions */ - + + + {item.name !== 'supervisor-agent' && ( + + + + Sync infrastructure with latest config + + + )} + + {item.canDelete !== false && (