Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 68 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,36 @@ pnpm run encrypted-chat -- --model deepseek-ai/DeepSeek-V3.1
pnpm run encrypted-chat -- --model deepseek-ai/DeepSeek-V3.1 --test-both
```

### Image Generation Verification

```bash
export API_KEY=sk-your-api-key-here

# Python
python3 py/image_verifier.py --model black-forest-labs/FLUX.2-klein-4B

# TypeScript
pnpm run image -- --model black-forest-labs/FLUX.2-klein-4B
```

### Encrypted Image Generation Verification

```bash
export API_KEY=sk-your-api-key-here

# Python - Test ECDSA encryption
python3 py/encrypted_image_verifier.py --model black-forest-labs/FLUX.2-klein-4B

# Python - Test both ECDSA and Ed25519
python3 py/encrypted_image_verifier.py --model black-forest-labs/FLUX.2-klein-4B --test-both

# TypeScript - Test ECDSA encryption
pnpm run encrypted-image -- --model black-forest-labs/FLUX.2-klein-4B

# TypeScript - Test both algorithms
pnpm run encrypted-image -- --model black-forest-labs/FLUX.2-klein-4B --test-both
```

### Domain Verification

```bash
Expand Down Expand Up @@ -228,6 +258,19 @@ Fetches chat completions (streaming and non-streaming), verifies ECDSA signature

**Note**: The verifier supplies a fresh nonce when fetching attestation (step 5), which ensures attestation freshness but means the nonce/report_data won't match the original signing context. This is expected behavior - the verifier proves the signing key is bound to valid hardware, not that a specific attestation was used for signing.

## 🖼️ Image Generation Verifier

Fetches image generation responses, verifies ECDSA signatures, and validates attestations:

1. Sends image generation request to `/v1/images/generations`
2. Fetches signature from `/v1/signature/{image_id}` endpoint (using the `id` field from the response)
3. Verifies request hash and response hash match the signed hashes
4. Recovers ECDSA signing address from signature
5. Fetches fresh attestation with user-supplied nonce for the recovered signing address
6. Validates attestation using the same checks as attestation verifier

**Note**: The image generation verifier follows the same pattern as the chat verifier. The response includes an `id` field that is used to fetch the signature, just like `chat_id` for chat completions.

### Setup

Set your API key as an environment variable:
Expand Down Expand Up @@ -267,6 +310,10 @@ pnpm run model -- [--model MODEL_NAME]

Tests end-to-end encryption for chat completions. Encrypts request messages and decrypts response content using ECDSA or Ed25519 signing algorithms.

## 🔐 Encrypted Image Generation Verifier

Tests end-to-end encryption for image generation. Encrypts the prompt in the request and decrypts the response fields (`b64_json` and `revised_prompt`) using ECDSA or Ed25519 signing algorithms. When encryption is enabled, both the request prompt and response image data fields are encrypted.

### Setup

Set your API key as an environment variable:
Expand Down Expand Up @@ -299,7 +346,7 @@ pnpm run encrypted-chat -- --model deepseek-ai/DeepSeek-V3.1 --test-both

**Default model**: `deepseek-ai/DeepSeek-V3.1`

### What It Tests
### What It Tests (Chat)

- ✅ **End-to-End Encryption** - Request messages encrypted with model's public key
- ✅ **Response Decryption** - Response content decrypted with client's private key
Expand All @@ -308,6 +355,14 @@ pnpm run encrypted-chat -- --model deepseek-ai/DeepSeek-V3.1 --test-both
- ✅ **Streaming Support** - Decrypts streaming responses in real-time
- ✅ **Non-Streaming Support** - Decrypts complete non-streaming responses

### What It Tests (Image Generation)

- ✅ **End-to-End Encryption** - Request prompt encrypted with model's public key
- ✅ **Response Decryption** - Response `b64_json` and `revised_prompt` fields decrypted with client's private key
- ✅ **ECDSA Encryption** - ECIES (Elliptic Curve Integrated Encryption Scheme) with AES-GCM
- ✅ **Ed25519 Encryption** - X25519 key exchange with ChaCha20-Poly1305
- ✅ **Signature Verification** - Verifies request and response hashes match signed values

### Encryption Headers

The verifier automatically includes the following headers for encrypted requests:
Expand Down Expand Up @@ -526,6 +581,18 @@ python3 py/chat_verifier.py --model deepseek-ai/DeepSeek-V3.1
pnpm run chat -- --model deepseek-ai/DeepSeek-V3.1
```

### Image Generation Verification with Custom Model

```bash
export API_KEY=sk-your-api-key-here

# Python
python3 py/image_verifier.py --model black-forest-labs/FLUX.2-klein-4B

