Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
155 changes: 28 additions & 127 deletions app-template-generator.py
Original file line number Diff line number Diff line change
@@ -1,118 +1,31 @@
#!/usr/bin/env python3
"""
CDK app for generating agent template without deployment.
This app synthesizes the MultiAgentStack as a standalone CloudFormation template,
builds and pushes the Docker image to ECR, and uploads everything to S3.
This app synthesizes the MultiAgentStack as a standalone CloudFormation template
and prepares it for upload to S3 during deployment.

The Docker image build is now handled by TemplateStorageStack during deployment,
following the same pattern as template upload to S3.

This runs as a pre-hook before 'cdk deploy' to ensure the template
is always up-to-date with the stack definition and available in S3.
is always up-to-date with the stack definition.
"""
import os
import json
import boto3
import sys
import shlex
import subprocess
from aws_cdk import App, Fn, Environment, Stack
from aws_cdk import aws_ec2 as ec2, aws_ecs as ecs, aws_s3 as s3, aws_ecr_assets as ecr_assets
import logging
from aws_cdk import App, Fn, Environment
from stacks.multi_agent.stack import MultiAgentStack
from helper.config import Config


def build_and_push_agent_image(app: App, account_id: str, region: str) -> str:
"""
Build and push the agent Docker image to ECR using direct Docker commands.

Args:
app: CDK app instance
account_id: AWS account ID
region: AWS region

Returns:
ECR image URI with stable tag
"""
print("\n🐳 Building and pushing agent Docker image to ECR...")

import hashlib

# Use stable tag based on directory content hash
ecr_repo = f"{account_id}.dkr.ecr.{region}.amazonaws.com/cdk-hnb659fds-container-assets-{account_id}-{region}"
stable_tag = "agent-instance-latest"
image_uri = f"{ecr_repo}:{stable_tag}"

# Check if Docker/Podman is available - use safe validated default
docker_executable = os.environ.get('CDK_DOCKER', 'docker')

# SECURITY: Validate docker executable to prevent command injection
allowed_executables = ['docker', 'podman']
if docker_executable not in allowed_executables:
raise ValueError(f"Invalid container runtime specified: {docker_executable}. Allowed: {allowed_executables}")

try:
# 1. Login to ECR - separate password retrieval and login for security
print(f" Logging in to ECR...")

# Get ECR password securely - validate region parameter
if not region or not region.replace('-', '').replace('_', '').isalnum():
raise ValueError(f"Invalid AWS region: {region}")

password_result = subprocess.run(
['aws', 'ecr', 'get-login-password', '--region', region],
check=True,
capture_output=True,
text=True
)

# Login to ECR with password from stdin - validate parameters
if not account_id or not account_id.isdigit() or len(account_id) != 12:
raise ValueError(f"Invalid AWS account ID: {account_id}")

ecr_endpoint = f"{account_id}.dkr.ecr.{region}.amazonaws.com"

subprocess.run(
[docker_executable, 'login', '--username', 'AWS', '--password-stdin', ecr_endpoint],
input=password_result.stdout,
check=True,
capture_output=True,
text=True
)

# 2. Build image - validate parameters
print(f" Building Docker image...")
if not image_uri or '://' in image_uri.split(':')[0]: # Basic URI validation
raise ValueError(f"Invalid image URI: {image_uri}")

subprocess.run(
[docker_executable, 'build',
'-t', image_uri,
'--platform', 'linux/arm64',
'-f', 'application_src/multi-agent/agent-instance/Dockerfile',
'application_src'],
check=True,
capture_output=True
)

# 3. Push to ECR
print(f" Pushing image to ECR...")
subprocess.run(
[docker_executable, 'push', image_uri],
check=True,
capture_output=True
)

print(f"✓ Docker image built and pushed to ECR: {image_uri}")
return image_uri

except subprocess.CalledProcessError as e:
print(f"⚠️ Warning: Failed to build/push Docker image: {e}")
print(f" Using placeholder - manual push required")
return image_uri


logger = logging.getLogger(__name__)


def main():
"""Generate the template and save to file."""
logger.info("Generating agent template...")

# Get actual AWS account and region
try:
sts = boto3.client('sts')
Expand All @@ -125,18 +38,14 @@ def main():
# Create app
app = App()

# Step 1: Build and push Docker image to ECR BEFORE creating the stack
# This ensures the image exists in ECR before the CloudFormation template references it
agent_image_uri = build_and_push_agent_image(app, account_id, region)

