Extending TMI
This guide explains how to extend TMI through addons, webhooks, and custom integrations.
TMI can be extended in several ways:
- Addons - Server-side extensions that users can invoke
- Webhooks - Receive notifications for TMI events
- Custom Integrations - Build tools that interact with TMI's API
- Client Libraries - Create language-specific SDKs
Addons are webhook-based services that:
- Receive invocation requests from TMI users
- Process them asynchronously
- Report status back via callbacks
- Appear in the TMI UI as actions users can trigger
- STRIDE Analysis - Automated threat categorization
- Compliance Checking - Verify against security frameworks
- Report Generation - Create PDFs or documents
- Integration - Sync with external tools (Jira, ServiceNow)
- AI Analysis - LLM-powered threat identification
First, create a webhook subscription:
POST /webhooks
Authorization: Bearer {jwt}
Content-Type: application/json
{
"name": "My Addon Service",
"url": "https://my-service.example.com/webhooks/tmi",
"events": [],
"secret": "your-hmac-secret-128-chars-minimum"
}
Save the webhook_id from the response.
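The placeholder secret above suggests a minimum length of 128 characters. A minimal sketch of generating such a secret with Python's standard library follows; the helper name `generate_webhook_secret` is hypothetical, and the 128-character length is taken from the placeholder text rather than from a documented TMI requirement.

```python
import secrets


def generate_webhook_secret(num_bytes: int = 64) -> str:
    """Return a random hex string; 64 random bytes encode to 128 hex characters."""
    return secrets.token_hex(num_bytes)


if __name__ == "__main__":
    secret = generate_webhook_secret()
    # Store the value in a secret manager or environment variable, not in source control.
    print(len(secret))
```

Use the same value for the `secret` field of the subscription and for `WEBHOOK_SECRET` in your service.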
Have a TMI administrator register your addon:
POST /addons
Authorization: Bearer {admin_jwt}
Content-Type: application/json
{
"name": "STRIDE Analyzer",
"webhook_id": "{webhook_id_from_step_1}",
"description": "Automated STRIDE threat analysis",
"icon": "material-symbols:security",
"objects": ["threat_model", "asset"]
}
Create an HTTPS endpoint that:
- Receives POST requests from TMI
- Verifies HMAC signature
- Processes asynchronously
- Calls back to update status
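The "processes asynchronously" requirement means the HTTP handler should hand work to a background worker and return right away. The sketch below shows one way to do that with only the standard library. The names `task_queue`, `enqueue`, and `worker` are illustrative assumptions, and the examples in this guide call `task_queue.enqueue(...)`, which matches the API of job-queue libraries such as RQ rather than `queue.Queue`.

```python
import queue
import threading

# In-process job queue; a production service would more likely use a durable queue.
task_queue: "queue.Queue[dict]" = queue.Queue()
processed = []  # stands in for real side effects such as status callbacks


def process_invocation(payload: dict) -> None:
    # Placeholder for the real work (analysis, report generation, callbacks).
    processed.append(payload["invocation_id"])


def worker() -> None:
    # Pull jobs forever; task_done() lets callers wait with task_queue.join().
    while True:
        payload = task_queue.get()
        try:
            process_invocation(payload)
        except Exception:
            # A real worker would log the error and report a "failed" status here.
            pass
        finally:
            task_queue.task_done()


def enqueue(payload: dict) -> None:
    """Called from the HTTP handler so it can return 200 immediately."""
    task_queue.put(payload)


threading.Thread(target=worker, daemon=True).start()
```

An in-memory queue loses pending jobs when the process restarts, so treat this as a starting point for local development only.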
Your webhook receives:
POST /webhooks/tmi
Content-Type: application/json
X-Webhook-Event: addon_invocation
X-Invocation-Id: 550e8400-e29b-41d4-a716-446655440000
X-Addon-Id: 123e4567-e89b-12d3-a456-426614174000
X-Webhook-Signature: sha256=abc123...
User-Agent: TMI-Addon-Worker/1.0
{
"event_type": "addon_invocation",
"invocation_id": "550e8400-e29b-41d4-a716-446655440000",
"addon_id": "123e4567-e89b-12d3-a456-426614174000",
"threat_model_id": "789e0123-e45b-67c8-d901-234567890abc",
"object_type": "asset",
"object_id": "def01234-5678-90ab-cdef-1234567890ab",
"timestamp": "2025-11-08T12:00:00Z",
"payload": {
"user_param_1": "value1",
"user_param_2": "value2"
},
"callback_url": "https://tmi.example.com/invocations/550e8400.../status"
}
CRITICAL: Always verify the signature:
import hmac
import hashlib

def verify_signature(payload_bytes, signature_header, secret):
    # A missing header can never be valid, and compare_digest raises on None
    if not signature_header:
        return False
    expected = hmac.new(
        secret.encode('utf-8'),
        payload_bytes,
        hashlib.sha256
    ).hexdigest()
    expected_sig = f"sha256={expected}"
    # Compare as bytes, in constant time
    return hmac.compare_digest(signature_header.encode('utf-8'),
                               expected_sig.encode('utf-8'))

# In handler
payload_bytes = request.get_data()
signature = request.headers.get('X-Webhook-Signature')
if not verify_signature(payload_bytes, signature, WEBHOOK_SECRET):
    return 'Invalid signature', 401

// Node.js
const crypto = require('crypto');

function verifySignature(payloadBody, signatureHeader, secret) {
  const hmac = crypto.createHmac('sha256', secret);
  hmac.update(payloadBody);
  const expectedSig = Buffer.from(`sha256=${hmac.digest('hex')}`);
  const received = Buffer.from(signatureHeader || '');
  // timingSafeEqual throws when the lengths differ, so check first
  if (received.length !== expectedSig.length) return false;
  return crypto.timingSafeEqual(received, expectedSig);
}

Return 200 OK immediately:
@app.route('/webhooks/tmi', methods=['POST'])
def handle_invocation():
    # Verify signature against the raw request body
    signature = request.headers.get('X-Webhook-Signature')
    if not verify_signature(request.get_data(), signature, WEBHOOK_SECRET):
        return 'Invalid signature', 401
    payload = request.json
    # Queue for async processing
    task_queue.enqueue(process_invocation, payload)
    # Respond immediately (TMI marks as in_progress)
    return '', 200

Call back to TMI:
import requests
import json

def update_status(invocation_id, status, percent, message):
    callback_url = f"https://tmi.example.com/invocations/{invocation_id}/status"
    payload = json.dumps({
        "status": status,
        "status_percent": percent,
        "status_message": message
    })
    # generate_signature() signs the body with HMAC-SHA256; it is defined later in this guide
    signature = generate_signature(payload.encode(), WEBHOOK_SECRET)
    response = requests.post(
        callback_url,
        data=payload,
        headers={
            'Content-Type': 'application/json',
            'X-Webhook-Signature': signature
        },
        timeout=10  # do not let a stalled callback block the worker
    )
    return response.status_code == 200

# During processing
def process_invocation(payload):
    invocation_id = payload['invocation_id']
    # Started
    update_status(invocation_id, "in_progress", 10, "Starting analysis...")
    # Do work
    analyze_threats(payload)
    # Halfway
    update_status(invocation_id, "in_progress", 50, "Analyzing assets...")
    # More work
    generate_report(payload)
    # Completed
    update_status(invocation_id, "completed", 100, "Analysis complete")

# With error handling
def process_invocation(payload):
    invocation_id = payload['invocation_id']
    try:
        analyze_threats(payload)
        update_status(invocation_id, "completed", 100, "Success")
    except ValidationError as e:
        update_status(invocation_id, "failed", 0, f"Validation error: {e}")
    except Exception as e:
        logger.exception("Processing failed")
        update_status(invocation_id, "failed", 0, f"Internal error: {e}")

Endpoint: POST /invocations/{invocation_id}/status
Authentication: HMAC signature (no JWT required)
Request:
{
"status": "in_progress",
"status_percent": 75,
"status_message": "Processing..."
}
Valid Statuses:
- in_progress (0-99%)
- completed (100%)
- failed (0%)
Status Transitions:
pending → in_progress → completed
                      → failed
Invalid transitions (return 409):
- completed → in_progress
- failed → in_progress
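The status rules above can be checked on the addon side before a callback is sent, which avoids a round trip that would end in a 409. The following is a sketch only: the `check_update` helper is hypothetical, and the assumption that repeated `in_progress` updates are allowed is inferred from the progress examples in this guide, not from the TMI server code.

```python
# Percent ranges per status, as listed under "Valid Statuses".
VALID_PERCENT = {
    "in_progress": range(0, 100),   # 0-99
    "completed": range(100, 101),   # exactly 100
    "failed": range(0, 1),          # exactly 0
}

# Allowed next statuses; completed and failed are terminal.
TRANSITIONS = {
    "pending": {"in_progress"},
    "in_progress": {"in_progress", "completed", "failed"},  # assumption: repeats allowed
    "completed": set(),
    "failed": set(),
}


def check_update(current: str, new: str, percent: int) -> None:
    """Raise ValueError if the update would break the documented rules."""
    if new not in VALID_PERCENT:
        raise ValueError(f"unknown status: {new!r}")
    if percent not in VALID_PERCENT[new]:
        raise ValueError(f"status_percent {percent} is not valid for {new!r}")
    if new not in TRANSITIONS.get(current, set()):
        raise ValueError(f"invalid transition: {current} -> {new}")
```

Calling `check_update("in_progress", "completed", 100)` passes silently, while `check_update("completed", "in_progress", 50)` raises `ValueError`.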
from flask import Flask, request
import hmac
import hashlib
import json
import requests

app = Flask(__name__)
WEBHOOK_SECRET = "your-128-char-secret"
TMI_BASE_URL = "https://tmi.example.com"
# task_queue and analyze_threats are placeholders: supply your own job queue
# (for example, an RQ Queue) and analysis function.

def verify_signature(payload_bytes, signature, secret):
    # A missing header can never be valid, and compare_digest raises on None
    if not signature:
        return False
    expected = hmac.new(
        secret.encode(),
        payload_bytes,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}".encode(), signature.encode())

def generate_signature(payload_bytes, secret):
    mac = hmac.new(secret.encode(), payload_bytes, hashlib.sha256)
    return f"sha256={mac.hexdigest()}"

def update_status(invocation_id, status, percent, message=""):
    payload = json.dumps({
        "status": status,
        "status_percent": percent,
        "status_message": message
    })
    signature = generate_signature(payload.encode(), WEBHOOK_SECRET)
    requests.post(
        f"{TMI_BASE_URL}/invocations/{invocation_id}/status",
        data=payload,
        headers={
            'Content-Type': 'application/json',
            'X-Webhook-Signature': signature
        },
        timeout=10
    )

@app.route('/webhooks/tmi', methods=['POST'])
def handle_invocation():
    # Verify signature
    payload_bytes = request.get_data()
    signature = request.headers.get('X-Webhook-Signature')
    if not verify_signature(payload_bytes, signature, WEBHOOK_SECRET):
        return 'Unauthorized', 401
    # Parse payload
    data = request.json
    # Queue for async processing
    task_queue.enqueue(process_invocation, data)
    # Respond immediately
    return '', 200

def process_invocation(data):
    invocation_id = data['invocation_id']
    user_payload = data['payload']
    try:
        # Start
        update_status(invocation_id, "in_progress", 10, "Starting analysis")
        # Process
        result = analyze_threats(user_payload)
        # Progress
        update_status(invocation_id, "in_progress", 75, "Generating report")
        # Finish
        update_status(invocation_id, "completed", 100, "Analysis complete")
    except Exception as e:
        update_status(invocation_id, "failed", 0, f"Error: {e}")

if __name__ == '__main__':
    app.run(port=8000)

Handle duplicate invocations:
cache = {}

def process_invocation(payload):
    invocation_id = payload['invocation_id']
    # Check if already processed
    if invocation_id in cache:
        logger.info(f"Duplicate invocation: {invocation_id}")
        return cache[invocation_id]
    # Process
    result = do_work(payload)
    # Cache result
    cache[invocation_id] = result
    return result

Update regularly for long operations:
def long_operation(invocation_id, steps):
    update_status(invocation_id, "in_progress", 0, "Starting...")
    for i, step in enumerate(steps):
        process_step(step)
        # in_progress accepts 0-99 only, so cap the value before the final update
        percent = min(99, int((i + 1) / len(steps) * 100))
        update_status(invocation_id, "in_progress", percent,
                      f"Step {i+1}/{len(steps)}")
    update_status(invocation_id, "completed", 100, "Done")

Provide useful error messages:
try:
    validate_payload(payload)
except ValidationError as e:
    update_status(invocation_id, "failed", 0,
                  f"Invalid input: {e}. Please check parameters.")

Set reasonable timeouts:
# timeout_decorator raises its own TimeoutError, not the built-in one
from timeout_decorator import timeout, TimeoutError

@timeout(300)  # 5 minute timeout
def process_invocation(payload):
    invocation_id = payload['invocation_id']
    try:
        result = do_work(payload)
        update_status(invocation_id, "completed", 100, "Success")
    except TimeoutError:
        update_status(invocation_id, "failed", 0,
                      "Processing timeout after 5 minutes")

- Always verify HMAC signatures
- Use HTTPS for all callbacks
- Don't log secrets
- Validate all input
- Use constant-time comparison for signatures
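For the "validate all input" item, one approach is to parse the invocation body into a typed object and reject anything malformed before queuing it. The sketch below assumes the field names shown in the sample invocation payload earlier in this guide; the `Invocation` dataclass and `parse_invocation` helper are hypothetical names, not part of TMI.

```python
import uuid
from dataclasses import dataclass


@dataclass(frozen=True)
class Invocation:
    invocation_id: uuid.UUID
    addon_id: uuid.UUID
    threat_model_id: uuid.UUID
    object_type: str
    payload: dict


def parse_invocation(data: dict) -> Invocation:
    """Validate a decoded invocation body; raise ValueError on any problem."""
    if not isinstance(data, dict) or data.get("event_type") != "addon_invocation":
        raise ValueError("not an addon_invocation event")
    try:
        invocation = Invocation(
            invocation_id=uuid.UUID(data["invocation_id"]),
            addon_id=uuid.UUID(data["addon_id"]),
            threat_model_id=uuid.UUID(data["threat_model_id"]),
            object_type=data["object_type"],
            payload=data.get("payload", {}),
        )
    except (KeyError, ValueError, TypeError, AttributeError) as exc:
        raise ValueError(f"malformed invocation: {exc!r}") from exc
    if not isinstance(invocation.object_type, str) or not invocation.object_type:
        raise ValueError("object_type must be a non-empty string")
    if not isinstance(invocation.payload, dict):
        raise ValueError("payload must be a JSON object")
    return invocation
```

Run this only after the signature check has passed, and report a `failed` status with the error text when it raises.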
- Use ngrok to expose local server:
ngrok http 8000
- Register webhook with ngrok URL:
POST /webhooks
{
"url": "https://abc123.ngrok.io/webhooks/tmi",
...
}
- Invoke addon and check logs
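Before involving a real TMI server, it can help to send your endpoint a locally built, correctly signed invocation. The sketch below builds a request body and headers that mirror the sample invocation shown earlier; `build_test_invocation` is a hypothetical helper, and the header set is assumed from that sample rather than from a formal specification.

```python
import hashlib
import hmac
import json
import uuid
from datetime import datetime, timezone
from typing import Dict, Tuple


def build_test_invocation(secret: str, addon_id: str,
                          user_payload: dict) -> Tuple[bytes, Dict[str, str]]:
    """Return (body, headers) for a signed fake addon invocation."""
    invocation_id = str(uuid.uuid4())
    body = json.dumps({
        "event_type": "addon_invocation",
        "invocation_id": invocation_id,
        "addon_id": addon_id,
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "payload": user_payload,
    }).encode("utf-8")
    # Sign the exact bytes that will be sent on the wire.
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    headers = {
        "Content-Type": "application/json",
        "X-Webhook-Event": "addon_invocation",
        "X-Invocation-Id": invocation_id,
        "X-Addon-Id": addon_id,
        "X-Webhook-Signature": f"sha256={digest}",
    }
    return body, headers
```

Pass the result to an HTTP client, for example `requests.post("http://localhost:8000/webhooks/tmi", data=body, headers=headers)`, and confirm the handler returns 200 for the right secret and 401 for a wrong one.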
import hmac
import hashlib
import json
# Sign the exact bytes you will send: the body passed to curl must match
# this string byte for byte, or the signature check will fail.
payload = json.dumps({"status": "completed", "status_percent": 100})
secret = "your-webhook-secret"
mac = hmac.new(secret.encode(), payload.encode(), hashlib.sha256)
signature = f"sha256={mac.hexdigest()}"
print(f"X-Webhook-Signature: {signature}")

curl -X POST https://tmi.example.com/invocations/{id}/status \
-H "Content-Type: application/json" \
-H "X-Webhook-Signature: sha256=..." \
-d '{
"status": "completed",
"status_percent": 100,
"status_message": "Test completed"
}'

Subscribe to TMI events:
Available Events:
- threat_model.created
- threat_model.updated
- threat_model.deleted
- diagram.created
- diagram.updated
- threat.created
- threat.updated
Register Webhook:
POST /webhooks
Authorization: Bearer {jwt}
Content-Type: application/json
{
"name": "My Integration",
"url": "https://my-service.example.com/webhooks",
"events": ["threat_model.created", "threat.created"],
"secret": "your-hmac-secret"
}
Receive Event:
POST /webhooks
Content-Type: application/json
X-Webhook-Event: threat_model.created
X-Webhook-Signature: sha256=...
{
"event": "threat_model.created",
"timestamp": "2025-11-08T12:00:00Z",
"data": {
"id": "uuid",
"name": "New Threat Model",
"owner": "[email protected]"
}
}
Always verify the HMAC signature before processing.
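A receiver for these events typically verifies the signature and then routes on the `event` field. The sketch below assumes event deliveries are signed the same way as addon invocations (`sha256=` plus the hex HMAC of the raw body); the `on` decorator, `HANDLERS` table, and `dispatch` function are illustrative names.

```python
import hashlib
import hmac
import json

HANDLERS = {}


def on(event_name):
    """Register a function as the handler for one event name."""
    def register(func):
        HANDLERS[event_name] = func
        return func
    return register


@on("threat_model.created")
def handle_threat_model_created(data):
    return f"new threat model: {data['name']}"


def dispatch(body: bytes, signature_header, secret: str):
    """Verify the signature, then call the handler registered for the event."""
    expected = "sha256=" + hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()
    if not signature_header or not hmac.compare_digest(
        signature_header.encode(), expected.encode()
    ):
        raise PermissionError("invalid signature")
    event = json.loads(body)
    handler = HANDLERS.get(event.get("event"))
    if handler is None:
        return None  # events without a registered handler are ignored
    return handler(event.get("data", {}))
```

Returning `None` for unhandled events lets the endpoint acknowledge deliveries it does not care about instead of failing them.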
Sync threats with Jira/GitHub Issues:
class JiraIntegration:
    def __init__(self, jira_client, tmi_client):
        self.jira_client = jira_client
        self.tmi_client = tmi_client

    def sync_threat_to_jira(self, threat):
        # Create Jira issue
        issue = self.jira_client.create_issue({
            'project': {'key': 'SEC'},
            'summary': threat['name'],
            'description': threat['description'],
            'issuetype': {'name': 'Security Threat'},
            'labels': [threat['stride']]
        })
        # Store issue key in TMI metadata
        self.tmi_client.create_metadata(
            threat['threat_model_id'],
            threat['id'],
            'jira_issue',
            issue.key
        )
        return issue.key

Validate threat models in CI pipeline:
#!/bin/bash
# validate-threat-model.sh
# Requires curl, jq, and a TOKEN environment variable holding a JWT
set -euo pipefail

TM_ID="$1"
API_URL="https://api.tmi.example.com"

# Get threat model
response=$(curl -s -H "Authorization: Bearer $TOKEN" \
  "$API_URL/threat_models/$TM_ID")

# Check threat count (a missing field counts as 0)
threat_count=$(echo "$response" | jq '.threat_count // 0')
if [ "$threat_count" -lt 5 ]; then
  echo "ERROR: Insufficient threats identified (found: $threat_count, required: 5)"
  exit 1
fi

# Check for high severity unmitigated threats
unmitigated=$(curl -s -H "Authorization: Bearer $TOKEN" \
  "$API_URL/threat_models/$TM_ID/threats" | \
  jq '[.[] | select(.severity == "high" and .status == "open")] | length')
if [ "$unmitigated" -gt 0 ]; then
  echo "ERROR: $unmitigated high severity threats not mitigated"
  exit 1
fi

echo "SUCCESS: Threat model validation passed"

Generate custom reports:
from jinja2 import Template
from weasyprint import HTML

class ReportGenerator:
    def __init__(self, tmi_client):
        self.tmi_client = tmi_client

    def generate_pdf_report(self, threat_model_id):
        # Fetch data from TMI
        tm = self.tmi_client.get_threat_model(threat_model_id)
        threats = self.tmi_client.get_threats(threat_model_id)
        diagrams = self.tmi_client.get_diagrams(threat_model_id)
        # Render HTML template
        with open('report_template.html') as f:
            template = Template(f.read())
        html_content = template.render(
            threat_model=tm,
            threats=threats,
            diagrams=diagrams
        )
        # Convert to PDF
        HTML(string=html_content).write_pdf(
            f'threat_model_{threat_model_id}.pdf'
        )

Create language-specific SDKs:
# tmi_client/__init__.py
import requests

class TMIClient:
    def __init__(self, api_url, token):
        self.api_url = api_url
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        })

    def get_threat_models(self):
        response = self.session.get(f'{self.api_url}/threat_models')
        response.raise_for_status()
        return response.json()

    def create_threat_model(self, name, description):
        response = self.session.post(
            f'{self.api_url}/threat_models',
            json={'name': name, 'description': description}
        )
        response.raise_for_status()
        return response.json()

# Usage
client = TMIClient('https://api.tmi.dev', 'your-jwt-token')
threat_models = client.get_threat_models()

// tmi-client.js
class TMIClient {
  constructor(apiUrl, token) {
    this.apiUrl = apiUrl;
    this.token = token;
  }

  async getThreatModels() {
    const response = await fetch(`${this.apiUrl}/threat_models`, {
      headers: { Authorization: `Bearer ${this.token}` }
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  async createThreatModel(name, description) {
    const response = await fetch(`${this.apiUrl}/threat_models`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.token}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ name, description })
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }
}

// Usage
const client = new TMIClient('https://api.tmi.dev', 'your-jwt-token');
const threatModels = await client.getThreatModels();

- Package:
  - Python: pip install tmi-client
  - JavaScript: npm install @tmi/client
  - Go: go get github.com/ericfitz/tmi-clients
- Document: Include examples and API reference
- Test: Comprehensive test coverage
- Versioning: Semantic versioning (semver)
See https://github.com/ericfitz/tmi-clients for:
- Python client library
- JavaScript/TypeScript client
- Go client
- Example integrations
- API Integration Guide - REST and WebSocket APIs
- Architecture - System design
- Developer docs in /docs/developer/ of repositories
- GitHub Issues: https://github.com/ericfitz/tmi/issues
- API Reference: OpenAPI spec in /docs/reference/apis/
- API Integration - Learn the APIs
- Architecture and Design - Understand the system
- Getting Started - Set up development environment