|
| 1 | +# AgentConnect: https://github.com/chgaowei/AgentConnect |
| 2 | +# Author: GaoWei Chang |
| 3 | +# Email: chgaowei@gmail.com |
| 4 | +# Website: https://agent-network-protocol.com/ |
| 5 | +# |
| 6 | +# This project is open-sourced under the MIT License. For details, please see the LICENSE file. |
| 7 | + |
| 8 | +# This is a basic example of how to use DID WBA authentication. |
| 9 | +# It first creates a DID document and private keys. |
| 10 | +# Then it uploads the DID document to the server. |
| 11 | +# Then it generates an authentication header and tests the DID authentication. |
| 12 | + |
| 13 | +import os |
| 14 | +import sys |
| 15 | +import json |
| 16 | +import secrets |
| 17 | +import asyncio |
| 18 | +import aiohttp |
| 19 | +import logging |
| 20 | +from pathlib import Path |
| 21 | +from cryptography.hazmat.primitives import serialization, hashes |
| 22 | +from cryptography.hazmat.primitives.asymmetric import ec |
| 23 | +from canonicaljson import encode_canonical_json |
| 24 | + |
| 25 | +from agent_connect.authentication.did_wba import ( |
| 26 | + create_did_wba_document, |
| 27 | + resolve_did_wba_document, |
| 28 | + generate_auth_header |
| 29 | +) |
| 30 | +from agent_connect.utils.log_base import set_log_color_level |
| 31 | + |
| 32 | +_is_local_testing = False |
| 33 | + |
| 34 | +# TODO: Change to your own server domain. |
| 35 | +# Or use the test domain we provide (currently using pi-unlimited.com, will later change to agent-network-protocol.com) |
| 36 | +# SERVER_DOMAIN = "agent-network-protocol.com" |
| 37 | +SERVER_DOMAIN = "pi-unlimited.com" |
| 38 | + |
| 39 | +def convert_url_for_local_testing(url: str) -> str: |
| 40 | + if _is_local_testing: |
| 41 | + url = url.replace('https://', 'http://') |
| 42 | + url = url.replace(SERVER_DOMAIN, '127.0.0.1:9000') |
| 43 | + return url |
| 44 | + |
| 45 | +async def upload_did_document(url: str, did_document: dict) -> bool: |
| 46 | + """Upload DID document to server""" |
| 47 | + try: |
| 48 | + local_url = convert_url_for_local_testing(url) |
| 49 | + logging.info("Converting URL from %s to %s", url, local_url) |
| 50 | + |
| 51 | + async with aiohttp.ClientSession() as session: |
| 52 | + async with session.put( |
| 53 | + local_url, |
| 54 | + json=did_document, |
| 55 | + headers={'Content-Type': 'application/json'} |
| 56 | + ) as response: |
| 57 | + return response.status == 200 |
| 58 | + except Exception as e: |
| 59 | + logging.error("Failed to upload DID document: %s", e) |
| 60 | + return False |
| 61 | + |
| 62 | +async def download_did_document(url: str) -> dict: |
| 63 | + """Download DID document from server""" |
| 64 | + try: |
| 65 | + local_url = convert_url_for_local_testing(url) |
| 66 | + logging.info("Converting URL from %s to %s", url, local_url) |
| 67 | + |
| 68 | + async with aiohttp.ClientSession() as session: |
| 69 | + async with session.get(local_url) as response: |
| 70 | + if response.status == 200: |
| 71 | + return await response.json() |
| 72 | + logging.warning("Failed to download DID document, status: %d", response.status) |
| 73 | + return None |
| 74 | + except Exception as e: |
| 75 | + logging.error("Failed to download DID document: %s", e) |
| 76 | + return None |
| 77 | + |
| 78 | +async def test_did_auth(url: str, auth_header: str) -> tuple[bool, str]: |
| 79 | + """Test DID authentication and get token""" |
| 80 | + try: |
| 81 | + local_url = convert_url_for_local_testing(url) |
| 82 | + logging.info("Converting URL from %s to %s", url, local_url) |
| 83 | + |
| 84 | + async with aiohttp.ClientSession() as session: |
| 85 | + async with session.get( |
| 86 | + local_url, |
| 87 | + headers={'Authorization': auth_header} |
| 88 | + ) as response: |
| 89 | + token = response.headers.get('Authorization', '') |
| 90 | + if token.startswith('Bearer '): |
| 91 | + token = token[7:] # Remove 'Bearer ' prefix |
| 92 | + return response.status == 200, token |
| 93 | + except Exception as e: |
| 94 | + logging.error("DID authentication test failed: %s", e) |
| 95 | + return False, '' |
| 96 | + |
| 97 | +def save_private_key(unique_id: str, keys: dict, did_document: dict) -> str: |
| 98 | + """Save private keys and DID document to user directory and return the user directory path""" |
| 99 | + current_dir = Path(__file__).parent.absolute() |
| 100 | + user_dir = current_dir / "did_keys" / f"user_{unique_id}" |
| 101 | + # Create parent directories if they don't exist |
| 102 | + user_dir.mkdir(parents=True, exist_ok=True) |
| 103 | + |
| 104 | + # Save private keys |
| 105 | + for method_fragment, (private_key_bytes, _) in keys.items(): |
| 106 | + private_key_path = user_dir / f"{method_fragment}_private.pem" |
| 107 | + with open(private_key_path, 'wb') as f: |
| 108 | + f.write(private_key_bytes) |
| 109 | + logging.info("Saved private key '%s' to %s", method_fragment, private_key_path) |
| 110 | + |
| 111 | + # Save DID document |
| 112 | + did_path = user_dir / "did.json" |
| 113 | + with open(did_path, 'w', encoding='utf-8') as f: |
| 114 | + json.dump(did_document, f, indent=2) |
| 115 | + logging.info("Saved DID document to %s", did_path) |
| 116 | + |
| 117 | + return str(user_dir) |
| 118 | + |
| 119 | +def load_private_key(private_key_dir: str, method_fragment: str) -> ec.EllipticCurvePrivateKey: |
| 120 | + """Load private key from file""" |
| 121 | + key_dir = Path(private_key_dir) |
| 122 | + key_path = key_dir / f"{method_fragment}_private.pem" |
| 123 | + |
| 124 | + logging.info("Loading private key from %s", key_path) |
| 125 | + with open(key_path, 'rb') as f: |
| 126 | + private_key_bytes = f.read() |
| 127 | + return serialization.load_pem_private_key( |
| 128 | + private_key_bytes, |
| 129 | + password=None |
| 130 | + ) |
| 131 | + |
| 132 | +def sign_callback(content: bytes, method_fragment: str) -> bytes: |
| 133 | + """Sign content using private key""" |
| 134 | + # Load private key using the global variable |
| 135 | + private_key = load_private_key(sign_callback.private_key_dir, method_fragment) |
| 136 | + |
| 137 | + # Sign the content |
| 138 | + signature = private_key.sign( |
| 139 | + content, |
| 140 | + ec.ECDSA(hashes.SHA256()) |
| 141 | + ) |
| 142 | + return signature |
| 143 | + |
| 144 | +async def main(): |
| 145 | + # 1. Generate unique identifier (8 bytes = 16 hex characters) |
| 146 | + unique_id = secrets.token_hex(8) |
| 147 | + |
| 148 | + # 2. Set server information |
| 149 | + server_domain = SERVER_DOMAIN |
| 150 | + base_path = f"/wba/user/{unique_id}" |
| 151 | + did_path = f"{base_path}/did.json" |
| 152 | + |
| 153 | + # 3. Create DID document |
| 154 | + logging.info("Creating DID document...") |
| 155 | + did_document, keys = create_did_wba_document( |
| 156 | + hostname=server_domain, |
| 157 | + path_segments=["wba", "user", unique_id] |
| 158 | + ) |
| 159 | + |
| 160 | + # 4. Save private keys, DID document and set path for sign_callback |
| 161 | + user_dir = save_private_key(unique_id, keys, did_document) |
| 162 | + sign_callback.private_key_dir = user_dir |
| 163 | + |
| 164 | + # 5. Upload DID document (This should be stored on your server) |
| 165 | + document_url = f"https://{server_domain}{did_path}" |
| 166 | + logging.info("Uploading DID document to %s", document_url) |
| 167 | + success = await upload_did_document(document_url, did_document) |
| 168 | + if not success: |
| 169 | + logging.error("Failed to upload DID document") |
| 170 | + return |
| 171 | + logging.info("DID document uploaded successfully") |
| 172 | + |
| 173 | + # 7. Generate authentication header |
| 174 | + logging.info("Generating authentication header...") |
| 175 | + auth_header = generate_auth_header( |
| 176 | + did_document, |
| 177 | + server_domain, |
| 178 | + sign_callback |
| 179 | + ) |
| 180 | + |
| 181 | + # 8. Test DID authentication and get token |
| 182 | + test_url = f"https://{server_domain}/wba/test" |
| 183 | + logging.info("Testing DID authentication at %s", test_url) |
| 184 | + auth_success, token = await test_did_auth(test_url, auth_header) |
| 185 | + |
| 186 | + if not auth_success or not token: |
| 187 | + logging.error(f"DID authentication test failed. auth_success: {auth_success}, token: {token}") |
| 188 | + return |
| 189 | + |
| 190 | + logging.info("DID authentication test successful") |
| 191 | + |
| 192 | +if __name__ == "__main__": |
| 193 | + set_log_color_level(logging.INFO) |
| 194 | + asyncio.run(main()) |
| 195 | + |
| 196 | + |
| 197 | + |
| 198 | + |
| 199 | + |
| 200 | + |
| 201 | + |
0 commit comments