# Get configuration
environment = app.node.try_get_context("environment") or os.environ.get("ENVIRONMENT", "development")
conf = Config(environment=environment)

# Get project name from config (should match app.py)
project_name_from_config = conf.get('ProjectName')

print(f"\n📦 Using CloudFormation exports from VPC stack...")
logger.info("Using CloudFormation exports from VPC stack...")

# Import VPC ID using CloudFormation export token
# The MultiAgentStack will handle importing the VPC and all related resources
Expand All @@ -151,10 +60,10 @@ def main():
# Import VPC Lattice service network ARN using CloudFormation export
service_network_arn = Fn.import_value(f"{project_name_from_config}-vpc:ExportsOutputFnGetAttservicenetworkArnD9BDB9C7")

print(f"✓ All infrastructure references use CloudFormation imports")
print(f" - VPC ID: {{Fn::ImportValue: {project_name_from_config}-VpcId}}")
print(f" - Cluster: {{Fn::ImportValue: {project_name_from_config}-ClusterName}}")
print(f" - Bucket: {{Fn::ImportValue: {project_name_from_config}-AccessLogBucketName}}")
logger.info("All infrastructure references use CloudFormation imports")
logger.debug(f" - VPC ID: {{Fn::ImportValue: {project_name_from_config}-VpcId}}")
logger.debug(f" - Cluster: {{Fn::ImportValue: {project_name_from_config}-ClusterName}}")
logger.debug(f" - Bucket: {{Fn::ImportValue: {project_name_from_config}-AccessLogBucketName}}")

# Create the agent stack - MultiAgentStack will import all resources internally
# Pass tokens directly - no boto3 queries needed!
Expand Down Expand Up @@ -183,25 +92,17 @@ def main():
with open(template_path, 'r', encoding='utf-8') as f:
template_content = json.load(f)

# Replace Docker image reference with stable tag
task_def = template_content['Resources']['agenttaskdefinitionF56FAA50']
container = task_def['Properties']['ContainerDefinitions'][0]

# Update image to use stable tag
container['Image'] = {
'Fn::Sub': f"{account_id}.dkr.ecr.{region}.${{AWS::URLSuffix}}/cdk-hnb659fds-container-assets-{account_id}-{region}:agent-instance-latest"
}

print(f"✓ Updated template to use stable Docker image tag: agent-instance-latest")
# Log that template uses ImageTag parameter (will be updated by TemplateStorageStack)
logger.info("Template uses ImageTag parameter (default will be updated during deployment)")

# Save to standard location for easy access
output_path = "cdk.out/GenericAgentTemplate.json"
with open(output_path, 'w', encoding='utf-8') as f:
json.dump(template_content, f, indent=2)

print(f"Template generated: {output_path}")
print(f" Stack name: {agent_stack.stack_name}")
print(f" Template size: {len(json.dumps(template_content))} bytes")
logger.info(f"Template generated: {output_path}")
logger.debug(f" Stack name: {agent_stack.stack_name}")
logger.debug(f" Template size: {len(json.dumps(template_content))} bytes")

# List resources for reference
if 'Resources' in template_content:
Expand All @@ -211,18 +112,18 @@ def main():
rtype = resource.get('Type', 'Unknown')
resource_types[rtype] = resource_types.get(rtype, 0) + 1

print(f" Total resources: {resource_count}")
print(f" Resource breakdown:")
logger.debug(f" Total resources: {resource_count}")
logger.debug(" Resource breakdown:")
for rtype, count in sorted(resource_types.items()):
print(f" - {rtype}: {count}")
logger.debug(f" - {rtype}: {count}")

# Note: Template will be automatically uploaded to S3 by TemplateStorageStack
# using CDK's BucketDeployment construct during 'cdk deploy'
print(f"\n📤 Template will be uploaded to S3 by TemplateStorageStack during deployment")
logger.info("Template and Docker image will be deployed by TemplateStorageStack")
logger.debug(" - Template: Uploaded to S3 during deployment")
logger.debug(" - Docker Image: Built and pushed to ECR during deployment")

return True
else:
print(f"Error: Template not found at {template_path}")
logger.error(f"Template not found at {template_path}")
return False


Expand Down
71 changes: 68 additions & 3 deletions application_src/common/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,18 @@
logger = get_logger(__name__)

