-
Notifications
You must be signed in to change notification settings - Fork 0
API Workflows
Eric Fitzgerald edited this page Nov 12, 2025
·
1 revision
This page provides common API workflow patterns with complete code examples in multiple programming languages.
Complete Workflow Definitions:
- File: api-workflows.json
- Version: 1.0.0
- Description: Comprehensive API workflow patterns for TMI
Complete authentication workflow to obtain a JWT token.
Flow:
1. GET /oauth2/providers → List available providers
2. GET /oauth2/authorize → Initiate OAuth flow
3. User authenticates with provider
4. GET /oauth2/callback → Receive authorization code
5. POST /oauth2/token → Exchange code for JWT token
6. GET /oauth2/userinfo → Verify authentication
Python Example:
import requests
from urllib.parse import urlencode
# Configuration
# Placeholder values: replace CLIENT_ID and REDIRECT_URI with the values
# registered for your own application before running the example.
BASE_URL = 'https://api.tmi.dev'
CLIENT_ID = 'your-client-id'
# Sent on both the authorize request and the token exchange below.
REDIRECT_URI = 'https://your-app.com/callback'
def authenticate():
"""Complete OAuth authentication flow"""
# 1. List providers
providers = requests.get(f'{BASE_URL}/oauth2/providers').json()
print('Available providers:', providers)
# 2. Build authorization URL
auth_params = {
'response_type': 'code',
'client_id': CLIENT_ID,
'redirect_uri': REDIRECT_URI,
'scope': 'openid email profile',
'state': 'random-state-string'
}
auth_url = f'{BASE_URL}/oauth2/authorize?{urlencode(auth_params)}'
print('Visit this URL to authenticate:')
print(auth_url)
# 3. User authenticates (browser flow)
# Provider redirects to: {REDIRECT_URI}?code={auth_code}&state={state}
# 4. Exchange authorization code for token
auth_code = input('Enter authorization code: ')
token_response = requests.post(
f'{BASE_URL}/oauth2/token',
data={
'grant_type': 'authorization_code',
'code': auth_code,
'redirect_uri': REDIRECT_URI,
'client_id': CLIENT_ID
}
)
tokens = token_response.json()
access_token = tokens['access_token']
# 5. Verify token by getting user info
user_info = requests.get(
f'{BASE_URL}/oauth2/userinfo',
headers={'Authorization': f'Bearer {access_token}'}
).json()
print(f"Authenticated as: {user_info['email']}")
return access_tokenJavaScript Example:
// OAuth authentication using PKCE (for browser-based apps)
/**
 * Start the OAuth authorization-code flow with PKCE.
 *
 * Generates a code verifier/challenge pair and a random state value, saves
 * the verifier and the state in sessionStorage so the callback page can use
 * them, then navigates the browser to the authorization endpoint.
 */
async function authenticate() {
  const BASE_URL = 'https://api.tmi.dev';
  const CLIENT_ID = 'your-client-id';
  const REDIRECT_URI = 'https://your-app.com/callback';

  // 1. Generate PKCE code verifier and challenge
  const codeVerifier = generateRandomString(128);
  const codeChallenge = await generateCodeChallenge(codeVerifier);

  // 2. Store code verifier and state for later
  // The state must be kept too: without it the callback cannot tell a
  // genuine redirect from a forged one.
  const state = generateRandomString(32);
  sessionStorage.setItem('code_verifier', codeVerifier);
  sessionStorage.setItem('oauth_state', state);

  // 3. Build authorization URL
  const authParams = new URLSearchParams({
    response_type: 'code',
    client_id: CLIENT_ID,
    redirect_uri: REDIRECT_URI,
    scope: 'openid email profile',
    state: state,
    code_challenge: codeChallenge,
    code_challenge_method: 'S256'
  });

  // 4. Redirect to authorization endpoint
  window.location.href = `${BASE_URL}/oauth2/authorize?${authParams}`;
}
// Handle OAuth callback
/**
 * Finish the OAuth flow on the redirect page.
 *
 * Reads the authorization code from the query string, exchanges it (with
 * the stored PKCE verifier) for tokens, stores the access token in
 * localStorage and logs the authenticated user's email.
 *
 * @throws {Error} If the provider returned an error, the code is missing,
 *   the state does not match the stored one, or an endpoint responds with a
 *   non-2xx status.
 */