# TypeScript
pnpm run image -- --model black-forest-labs/FLUX.2-klein-4B
```

### Domain Verification

```bash
Expand Down
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
"build": "tsc",
"chat": "tsx ts/chat_verifier.ts",
"encrypted-chat": "tsx ts/encrypted_chat_verifier.ts",
"image": "tsx ts/image_verifier.ts",
"encrypted-image": "tsx ts/encrypted_image_verifier.ts",
"model": "tsx ts/model_verifier.ts",
"domain": "tsx ts/domain_verifier.ts",
"test": "echo \"Error: no test specified\" && exit 1"
Expand Down
244 changes: 10 additions & 234 deletions py/encrypted_chat_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,15 @@
import asyncio
import json
import os
import secrets

import requests
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, ed25519
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.backends import default_backend
from nacl.public import (
PrivateKey as X25519PrivateKeyNaCl,
PublicKey as X25519PublicKeyNaCl,
Box,
from encryption_utils import (
decrypt_text,
encrypt_text,
fetch_model_public_key as fetch_model_public_key_util,
generate_ecdsa_key_pair,
generate_ed25519_key_pair,
)
from nacl import bindings

from chat_verifier import verify_chat

Expand All @@ -29,242 +24,23 @@

def fetch_model_public_key(model, signing_algo="ecdsa"):
"""Fetch model public key from attestation report."""
url = f"{BASE_URL}/v1/attestation/report?model={model}&signing_algo={signing_algo}"
headers = {"Authorization": f"Bearer {API_KEY}"}
report = requests.get(url, headers=headers, timeout=30).json()

# Try to get signing_public_key from model_attestations
if "model_attestations" in report:
for attestation in report["model_attestations"]:
if "signing_public_key" in attestation:
return attestation["signing_public_key"]

raise ValueError(
f"Could not find signing_public_key for model {model} with algorithm {signing_algo}"
return fetch_model_public_key_util(
base_url=BASE_URL, api_key=API_KEY, model=model, signing_algo=signing_algo
)


def generate_ecdsa_key_pair():
"""Generate ECDSA key pair and return (private_key_hex, public_key_hex, private_key_obj)."""
private_key = ec.generate_private_key(ec.SECP256K1(), default_backend())
public_key = private_key.public_key()

# Get private key bytes (32 bytes)
# SECP256K1 doesn't support Raw format, so we extract the integer value
private_numbers = private_key.private_numbers()
private_key_int = private_numbers.private_value
# Convert to 32-byte big-endian representation
private_key_bytes = private_key_int.to_bytes(32, byteorder="big")

# Get public key bytes (uncompressed, 65 bytes with 0x04 prefix)
public_key_bytes = public_key.public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
)

# Remove 0x04 prefix for public key (64 bytes)
public_key_hex = public_key_bytes[1:].hex()
private_key_hex = private_key_bytes.hex()

return private_key_hex, public_key_hex, private_key


def generate_ed25519_key_pair():
"""Generate Ed25519 key pair and return (private_key_hex, public_key_hex, private_key_obj)."""
private_key = ed25519.Ed25519PrivateKey.generate()
public_key = private_key.public_key()

# Get private key bytes (32 bytes seed)
private_key_bytes = private_key.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption(),
)

# Get public key bytes (32 bytes)
public_key_bytes = public_key.public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)

return private_key_bytes.hex(), public_key_bytes.hex(), private_key


def encrypt_ecdsa(data: bytes, public_key_hex: str) -> bytes:
"""Encrypt data using ECDSA public key (ECIES)."""
# Parse public key from hex
public_key_bytes = bytes.fromhex(public_key_hex)
if len(public_key_bytes) == 65 and public_key_bytes[0] == 0x04:
public_key_bytes = public_key_bytes[1:] # Remove 0x04 prefix

if len(public_key_bytes) != 64:
raise ValueError(
f"ECDSA public key must be 64 bytes, got {len(public_key_bytes)}"
)

# Create EC public key
public_key = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256K1(), b"\x04" + public_key_bytes
)

# Generate ephemeral EC key pair
ephemeral_private = ec.generate_private_key(ec.SECP256K1(), default_backend())
ephemeral_public = ephemeral_private.public_key()

# Perform ECDH key exchange
shared_secret = ephemeral_private.exchange(ec.ECDH(), public_key)

# Derive AES key using HKDF
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b"ecdsa_encryption",
backend=default_backend(),
)
aes_key = hkdf.derive(shared_secret)

# Encrypt with AES-GCM
nonce = secrets.token_bytes(12)
aesgcm = AESGCM(aes_key)
ciphertext = aesgcm.encrypt(nonce, data, None)

# Format: [ephemeral_public_key (65 bytes)][nonce (12 bytes)][ciphertext]
ephemeral_public_bytes = ephemeral_public.public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint,
)
return ephemeral_public_bytes + nonce + ciphertext