# CRITICAL: Initialize observability BEFORE importing Strands Agent
# This ensures Langfuse environment variables are set before Strands Agent initialization
# This ensures environment variables are set before Strands Agent initialization
log_info(logger, "🔧 Initializing observability before Strands Agent import")
from config import Config
from observability import ObservabilityFactory

# Note: Observability will be initialized per-agent, not at module level
log_warning(logger, "⚠️ Observability will be initialized per-agent - Strands Agent will run without global tracing")

# NOW import Strands Agent (after environment variables are set)
# NOW import Strands Agent and tracer (after environment variables are set)
from strands import Agent, tool
from strands.models import BedrockModel
from strands.telemetry.tracer import get_tracer
from strands_tools import http_request

# Import system prompt
Expand Down Expand Up @@ -401,6 +402,30 @@ def create_agent(prompt, user_id=None, agent_name="qa_agent"):
else:
print("No knowledge base tools available")

# Configure Strands SDK observability (get_tracer + logging)
try:
observability_provider = ObservabilityFactory.get_current_provider()
if observability_provider:
service_name = observability_provider.trace_attributes.get("service.name", agent_name)
environment = observability_provider.trace_attributes.get("deployment.environment", "production")

print(f"🔍 Configuring Strands SDK observability for {observability_provider.provider_name}...")

# Configure Strands tracer with provider-specific settings
if hasattr(observability_provider, 'get_strands_tracer_config'):
tracer_config = observability_provider.get_strands_tracer_config(service_name, environment)
if tracer_config:
print(f"📡 Configuring Strands get_tracer with {observability_provider.provider_name} settings...")
tracer = get_tracer(**tracer_config)
print(f"✅ Strands tracer configured for {observability_provider.provider_name}")

# Configure Strands logging with provider-specific settings
if hasattr(observability_provider, 'configure_strands_logging'):
observability_provider.configure_strands_logging(service_name, environment)

except Exception as obs_error:
print(f"⚠️ Error configuring Strands SDK observability: {obs_error}")

# Get trace attributes for observability
trace_attributes = get_trace_attributes(agent_name)

Expand Down Expand Up @@ -453,7 +478,26 @@ def run_agent(prompt: str, user_id=None, agent_name="qa_agent"):
return "Error: Failed to create agent"

# Run the agent
response = agent(prompt) # Call the agent directly instead of using .run()
agent_result = agent(prompt) # Call the agent directly and get AgentResult

# Extract response from AgentResult
response = str(agent_result) # AgentResult can be converted to string for response

# Send Strands metrics to observability provider if configured
try:
observability_provider = ObservabilityFactory.get_current_provider()
if observability_provider and hasattr(observability_provider, 'process_strands_metrics'):
service_name = observability_provider.trace_attributes.get("service.name", agent_name)
environment = observability_provider.trace_attributes.get("deployment.environment", "production")

print(f"📊 Forwarding Strands metrics to {observability_provider.provider_name}...")
observability_provider.process_strands_metrics(
agent_result, service_name, environment
)
else:
print("ℹ️ No observability provider configured for metrics")
except Exception as metrics_error:
print(f"⚠️ Failed to process Strands metrics: {metrics_error}")

# Get memory configuration from the specified agent
from config import Config
Expand Down Expand Up @@ -640,6 +684,27 @@ async def run_agent_and_stream_response(prompt: str, user_id=None, agent_name="q

print(f"✅ Streaming completed. Total response length: {len(response)}")

# Send Strands metrics to observability provider if configured (after streaming completes)
try:
observability_provider = ObservabilityFactory.get_current_provider()
if observability_provider and hasattr(observability_provider, 'process_strands_metrics'):
# Get the final AgentResult from the completed streaming
final_result = agent.last_result if hasattr(agent, 'last_result') else None
if final_result:
service_name = observability_provider.trace_attributes.get("service.name", agent_name)
environment = observability_provider.trace_attributes.get("deployment.environment", "production")

print(f"📊 Forwarding Strands streaming metrics to {observability_provider.provider_name}...")
observability_provider.process_strands_metrics(
final_result, service_name, environment
)
else:
print("ℹ️ No AgentResult available for metrics processing")
else:
print("ℹ️ No observability provider configured for metrics")
except Exception as metrics_error:
print(f"⚠️ Failed to process Strands streaming metrics: {metrics_error}")

# Store the response in memory if memory is enabled
if memory_enabled and mem0_module and hasattr(mem0_module, 'mem0_memory'):
try:
Expand Down
Loading
Loading