async function handleCallback() {
  // Declared here because the constants inside authenticate() are local to
  // that function and not visible from this one.
  const BASE_URL = 'https://api.tmi.dev';
  const CLIENT_ID = 'your-client-id';
  const REDIRECT_URI = 'https://your-app.com/callback';

  const params = new URLSearchParams(window.location.search);
  const providerError = params.get('error');
  if (providerError) {
    throw new Error(`Authorization failed: ${providerError}`);
  }

  const code = params.get('code');
  if (!code) {
    throw new Error('Authorization code missing from callback URL');
  }

  // Compare the state only when one was stored before the redirect.
  const expectedState = sessionStorage.getItem('oauth_state');
  if (expectedState !== null && params.get('state') !== expectedState) {
    throw new Error('OAuth state mismatch');
  }
  sessionStorage.removeItem('oauth_state');

  const codeVerifier = sessionStorage.getItem('code_verifier');
  sessionStorage.removeItem('code_verifier'); // single use

  // Exchange code for token
  const response = await fetch(`${BASE_URL}/oauth2/token`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
    body: new URLSearchParams({
      grant_type: 'authorization_code',
      code: code,
      redirect_uri: REDIRECT_URI,
      client_id: CLIENT_ID,
      code_verifier: codeVerifier
    })
  });
  if (!response.ok) {
    throw new Error(`Token exchange failed with status ${response.status}`);
  }
  const tokens = await response.json();
  localStorage.setItem('access_token', tokens.access_token);

  // Get user info
  const userInfoResponse = await fetch(`${BASE_URL}/oauth2/userinfo`, {
    headers: { 'Authorization': `Bearer ${tokens.access_token}` }
  });
  if (!userInfoResponse.ok) {
    throw new Error(`Userinfo request failed with status ${userInfoResponse.status}`);
  }
  const userInfo = await userInfoResponse.json();
  console.log('Authenticated as:', userInfo.email);
}
// Helper functions
/**
 * Return a random string of `length` characters drawn from [A-Za-z0-9].
 *
 * Uses crypto.getRandomValues rather than Math.random, because the result
 * is used as a PKCE code verifier and an OAuth state value and must be
 * unpredictable.
 *
 * @param {number} length Number of characters; 0 or less yields ''.
 * @returns {string}
 */
function generateRandomString(length) {
  const possible = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
  // Bytes at or above this limit are discarded so that `byte % 62` stays
  // uniformly distributed (256 is not a multiple of 62).
  const limit = 256 - (256 % possible.length);
  let text = '';
  while (text.length < length) {
    // Request only what is still missing, so the loop can never overshoot.
    const bytes = crypto.getRandomValues(new Uint8Array(length - text.length));
    for (const byte of bytes) {
      if (byte < limit) {
        text += possible.charAt(byte % possible.length);
      }
    }
  }
  return text;
}
/**
 * Compute the S256 PKCE challenge for a verifier: the base64url-encoded
 * SHA-256 digest of the verifier's UTF-8 bytes.
 *
 * @param {string} verifier
 * @returns {Promise<string>}
 */
