An AI-powered chatbot that helps security analysts with common CVE Numbering Authority (CNA) tasks, including CWE assignment, CVSS scoring, and vulnerability analysis, using Amazon Bedrock.
This application leverages Amazon Bedrock with Knowledge Bases and Agents to provide intelligent assistance for security vulnerability analysis. Given a vulnerability description, the assistant can recommend appropriate CWE classifications with supporting reasoning based on FIRST guidance and CWE documentation.
The solution uses a serverless architecture built with AWS CDK. The diagram below shows how a user's question flows through the system to generate an AI-powered response.
| Color | Component | Description |
|---|---|---|
| 🟢 Green (User) | Security Analyst | You! The person asking questions about vulnerabilities |
| 🔵 Blue | Web Interface | The website you interact with (Load Balancer + Streamlit App) |
| 🟠 Orange | Processing | AWS Lambda functions that coordinate requests |
| 🟣 Purple | AI Engine | Amazon Bedrock - the "brain" that understands and answers questions |
| 🟢 Green (Data) | Data Storage | CWE documents and vector search index |

How it works: You ask a question → the web app sends it to the AI engine → the AI searches the knowledge base and generates a response → you get an expert-level CWE recommendation with reasoning.

Prerequisites:

- Python 3.11+
- Docker
- AWS CDK 2.114.1+
- AWS account with Bedrock access (Claude and Titan models enabled)
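
A quick preflight check can catch a missing prerequisite before a long deployment. The sketch below is illustrative only: `python_ok` and `missing_tools` are hypothetical helpers, and they check the interpreter version and whether `docker` and `cdk` are on the PATH. They do not verify the CDK version or Bedrock model access.

```python
import shutil
import sys

REQUIRED_PYTHON = (3, 11)            # minimum version from the prerequisites list
REQUIRED_TOOLS = ("docker", "cdk")   # executables expected on PATH


def python_ok(version_info=sys.version_info, minimum=REQUIRED_PYTHON):
    """Return True if the interpreter meets the minimum (major, minor) version."""
    return tuple(version_info[:2]) >= minimum


def missing_tools(tools=REQUIRED_TOOLS, which=shutil.which):
    """Return the tools that cannot be found on PATH."""
    return [tool for tool in tools if which(tool) is None]


if __name__ == "__main__":
    if not python_ok():
        print("Python 3.11 or newer is required")
    for tool in missing_tools():
        print(f"Missing prerequisite: {tool}")
```

Running the script prints nothing when the interpreter is recent enough and both executables are found.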

Installation:

- Create and activate a virtual environment:

      python3 -m venv .venv
      source .venv/bin/activate

- Install dependencies:

      pip install -r requirements.txt

- Configure the environment (create `code/streamlit-app/.env`):

      ACCOUNT_ID=<your-account-id>
      AWS_REGION=<your-region>
      LAMBDA_FUNCTION_NAME=invokeAgentLambda

- Deploy:

      cdk bootstrap   # first time only
      cdk deploy

Initial deployment takes 30-45 minutes. Access the chatbot via the URL in CloudFormation outputs.
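
If you would rather fetch the URL programmatically than from the console, the stack outputs can be read with boto3. This is a sketch under assumptions: the stack name `CodeStack` is a placeholder (use the name that `cdk deploy` reports), and because the output key is not documented here, the filter simply keeps outputs whose value looks like a URL. If the stack exposes a bare DNS name instead, adjust the filter accordingly.

```python
def url_outputs(outputs):
    """Keep only stack outputs whose value starts with http:// or https://."""
    return {
        item["OutputKey"]: item["OutputValue"]
        for item in outputs
        if item.get("OutputValue", "").startswith(("http://", "https://"))
    }


def fetch_stack_outputs(stack_name):
    """Return the Outputs list of a deployed CloudFormation stack."""
    import boto3  # imported lazily so url_outputs works without AWS access

    cfn = boto3.client("cloudformation")
    stacks = cfn.describe_stacks(StackName=stack_name)["Stacks"]
    return stacks[0].get("Outputs", [])


# Example (requires AWS credentials and a deployed stack; the name is hypothetical):
#   print(url_outputs(fetch_stack_outputs("CodeStack")))
```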

To remove the deployment:

    cdk destroy

Note: Manually delete any S3 buckets created by the stack.

Project structure:

    security-advisory-assistant/
    ├── code/
    │   ├── code_stack.py                  # Main CDK stack definition
    │   ├── lambdas/
    │   │   ├── action-lambda/             # Bedrock Agent action handler
    │   │   ├── create-index-lambda/       # OpenSearch index creation
    │   │   ├── invoke-lambda/             # Agent invocation endpoint
    │   │   └── update-lambda/             # Post-deployment updates
    │   ├── layers/                        # Lambda layers (boto3, opensearch)
    │   ├── security/                      # Security middleware and config
    │   └── streamlit-app/                 # Web UI application
    ├── assets/
    │   ├── agent_api_schema/              # Bedrock Agent API definitions
    │   ├── data_query_data_source/        # Structured data for Athena
    │   ├── diagrams/                      # Architecture diagrams
    │   └── knowledgebase_data_source/     # CWE knowledge base documents
    ├── configs/                           # Configuration files
    ├── docs/                              # Extended documentation
    └── tests/                             # Unit tests

- API Reference - Detailed API documentation for all components
- Architecture Diagrams - Visual diagrams of system architecture, data flow, and deployment
- Original Blog Post - Complete project history, examples, and detailed walkthrough
- Support
- Contributing
- Changelog

Configuration is managed via `cdk.json` under the `context.config` key:

    {
      "config": {
        "logging": {
          "lambda_log_level": "INFO",
          "streamlit_log_level": "INFO"
        },
        "paths": {
          "assets_folder_name": "assets",
          "lambdas_source_folder": "code/lambdas",
          "layers_source_folder": "code/layers",
          "athena_data_destination_prefix": "data_query_data_source",
          "athena_table_data_prefix": "ec2_pricing",
          "knowledgebase_destination_prefix": "knowledgebase_data_source",
          "knowledgebase_file_name": "cna_wisdom.zip",
          "agent_schema_destination_prefix": "agent_api_schema",
          "fewshot_examples_path": "dynamic_examples.csv"
        },
        "names": {
          "bedrock_agent_name": "chatbotBedrockAgent-${timestamp}",
          "bedrock_agent_alias": "bedrockAgent",
          "streamlit_lambda_function_name": "invokeAgentLambda"
        },
        "models": {
          "bedrock_agent_foundation_model": "anthropic.claude-3-haiku-20240307-v1:0"
        },
        "bedrock_instructions": {
          "agent_instruction": "...",
          "knowledgebase_instruction": "...",
          "action_group_description": "..."
        }
      }
    }

Security features:

- KMS encryption for S3 buckets and CloudWatch logs
- Input validation and sanitization with character whitelisting
- SQL injection prevention with keyword whitelisting
- Rate limiting on agent invocations (configurable, default 60/min)
- Secure session management with cryptographic tokens
- Audit logging for compliance
- HTTP security headers (X-Frame-Options, CSP, HSTS, etc.)
- Configurable security group IP restrictions:

      # Restrict to current IP
      cdk deploy --parameters SourceIpAddress=$(curl -s https://checkip.amazonaws.com)/32

      # Allow all (default)
      cdk deploy

To invoke the agent Lambda directly with boto3:

    import json

    import boto3

    lambda_client = boto3.client('lambda')

    # Synchronous invocation; replace <account> and <region> with your values
    response = lambda_client.invoke(
        FunctionName='sec-advis-asst-invokeAgentLambda-<account>-<region>',
        InvocationType='RequestResponse',
        Payload=json.dumps({
            'body': {
                'query': 'What CWE applies to a buffer overflow vulnerability?',
                'session_id': 'unique-session-id'
            }
        })
    )

    # The payload is a stream; read it and decode the JSON result
    result = json.loads(response['Payload'].read())
    print(result['answer'])
    print(result['source'])

To apply the security middleware to your own handler:

    from code.security.middleware import validate_input, error_handler, rate_limit
    from code.security.security_config import safe_log

    @error_handler
    @rate_limit(max_calls=10, time_window=60)
    @validate_input
    def process_query(user_input: str, session_id: str) -> dict:
        safe_log(f"Processing query for session: {session_id}")
        # Your logic here
        return {"result": "success"}

To add documents to the knowledge base:

- Place documents in `assets/knowledgebase_data_source/`
- Update `cdk.json` → `paths.knowledgebase_file_name`
- Update `bedrock_instructions.knowledgebase_instruction`

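
Before redeploying, it can help to confirm that the file named in `cdk.json` actually exists on disk. The sketch below assumes the configuration lives under `context.config`, and that the knowledge base archive sits at `<assets_folder_name>/<knowledgebase_destination_prefix>/<knowledgebase_file_name>`. That path layout is an assumption inferred from the key names, and `load_config` and `knowledgebase_path` are hypothetical helpers, not part of the project.

```python
import json
from pathlib import Path


def load_config(cdk_json_path="cdk.json"):
    """Read cdk.json and return the dictionary stored under context.config."""
    with open(cdk_json_path, encoding="utf-8") as handle:
        return json.load(handle)["context"]["config"]


def knowledgebase_path(config, project_root="."):
    """Build the expected archive location (assumed layout, see the note above)."""
    paths = config["paths"]
    return (
        Path(project_root)
        / paths["assets_folder_name"]
        / paths["knowledgebase_destination_prefix"]
        / paths["knowledgebase_file_name"]
    )
```

For example, `knowledgebase_path(load_config()).exists()` returns `False` when the archive name in `cdk.json` does not match a file at the assumed location.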

To add a structured data source:

- Add CSV/JSON/Parquet files to `assets/data_query_data_source/<subfolder>/`
- Update `cdk.json` → `paths.athena_table_data_prefix`
- Update `code/lambdas/action-lambda/prompt_templates.py`
- Add examples to `code/lambdas/action-lambda/dynamic_examples.csv`

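
A small check like the following can confirm that a data subfolder contains only the supported formats before you deploy. It is a sketch: `list_data_files` is a hypothetical helper, and matching on file extension is an assumption about how the formats are distinguished.

```python
from pathlib import Path

SUPPORTED_SUFFIXES = {".csv", ".json", ".parquet"}  # formats named in the steps above


def list_data_files(folder):
    """Split the files in `folder` into (supported, unsupported) name lists."""
    supported, unsupported = [], []
    for entry in sorted(Path(folder).iterdir()):
        if not entry.is_file():
            continue  # skip nested directories
        if entry.suffix.lower() in SUPPORTED_SUFFIXES:
            supported.append(entry.name)
        else:
            unsupported.append(entry.name)
    return supported, unsupported
```

Anything returned in the second list is worth removing or converting before running `cdk deploy`.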
MIT-0 License. See LICENSE.
