# Extending TMI

This guide explains how to extend TMI through addons, webhooks, and custom integrations.

## Table of Contents

- [Overview](#overview)
- [Addon Development](#addon-development)
- [Webhook Integration](#webhook-integration)
- [Custom Integrations](#custom-integrations)
- [Client Libraries](#client-libraries)

## Overview

TMI can be extended in several ways:

1. **Addons** - Server-side extensions that users can invoke
2. **Webhooks** - Receive notifications for TMI events
3. **Custom Integrations** - Build tools that interact with TMI's API
4. **Client Libraries** - Create language-specific SDKs

## Addon Development

### What are Addons?

Addons are webhook-based services that:

- Receive invocation requests from TMI users
- Process them asynchronously
- Report status back via callbacks
- Appear in the TMI UI as actions users can trigger

### Addon Use Cases

- **STRIDE Analysis** - Automated threat categorization
- **Compliance Checking** - Verify against security frameworks
- **Report Generation** - Create PDFs or documents
- **Integration** - Sync with external tools (Jira, ServiceNow)
- **AI Analysis** - LLM-powered threat identification

### Quick Start

#### 1. Register Webhook

First, create a webhook subscription:

```http
POST /webhooks
Authorization: Bearer {jwt}
Content-Type: application/json

{
  "name": "My Addon Service",
  "url": "https://my-service.example.com/webhooks/tmi",
  "events": [],
  "secret": "your-hmac-secret-128-chars-minimum"
}
```

Save the `webhook_id` from the response.

#### 2. Register Addon

Have a TMI administrator register your addon:

```http
POST /addons
Authorization: Bearer {admin_jwt}
Content-Type: application/json

{
  "name": "STRIDE Analyzer",
  "webhook_id": "{webhook_id_from_step_1}",
  "description": "Automated STRIDE threat analysis",
  "icon": "material-symbols:security",
  "objects": ["threat_model", "asset"]
}
```

#### 3. Implement Webhook Endpoint

Create an HTTPS endpoint that:

1. Receives POST requests from TMI
2. Verifies the HMAC signature
3. Processes the request asynchronously
4. Calls back to TMI to update status

### Webhook Invocation Flow

#### Step 1: Receive Invocation

Your webhook receives:

```http
POST /webhooks/tmi
Content-Type: application/json
X-Webhook-Event: addon.invoked
X-Invocation-Id: 550e8400-e29b-41d4-a716-446655440000
X-Addon-Id: 123e4567-e89b-12d3-a456-426614174000
X-Webhook-Signature: sha256=abc123...
User-Agent: TMI-Addon-Worker/1.0

{
  "event_type": "addon.invoked",
  "invocation_id": "550e8400-e29b-41d4-a716-446655440000",
  "addon_id": "123e4567-e89b-12d3-a456-426614174000",
  "threat_model_id": "789e0123-e45b-67c8-d901-234567890abc",
  "object_type": "asset",
  "object_id": "def01234-5678-90ab-cdef-1234567890ab",
  "timestamp": "2025-11-08T12:00:00Z",
  "payload": {
    "user_param_1": "value1",
    "user_param_2": "value2"
  },
  "callback_url": "https://tmi.example.com/invocations/550e8400.../status"
}
```

#### Step 2: Verify HMAC Signature

**CRITICAL**: Always verify the signature against the raw request body before parsing it:

```python
import hmac
import hashlib

def verify_signature(payload_bytes, signature_header, secret):
    # A missing header can never match; reject it before comparing
    if not signature_header:
        return False
    expected = hmac.new(
        secret.encode('utf-8'),
        payload_bytes,
        hashlib.sha256
    ).hexdigest()
    expected_sig = f"sha256={expected}"
    # Constant-time comparison on bytes (str inputs must be ASCII-only)
    return hmac.compare_digest(
        signature_header.encode('utf-8'),
        expected_sig.encode('utf-8')
    )

# In handler
payload_bytes = request.get_data()
signature = request.headers.get('X-Webhook-Signature')
if not verify_signature(payload_bytes, signature, WEBHOOK_SECRET):
    return 'Invalid signature', 401
```

```javascript
// Node.js
const crypto = require('crypto');

function verifySignature(payloadBody, signatureHeader, secret) {
  // A missing header can never match
  if (!signatureHeader) return false;

  const hmac = crypto.createHmac('sha256', secret);
  hmac.update(payloadBody);
  const expectedSig = Buffer.from(`sha256=${hmac.digest('hex')}`);
  const receivedSig = Buffer.from(signatureHeader);

  // timingSafeEqual throws if the buffers differ in length, so check first
  if (receivedSig.length !== expectedSig.length) return false;
  return crypto.timingSafeEqual(receivedSig, expectedSig);
}
```

#### Step 3: Respond Quickly

Return `200 OK` immediately and do the real work in the background:

```python
@app.route('/webhooks/tmi', methods=['POST'])
def handle_invocation():
    # Verify signature against the raw request body
    payload_bytes = request.get_data()
    signature = request.headers.get('X-Webhook-Signature')
    if not verify_signature(payload_bytes, signature, WEBHOOK_SECRET):
        return 'Invalid signature', 401

    payload = request.json

    # Queue for async processing (task_queue is a placeholder for your job queue)
    task_queue.enqueue(process_invocation, payload)

    # Respond immediately (TMI marks as in_progress)
    return '', 200
```

#### Step 4: Update Status During Processing

Call back to TMI:

```python
import requests
import json

def update_status(invocation_id, status, percent, message):
    callback_url = f"https://tmi.example.com/invocations/{invocation_id}/status"
    # Serialize once so the signed bytes are exactly the bytes sent
    body = json.dumps({
        "status": status,
        "status_percent": percent,
        "status_message": message
    }).encode('utf-8')
    # generate_signature is defined in the complete example below
    signature = generate_signature(body, WEBHOOK_SECRET)

    response = requests.post(
        callback_url,
        data=body,
        headers={
            'Content-Type': 'application/json',
            'X-Webhook-Signature': signature
        },
        timeout=10  # avoid hanging the worker if TMI is unreachable
    )
    return response.status_code == 200

# During processing
def process_invocation(payload):
    invocation_id = payload['invocation_id']

    # Started
    update_status(invocation_id, "in_progress", 10, "Starting analysis...")

    # Do work
    analyze_threats(payload)

    # Halfway
    update_status(invocation_id, "in_progress", 50, "Analyzing assets...")

    # More work
    generate_report(payload)

    # Completed
    update_status(invocation_id, "completed", 100, "Analysis complete")
```

#### Step 5: Handle Failures

```python
def process_invocation(payload):
    invocation_id = payload['invocation_id']
    try:
        analyze_threats(payload)
        update_status(invocation_id, "completed", 100, "Success")
    except ValidationError as e:
        update_status(invocation_id, "failed", 0, f"Validation error: {e}")
    except Exception as e:
        logger.exception("Processing failed")
        update_status(invocation_id, "failed", 0, f"Internal error: {e}")
```

### Status Update API

**Endpoint**: `POST /invocations/{invocation_id}/status`

**Authentication**: HMAC signature (no JWT required)

**Request**:

```json
{
  "status": "in_progress",
  "status_percent": 75,
  "status_message": "Processing..."
}
```

**Valid Statuses**:

- `in_progress` (0-99%)
- `completed` (100%)
- `failed` (0%)

**Status Transitions**:

```
pending → in_progress → completed
                      → failed
```

**Invalid transitions** (return 409):

- `completed → in_progress`
- `failed → in_progress`

### Complete Example (Python Flask)

```python
from flask import Flask, request
import hmac
import hashlib
import json
import requests

app = Flask(__name__)

WEBHOOK_SECRET = "your-128-char-secret"
TMI_BASE_URL = "https://tmi.example.com"

# task_queue and analyze_threats are placeholders for your own
# job queue and analysis code.

def verify_signature(payload_bytes, signature, secret):
    # A missing header can never match
    if not signature:
        return False
    expected = hmac.new(
        secret.encode(),
        payload_bytes,
        hashlib.sha256
    ).hexdigest()
    return hmac.compare_digest(f"sha256={expected}".encode(), signature.encode())

def generate_signature(payload_bytes, secret):
    mac = hmac.new(secret.encode(), payload_bytes, hashlib.sha256)
    return f"sha256={mac.hexdigest()}"

def update_status(invocation_id, status, percent, message=""):
    # Serialize once so the signed bytes are exactly the bytes sent
    body = json.dumps({
        "status": status,
        "status_percent": percent,
        "status_message": message
    }).encode()
    signature = generate_signature(body, WEBHOOK_SECRET)

    requests.post(
        f"{TMI_BASE_URL}/invocations/{invocation_id}/status",
        data=body,
        headers={
            'Content-Type': 'application/json',
            'X-Webhook-Signature': signature
        },
        timeout=10
    )

@app.route('/webhooks/tmi', methods=['POST'])
def handle_invocation():
    # Verify signature
    payload_bytes = request.get_data()
    signature = request.headers.get('X-Webhook-Signature')
    if not verify_signature(payload_bytes, signature, WEBHOOK_SECRET):
        return 'Unauthorized', 401

    # Parse payload
    data = request.json

    # Queue for async processing
    task_queue.enqueue(process_invocation, data)

    # Respond immediately
    return '', 200

def process_invocation(data):
    invocation_id = data['invocation_id']
    user_payload = data['payload']

    try:
        # Start
        update_status(invocation_id, "in_progress", 10, "Starting analysis")

        # Process
        result = analyze_threats(user_payload)

        # Progress
        update_status(invocation_id, "in_progress", 75, "Generating report")

        # Finish
        update_status(invocation_id, "completed", 100, "Analysis complete")
    except Exception as e:
        update_status(invocation_id, "failed", 0, f"Error: {e}")

if __name__ == '__main__':
    app.run(port=8000)
```

### Best Practices

#### 1. Idempotency

Handle duplicate invocations:

```python
# In-process cache; use a shared store if you run more than one worker
cache = {}

def process_invocation(payload):
    invocation_id = payload['invocation_id']

    # Check if already processed
    if invocation_id in cache:
        logger.info(f"Duplicate invocation: {invocation_id}")
        return cache[invocation_id]

    # Process
    result = do_work(payload)

    # Cache result
    cache[invocation_id] = result
    return result
```

#### 2. Progress Updates

Update regularly for long operations:

```python
def long_operation(invocation_id, steps):
    update_status(invocation_id, "in_progress", 0, "Starting...")

    for i, step in enumerate(steps):
        process_step(step)
        # in_progress accepts 0-99%; 100% is reserved for "completed"
        percent = min(99, int((i + 1) / len(steps) * 100))
        update_status(invocation_id, "in_progress", percent, f"Step {i+1}/{len(steps)}")

    update_status(invocation_id, "completed", 100, "Done")
```

#### 3. Error Handling

Provide useful error messages:

```python
try:
    validate_payload(payload)
except ValidationError as e:
    update_status(invocation_id, "failed", 0, f"Invalid input: {e}. Please check parameters.")
```

#### 4. Timeouts

Set reasonable timeouts:

```python
import timeout_decorator

@timeout_decorator.timeout(300)  # 5 minute timeout
def do_work_with_timeout(payload):
    return do_work(payload)

def process_invocation(payload):
    invocation_id = payload['invocation_id']
    try:
        do_work_with_timeout(payload)
        update_status(invocation_id, "completed", 100, "Success")
    except timeout_decorator.TimeoutError:
        # timeout_decorator raises its own TimeoutError, not the built-in one
        update_status(invocation_id, "failed", 0, "Processing timeout after 5 minutes")
```

#### 5. Security

- Always verify HMAC signatures
- Use HTTPS for all callbacks
- Don't log secrets
- Validate all input
- Use constant-time comparison for signatures

### Testing Your Addon

#### Local Development

1. Use ngrok to expose local server:

   ```bash
   ngrok http 8000
   ```

2. Register webhook with ngrok URL:

   ```http
   POST /webhooks

   {
     "url": "https://abc123.ngrok.io/webhooks/tmi",
     ...
   }
   ```

3. Invoke addon and check logs

#### Testing HMAC

```python
import hmac
import hashlib
import json

payload = json.dumps({"status": "completed", "status_percent": 100})
secret = "your-webhook-secret"

mac = hmac.new(secret.encode(), payload.encode(), hashlib.sha256)
signature = f"sha256={mac.hexdigest()}"
print(f"X-Webhook-Signature: {signature}")
```

#### Test Status Updates

```bash
curl -X POST https://tmi.example.com/invocations/{id}/status \
  -H "Content-Type: application/json" \
  -H "X-Webhook-Signature: sha256=..." \
  -d '{
    "status": "completed",
    "status_percent": 100,
    "status_message": "Test completed"
  }'
```

## Webhook Integration

### Event Webhooks

Subscribe to TMI events:

**Available Events**:

- `threat_model.created`
- `threat_model.updated`
- `threat_model.deleted`
- `diagram.created`
- `diagram.updated`
- `threat.created`
- `threat.updated`

**Register Webhook**:

```http
POST /webhooks
Authorization: Bearer {jwt}
Content-Type: application/json

{
  "name": "My Integration",
  "url": "https://my-service.example.com/webhooks",
  "events": ["threat_model.created", "threat.created"],
  "secret": "your-hmac-secret"
}
```

**Receive Event**:

```http
POST /webhooks
Content-Type: application/json
X-Webhook-Event: threat_model.created
X-Webhook-Signature: sha256=...

{
  "event": "threat_model.created",
  "timestamp": "2025-11-08T12:00:00Z",
  "data": {
    "id": "uuid",
    "name": "New Threat Model",
    "owner": "alice@example.com"
  }
}
```

### Webhook Security

Always verify the HMAC signature before processing an event.
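
The check above can be sketched for event webhooks as follows. This is a minimal sketch, assuming event deliveries are signed the same way as the addon invocations shown earlier (an `X-Webhook-Signature` header of the form `sha256=` followed by the hex HMAC-SHA256 of the raw body). The names `sign_body`, `parse_verified_event`, and `InvalidSignature` are hypothetical helpers for illustration, not part of TMI.

```python
import hashlib
import hmac
import json


class InvalidSignature(Exception):
    """Raised when a delivery's signature is missing or does not match."""


def sign_body(body: bytes, secret: str) -> str:
    # Assumed format: "sha256=" + hex HMAC-SHA256 of the raw request body.
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def parse_verified_event(body: bytes, signature_header, secret: str) -> dict:
    # Reject deliveries without a signature header before comparing.
    if not signature_header:
        raise InvalidSignature("missing X-Webhook-Signature header")
    expected = sign_body(body, secret)
    # Compare as bytes in constant time to avoid leaking timing information.
    if not hmac.compare_digest(
        signature_header.encode("utf-8"), expected.encode("utf-8")
    ):
        raise InvalidSignature("signature mismatch")
    # Parse the JSON only after the signature has been verified.
    return json.loads(body)


# Usage: simulate a signed delivery with a hypothetical secret.
example_body = b'{"event": "threat_model.created", "data": {"id": "uuid"}}'
example_event = parse_verified_event(
    example_body, sign_body(example_body, "test-secret"), "test-secret"
)
```

Verifying against the raw bytes, rather than a re-serialized copy of the parsed JSON, keeps the check independent of key order and whitespace.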

## Custom Integrations

### Issue Tracking Integration

Sync threats with Jira/GitHub Issues:

```python
class JiraIntegration:
    def __init__(self, jira_client, tmi_client):
        # Both clients are supplied by the caller
        self.jira_client = jira_client
        self.tmi_client = tmi_client

    def sync_threat_to_jira(self, threat):
        # Create Jira issue
        issue = self.jira_client.create_issue({
            'project': {'key': 'SEC'},
            'summary': threat['name'],
            'description': threat['description'],
            'issuetype': {'name': 'Security Threat'},
            'labels': [threat['stride']]
        })

        # Store issue key in TMI metadata
        self.tmi_client.create_metadata(
            threat['threat_model_id'],
            threat['id'],
            'jira_issue',
            issue.key
        )

        return issue.key
```

### CI/CD Integration

Validate threat models in a CI pipeline:

```bash
#!/bin/bash
# validate-threat-model.sh
# Requires curl, jq, and a TMI JWT in $TOKEN

TM_ID="$1"
API_URL="https://api.tmi.example.com"

# Get threat model
response=$(curl -s -H "Authorization: Bearer $TOKEN" \
  "$API_URL/threat_models/$TM_ID")

# Check threat count (a missing field counts as 0)
threat_count=$(echo "$response" | jq -r '.threat_count // 0')
if [ "$threat_count" -lt 5 ]; then
  echo "ERROR: Insufficient threats identified (found: $threat_count, required: 5)"
  exit 1
fi

# Check for high severity unmitigated threats
unmitigated=$(curl -s -H "Authorization: Bearer $TOKEN" \
  "$API_URL/threat_models/$TM_ID/threats" | \
  jq '[.[] | select(.severity == "high" and .status == "open")] | length')

if [ "$unmitigated" -gt 0 ]; then
  echo "ERROR: $unmitigated high severity threats not mitigated"
  exit 1
fi

echo "SUCCESS: Threat model validation passed"
```

### Reporting Integration

Generate custom reports:

```python
from jinja2 import Template
from weasyprint import HTML

class ReportGenerator:
    def __init__(self, tmi_client):
        self.tmi_client = tmi_client

    def generate_pdf_report(self, threat_model_id):
        # Fetch data from TMI
        tm = self.tmi_client.get_threat_model(threat_model_id)
        threats = self.tmi_client.get_threats(threat_model_id)
        diagrams = self.tmi_client.get_diagrams(threat_model_id)

        # Render HTML template (close the file once it has been read)
        with open('report_template.html') as f:
            template = Template(f.read())
        html_content = template.render(
            threat_model=tm,
            threats=threats,
            diagrams=diagrams
        )

        # Convert to PDF
        HTML(string=html_content).write_pdf(
            f'threat_model_{threat_model_id}.pdf'
        )
```

## Client Libraries

### Building a Client Library

Create language-specific SDKs:

#### Python Client

```python
# tmi_client/__init__.py
import requests

class TMIClient:
    def __init__(self, api_url, token):
        self.api_url = api_url
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {token}',
            'Content-Type': 'application/json'
        })

    def get_threat_models(self):
        response = self.session.get(f'{self.api_url}/threat_models')
        response.raise_for_status()
        return response.json()

    def create_threat_model(self, name, description):
        response = self.session.post(
            f'{self.api_url}/threat_models',
            json={'name': name, 'description': description}
        )
        response.raise_for_status()
        return response.json()

# Usage
client = TMIClient('https://api.tmi.dev', 'your-jwt-token')
threat_models = client.get_threat_models()
```

#### JavaScript Client

```javascript
// tmi-client.js
class TMIClient {
  constructor(apiUrl, token) {
    this.apiUrl = apiUrl;
    this.token = token;
  }

  async getThreatModels() {
    const response = await fetch(`${this.apiUrl}/threat_models`, {
      headers: { Authorization: `Bearer ${this.token}` }
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }

  async createThreatModel(name, description) {
    const response = await fetch(`${this.apiUrl}/threat_models`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.token}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ name, description })
    });
    if (!response.ok) throw new Error(`HTTP ${response.status}`);
    return response.json();
  }
}

// Usage (top-level await requires an ES module or an async function)
const client = new TMIClient('https://api.tmi.dev', 'your-jwt-token');
const threatModels = await client.getThreatModels();
```

### Publishing Client Libraries

1. **Package**:
   - Python: `pip install tmi-client`
   - JavaScript: `npm install @tmi/client`
   - Go: `go get github.com/ericfitz/tmi-clients`
2. **Document**: Include examples and API reference
3. **Test**: Comprehensive test coverage
4. **Versioning**: Semantic versioning (semver)

## Examples and Resources

### Example Projects

See `https://github.com/ericfitz/tmi-clients` for:

- Python client library
- JavaScript/TypeScript client
- Go client
- Example integrations

### Documentation

- [API Integration Guide](API-Integration.md) - REST and WebSocket APIs
- [Architecture](Architecture-and-Design.md) - System design
- Developer docs in `/docs/developer/` of repositories

### Support

- GitHub Issues: https://github.com/ericfitz/tmi/issues
- API Reference: OpenAPI spec in `/docs/reference/apis/`

## Next Steps

- [API Integration](API-Integration.md) - Learn the APIs
- [Architecture and Design](Architecture-and-Design.md) - Understand the system
- [Getting Started](Getting-Started-with-Development.md) - Set up development environment