async function generateCodeChallenge(verifier) {
  const verifierBytes = new TextEncoder().encode(verifier);
  const digest = await crypto.subtle.digest('SHA-256', verifierBytes);
  return base64URLEncode(digest);
}
function base64URLEncode(buffer) {
const bytes = new Uint8Array(buffer);
let str = '';
for (const byte of bytes) {
str += String.fromCharCode(byte);
}
return btoa(str)
.replace(/\+/g, '-')
.replace(/\//g, '_')
.replace(/=/g, '');
}Create, read, update, and delete threat models.
Python Example:
import requests
# Shared configuration for the examples below.
BASE_URL = 'https://api.tmi.dev'
# Placeholder: substitute a real JWT and keep real tokens out of source control.
TOKEN = 'your-jwt-token'
# Default headers passed as `headers=headers` by the request helpers below.
headers = {
'Authorization': f'Bearer {TOKEN}',
'Content-Type': 'application/json'
}
def create_threat_model():
    """Create a sample threat model and return the server's JSON response.

    Returns:
        The decoded JSON body of the created threat model (includes ``id``).

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On network failures or timeouts.
    """
    response = requests.post(
        f'{BASE_URL}/threat_models',
        headers=headers,
        json={
            'name': 'Production API Security',
            'description': 'Comprehensive threat model for production APIs',
            'authorization': [
                {
                    'subject': 'security-team',
                    'subject_type': 'group',
                    'role': 'writer',
                }
            ],
            'metadata': {
                'project': 'API-Gateway',
                'compliance': 'GDPR',
            },
        },
        timeout=10,  # requests waits indefinitely when none is given
    )
    # Surface the HTTP error rather than a KeyError on 'id' below.
    response.raise_for_status()
    threat_model = response.json()
    print(f"Created threat model: {threat_model['id']}")
    return threat_model
def list_threat_models():
    """List accessible threat models, newest first (up to 50).

    Returns:
        The ``items`` list from the response body.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On network failures or timeouts.
    """
    response = requests.get(
        f'{BASE_URL}/threat_models',
        headers=headers,
        params={
            'limit': 50,
            'sort': '-created_at',
        },
        timeout=10,  # requests waits indefinitely when none is given
    )
    # Surface the HTTP error rather than a KeyError on 'total' below.
    response.raise_for_status()
    data = response.json()
    print(f"Found {data['total']} threat models")
    return data['items']
def get_threat_model(threat_model_id):
    """Fetch one threat model and print a short summary of it.

    Args:
        threat_model_id: Identifier of the threat model to fetch.

    Returns:
        The decoded JSON body of the threat model.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On network failures or timeouts.
    """
    response = requests.get(
        f'{BASE_URL}/threat_models/{threat_model_id}',
        headers=headers,
        timeout=10,  # requests waits indefinitely when none is given
    )
    # Surface the HTTP error rather than a KeyError on 'name' below.
    response.raise_for_status()
    threat_model = response.json()
    print(f"Threat Model: {threat_model['name']}")
    # The counts are read with a default of 0 in case the keys are absent.
    print(f" Diagrams: {threat_model.get('diagram_count', 0)}")
    print(f" Threats: {threat_model.get('threat_count', 0)}")
    return threat_model
def update_threat_model(threat_model_id):
    """Partially update a threat model via PATCH.

    Sends only the ``description`` and ``metadata`` fields.

    Args:
        threat_model_id: Identifier of the threat model to update.

    Returns:
        The decoded JSON body of the updated threat model.

    Raises:
        requests.HTTPError: If the API answers with a 4xx/5xx status.
        requests.RequestException: On network failures or timeouts.
    """
    response = requests.patch(
        f'{BASE_URL}/threat_models/{threat_model_id}',
        headers=headers,
        json={
            'description': 'Updated description with new scope',
            'metadata': {
                'project': 'API-Gateway',
                'compliance': 'GDPR,HIPAA',
            },
        },
        timeout=10,  # requests waits indefinitely when none is given
    )
    # Surface the HTTP error rather than a KeyError on 'id' below.
    response.raise_for_status()
    updated = response.json()
    print(f"Updated threat model: {updated['id']}")
    return updated
def delete_threat_model(threat_model_id):
    """Delete a threat model.

    Args:
        threat_model_id: Identifier of the threat model to delete.

    Returns:
        True when the API answered 204, otherwise False. HTTP error statuses
        are reported through the return value, not raised.

    Raises:
        requests.RequestException: On network failures or timeouts.
    """
    response = requests.delete(
        f'{BASE_URL}/threat_models/{threat_model_id}',
        headers=headers,
        timeout=10,  # requests waits indefinitely when none is given
    )
    deleted = response.status_code == 204
    if deleted:
        print(f"Deleted threat model: {threat_model_id}")
    return deleted
# Complete workflow
if __name__ == '__main__':
# Create
tm = create_threat_model()
# List
all_models = list_threat_models()
# Get
details = get_threat_model(tm['id'])
# Update
updated = update_threat_model(tm['id'])
# Delete
deleted = delete_threat_model(tm['id'])Go Example:
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
)
// Placeholder configuration: replace Token with a real JWT before running.
const (
BaseURL = "https://api.tmi.dev"
Token = "your-jwt-token"
)
// ThreatModel is the JSON shape used for both the create request body and
// the decoded API responses in this example.
type ThreatModel struct {
// ID is omitted from the encoded JSON when empty.
ID string `json:"id,omitempty"`
Name string `json:"name"`
Description string `json:"description"`
// Authorization and Metadata are omitted from the encoded JSON when empty.
Authorization []Authorization `json:"authorization,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// Authorization is one entry of ThreatModel.Authorization: a role granted to
// a subject. The example below uses subject type "group" and role "writer".
type Authorization struct {
Subject string `json:"subject"`
SubjectType string `json:"subject_type"`
Role string `json:"role"`
}
// createThreatModel POSTs a sample threat model and returns the decoded
// response. It returns an error when the request cannot be built or sent,
// when the server answers with a non-2xx status, or when the response body
// is not valid JSON.
func createThreatModel() (*ThreatModel, error) {
	tm := &ThreatModel{
		Name:        "Production API Security",
		Description: "Comprehensive threat model for production APIs",
		Authorization: []Authorization{
			{
				Subject:     "security-team",
				SubjectType: "group",
				Role:        "writer",
			},
		},
		Metadata: map[string]interface{}{
			"project":    "API-Gateway",
			"compliance": "GDPR",
		},
	}

	body, err := json.Marshal(tm)
	if err != nil {
		return nil, fmt.Errorf("encode threat model: %w", err)
	}

	req, err := http.NewRequest("POST", BaseURL+"/threat_models", bytes.NewBuffer(body))
	if err != nil {
		return nil, fmt.Errorf("build request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+Token)
	req.Header.Set("Content-Type", "application/json")

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Without this check an error response would decode into an empty
	// ThreatModel and be reported as a success.
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, fmt.Errorf("create threat model: unexpected status %s", resp.Status)
	}

	var created ThreatModel
	if err := json.NewDecoder(resp.Body).Decode(&created); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}

	fmt.Printf("Created threat model: %s\n", created.ID)
	return &created, nil
}
// listThreatModels fetches up to 50 threat models and returns the "items"
// array of the response. It returns an error when the request cannot be
// built or sent, when the server answers with a non-2xx status, or when the
// response body is not valid JSON.
func listThreatModels() ([]ThreatModel, error) {
	req, err := http.NewRequest("GET", BaseURL+"/threat_models?limit=50", nil)
	if err != nil {
		return nil, fmt.Errorf("build request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+Token)

	client := &http.Client{}
	resp, err := client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Without this check an error response would decode into an empty list
	// and be reported as "Found 0 threat models".
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return nil, fmt.Errorf("list threat models: unexpected status %s", resp.Status)
	}

	var data struct {
		Items []ThreatModel `json:"items"`
		Total int           `json:"total"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&data); err != nil {
		return nil, fmt.Errorf("decode response: %w", err)
	}

	fmt.Printf("Found %d threat models\n", data.Total)
	return data.Items, nil
}
func main() {
// Create
tm, _ := createThreatModel()
// List
models, _ := listThreatModels()
fmt.Printf("Total models: %d\n", len(models))
// Get, update, delete would follow similar patterns
}Python Example:
def create_diagram(threat_model_id):
"""Create a data flow diagram"""
response = requests.post(
f'{BASE_URL}/threat_models/{threat_model_id}/diagrams',
headers=headers,
json={
'title': 'System Architecture',
'diagram_type': 'data_flow',
'diagram_json': {
'cells': [
{
'id': 'process-1',
'type': 'process',
'position': {'x': 100, 'y': 100},
'size': {'width': 120, 'height': 60},
'attrs': {
'label': {'text': 'API Gateway'}
}
},
{
'id': 'datastore-1',
'type': 'datastore',
'position': {'x': 300, 'y': 100},
'size': {'width': 100, 'height': 60},
'attrs': {
'label': {'text': 'Database'}
}
},
{
'id': 'link-1',
'type': 'link',
'source': {'id': 'process-1'},
'target': {'id': 'datastore-1'},
'attrs': {
'label': {'text': 'SQL Query'}
}
}
],
'assets': [
{
'cell_id': 'datastore-1',
'name': 'User Database',
'sensitivity': 'high',
'description': 'Contains PII'
}
]
}
}
)
diagram = response.json()
print(f"Created diagram: {diagram['id']}")
return diagramPython Example:
def identify_threats(threat_model_id):
    """Create three sample threats for a threat model, one request each.

    Args:
        threat_model_id: Identifier of the threat model the threats belong to.

    Returns:
        A list with the decoded JSON body of every created threat, in
        creation order.

    Raises:
        requests.HTTPError: If a request is answered with a 4xx/5xx status.
            Threats created before the failure are not rolled back.
        requests.RequestException: On network failures or timeouts.
    """
    threats = [
        {
            'threat_model_id': threat_model_id,
            'title': 'SQL Injection Vulnerability',
            'description': 'User input not sanitized before SQL queries',
            'category': 'injection',
            'severity': 'high',
            'likelihood': 'medium',
            'impact': 'Data breach, unauthorized access',
            'mitigation': 'Use parameterized queries and input validation',
            'status': 'open',
            'affected_assets': ['User Database'],
        },
        {
            'threat_model_id': threat_model_id,
            'title': 'Insufficient Logging',
            'description': 'Security events not logged adequately',
            'category': 'logging',
            'severity': 'medium',
            'likelihood': 'high',
            'impact': 'Delayed incident detection',
            'mitigation': 'Implement comprehensive logging',
            'status': 'open',
        },
        {
            'threat_model_id': threat_model_id,
            'title': 'Missing Authentication',
            'description': 'API endpoint lacks authentication',
            'category': 'authentication',
            'severity': 'critical',
            'likelihood': 'high',
            'impact': 'Unauthorized access to sensitive data',
            'mitigation': 'Add OAuth 2.0 authentication',
            'status': 'open',
        },
    ]

    created_threats = []
    for threat_data in threats:
        response = requests.post(
            f'{BASE_URL}/threats',
            headers=headers,
            json=threat_data,
            timeout=10,  # requests waits indefinitely when none is given
        )
        # Surface the HTTP error rather than a KeyError on 'title' below.
        response.raise_for_status()
        threat = response.json()
        created_threats.append(threat)
        print(f"Created threat: {threat['title']}")
    return created_threats