def decrypt_ecdsa(encrypted_data: bytes, private_key_obj) -> bytes:
"""Decrypt data using ECDSA private key."""
if len(encrypted_data) < 93:
raise ValueError("Encrypted data too short")

# Extract components
ephemeral_public_bytes = encrypted_data[:65]
nonce = encrypted_data[65:77]
ciphertext = encrypted_data[77:]

# Parse ephemeral public key
ephemeral_public = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256K1(), ephemeral_public_bytes
)

# Use the private key object directly for ECDH exchange
# private_key_obj is already an EllipticCurvePrivateKey
shared_secret = private_key_obj.exchange(ec.ECDH(), ephemeral_public)

# Derive AES key using HKDF
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=32,
salt=None,
info=b"ecdsa_encryption",
backend=default_backend(),
)
aes_key = hkdf.derive(shared_secret)

# Decrypt with AES-GCM
aesgcm = AESGCM(aes_key)
plaintext = aesgcm.decrypt(nonce, ciphertext, None)

return plaintext


def encrypt_ed25519(data: bytes, public_key_hex: str) -> bytes:
"""Encrypt data using Ed25519 public key via PyNaCl Box (X25519 + ChaCha20-Poly1305)."""
# Parse public key from hex
public_key_bytes = bytes.fromhex(public_key_hex)
if len(public_key_bytes) != 32:
raise ValueError(
f"Ed25519 public key must be 32 bytes, got {len(public_key_bytes)}"
)

# Convert Ed25519 public key to X25519 public key (PyNaCl format)
x25519_public = X25519PublicKeyNaCl(
bindings.crypto_sign_ed25519_pk_to_curve25519(public_key_bytes)
)

# Generate ephemeral X25519 key pair using PyNaCl
ephemeral_private = X25519PrivateKeyNaCl.generate()
ephemeral_public = ephemeral_private.public_key

# Create Box for encryption
box = Box(ephemeral_private, x25519_public)

# Encrypt using PyNaCl Box
encrypted = box.encrypt(data)

# Format: [ephemeral_public_key (32 bytes)][nonce (24 bytes)][ciphertext]
ephemeral_public_bytes = bytes(ephemeral_public)
return ephemeral_public_bytes + encrypted


def decrypt_ed25519(encrypted_data: bytes, private_key_obj) -> bytes:
"""Decrypt data using Ed25519 private key via PyNaCl Box."""
if len(encrypted_data) < 72:
raise ValueError("Encrypted data too short")

# Extract components
ephemeral_public_bytes = encrypted_data[:32]
box_encrypted = encrypted_data[32:] # Contains [nonce (24 bytes)][ciphertext]

# Get Ed25519 private key and convert to X25519 private (PyNaCl format)
seed_bytes = private_key_obj.private_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PrivateFormat.Raw,
encryption_algorithm=serialization.NoEncryption(),
)
public_key_bytes = private_key_obj.public_key().public_bytes(
encoding=serialization.Encoding.Raw,
format=serialization.PublicFormat.Raw,
)
ed25519_secret_key = seed_bytes + public_key_bytes
x25519_private_bytes = bindings.crypto_sign_ed25519_sk_to_curve25519(
ed25519_secret_key
)
x25519_private = X25519PrivateKeyNaCl(x25519_private_bytes)

# Convert ephemeral public key to X25519 (PyNaCl format)
ephemeral_public = X25519PublicKeyNaCl(ephemeral_public_bytes)

# Create Box for decryption
box = Box(x25519_private, ephemeral_public)

# Decrypt using PyNaCl Box
plaintext = box.decrypt(box_encrypted)

return plaintext


def encrypt_message_content(
message_content: str, model_public_key: str, signing_algo: str
) -> str:
"""Encrypt message content using model's public key."""
data = message_content.encode("utf-8")
if signing_algo == "ecdsa":
encrypted = encrypt_ecdsa(data, model_public_key)
elif signing_algo == "ed25519":
encrypted = encrypt_ed25519(data, model_public_key)
else:
raise ValueError(f"Unsupported signing algorithm: {signing_algo}")
return encrypted.hex()
return encrypt_text(message_content, model_public_key, signing_algo)


def decrypt_message_content(
encrypted_hex: str, client_private_key, signing_algo: str
) -> str:
"""Decrypt message content using client's private key."""
encrypted_data = bytes.fromhex(encrypted_hex)
if signing_algo == "ecdsa":
decrypted = decrypt_ecdsa(encrypted_data, client_private_key)
elif signing_algo == "ed25519":
decrypted = decrypt_ed25519(encrypted_data, client_private_key)
else:
raise ValueError(f"Unsupported signing algorithm: {signing_algo}")
return decrypted.decode("utf-8")
return decrypt_text(encrypted_hex, client_private_key, signing_algo)


async def encrypted_streaming_example(model, signing_algo="ecdsa"):
Expand Down
Loading