def update_threat_status(threat_id, status, resolution_notes):
"""Update threat status to mitigated/resolved"""
response = requests.patch(
f'{BASE_URL}/threats/{threat_id}',
headers=headers,
json={
'status': status,
'resolution_notes': resolution_notes,
'resolved_at': '2025-01-15T10:00:00Z'
}
)
updated = response.json()
print(f"Updated threat status: {updated['status']}")
return updatedPython Example:
def setup_webhook_integration():
"""Complete webhook setup workflow"""
# 1. Create webhook subscription
response = requests.post(
f'{BASE_URL}/webhook/subscriptions',
headers=headers,
json={
'name': 'Slack Notifications',
'url': 'https://your-service.com/webhooks/tmi',
'events': [
'threat_model.created',
'threat_model.updated',
'threat.created'
],
'secret': 'your-webhook-secret',
'threat_model_id': None # All threat models
}
)
subscription = response.json()
print(f"Created webhook: {subscription['id']}")
print(f"Status: {subscription['status']}") # pending_verification
# 2. Verify endpoint responds to challenge
# (Your webhook endpoint must respond to challenge request)
# 3. Check subscription status
response = requests.get(
f"{BASE_URL}/webhook/subscriptions/{subscription['id']}",
headers=headers
)
current = response.json()
print(f"Current status: {current['status']}") # active after verification
# 4. Monitor deliveries
response = requests.get(
f'{BASE_URL}/webhook/deliveries',
headers=headers,
params={'subscription_id': subscription['id']}
)
deliveries = response.json()
print(f"Delivery count: {len(deliveries['items'])}")
for delivery in deliveries['items']:
print(f" {delivery['event_type']}: {delivery['status']}")
return subscriptionPython Example:
def batch_create_resources(threat_model_id):
"""Create multiple resources efficiently"""
# Create multiple threats in parallel
import concurrent.futures
threat_templates = [
{
'title': f'Threat {i}',
'description': f'Description for threat {i}',
'category': 'other',
'severity': 'medium',
'threat_model_id': threat_model_id
}
for i in range(10)
]
def create_single_threat(threat_data):
response = requests.post(
f'{BASE_URL}/threats',
headers=headers,
json=threat_data
)
return response.json()
# Create threats in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
threats = list(executor.map(create_single_threat, threat_templates))
print(f"Created {len(threats)} threats")
return threatsPython Example:
def generate_threat_report(threat_model_id):
"""Generate comprehensive threat model report"""
# 1. Get threat model
tm = requests.get(
f'{BASE_URL}/threat_models/{threat_model_id}',
headers=headers
).json()
# 2. Get all threats
threats = requests.get(
f'{BASE_URL}/threats',
headers=headers,
params={'threat_model_id': threat_model_id, 'limit': 500}
).json()['items']
# 3. Get all diagrams
diagrams = requests.get(
f'{BASE_URL}/threat_models/{threat_model_id}/diagrams',
headers=headers
).json()['items']
# 4. Build report
report = {
'threat_model': {
'name': tm['name'],
'description': tm['description'],
'owner': tm['owner'],
'created_at': tm['created_at']
},
'statistics': {
'total_threats': len(threats),
'critical_threats': len([t for t in threats if t['severity'] == 'critical']),
'high_threats': len([t for t in threats if t['severity'] == 'high']),
'open_threats': len([t for t in threats if t['status'] == 'open']),
'mitigated_threats': len([t for t in threats if t['status'] == 'mitigated'])
},
'threats_by_category': {},
'diagrams': len(diagrams),
'threats': threats
}
# Group threats by category
for threat in threats:
category = threat.get('category', 'other')
if category not in report['threats_by_category']:
report['threats_by_category'][category] = []
report['threats_by_category'][category].append(threat)
# 5. Export report
import json
with open(f'threat_report_{threat_model_id}.json', 'w') as f:
json.dump(report, f, indent=2)
print(f"Generated report: threat_report_{threat_model_id}.json")
return reportPython Example:
import time
from requests.exceptions import RequestException
def api_call_with_retry(method, url, **kwargs):
"""Make API call with exponential backoff retry"""
max_retries = 3
base_delay = 1
for attempt in range(max_retries):
try:
response = requests.request(method, url, **kwargs)
# Handle different status codes
if response.status_code == 200 or response.status_code == 201:
return response.json()
elif response.status_code == 204:
return None # Success, no content
elif response.status_code == 401:
# Token expired, re-authenticate
raise Exception('Authentication required')
elif response.status_code == 429:
# Rate limited, wait and retry
retry_after = int(response.headers.get('Retry-After', base_delay * (2 ** attempt)))
print(f'Rate limited, waiting {retry_after}s')
time.sleep(retry_after)
continue
elif response.status_code >= 500:
# Server error, retry
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
print(f'Server error, retrying in {delay}s')
time.sleep(delay)
continue
# Other errors
error = response.json()
raise Exception(f"API Error: {error.get('message', 'Unknown error')}")
except RequestException as e:
if attempt < max_retries - 1:
delay = base_delay * (2 ** attempt)
print(f'Network error, retrying in {delay}s: {e}')
time.sleep(delay)
else:
raise
raise Exception('Max retries exceeded')- API-Overview - API design and authentication
- REST-API-Reference - Complete endpoint reference
- WebSocket-API-Reference - Real-time collaboration
- API-Integration - Integration guide
- Webhook-Integration - Webhook setup
- Addon-System - Addon integration
- API Workflows Specification
- Using TMI for Threat Modeling
- Accessing TMI
- Creating Your First Threat Model
- Understanding the User Interface
- Working with Data Flow Diagrams
- Managing Threats
- Collaborative Threat Modeling
- Using Notes and Documentation
- Metadata and Extensions
- Planning Your Deployment
- Deploying TMI Server
- Deploying TMI Web Application
- Setting Up Authentication
- Database Setup
- Component Integration
- Post-Deployment
- Monitoring and Health
- Database Operations
- Security Operations
- Performance and Scaling
- Maintenance Tasks