Skip to content

Commit 8abaace

Browse files
test: add 101 tests for CA security, MCP integration, and audit stubs
- test_ca_security.py: +30 tests (SVID certificate validation, registration lifecycle, CA init, token expiration edge cases) - test_mcp_integration.py: +71 tests (TrustGatedMCPServer tool invocation, capability matching, audit recording, client verification; TrustGatedMCPClient connect/invoke/credentials) Total suite: 1805 passed, 35 skipped. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 0f96adc commit 8abaace

File tree

2 files changed

+1307
-0
lines changed

2 files changed

+1307
-0
lines changed

packages/agent-mesh/tests/test_ca_security.py

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,16 @@
55
Validates that:
66
- Sponsor signatures are cryptographically verified (CRIT-1 fix)
77
- Refresh tokens are validated against issued tokens (CRIT-2 fix)
8+
- SVID certificate issuance and properties
9+
- Full registration-to-rotation lifecycle
810
"""
911

1012
import pytest
13+
from datetime import datetime, timedelta, timezone
14+
from unittest.mock import patch
1115
from cryptography.hazmat.primitives.asymmetric import ed25519
1216
from cryptography.hazmat.primitives import serialization
17+
from cryptography import x509
1318

1419
from agentmesh.core.identity.ca import (
1520
CertificateAuthority,
@@ -256,3 +261,223 @@ def test_missing_public_key_raises(self):
256261
reg = self._register_agent(ca, sponsor_key, email)
257262
with pytest.raises(ValueError, match="New public key required"):
258263
ca.rotate_credentials(reg.agent_did, reg.refresh_token)
264+
265+
266+
# ---------------------------------------------------------------------------
267+
# Registration lifecycle tests
268+
# ---------------------------------------------------------------------------
269+
270+
271+
class TestRegistrationLifecycle:
272+
"""End-to-end registration and credential properties."""
273+
274+
def _register(self, ca, sponsor_key, email, name="test-agent", caps=None):
275+
sig = _sign_registration(sponsor_key, name, email, caps or [])
276+
request = RegistrationRequest(
277+
agent_name=name,
278+
public_key=_make_agent_public_key(),
279+
sponsor_email=email,
280+
sponsor_signature=sig,
281+
capabilities=caps or [],
282+
)
283+
return ca.register_agent(request)
284+
285+
def test_response_contains_valid_svid_certificate(self):
286+
ca, sponsor_key, email = _make_ca_with_sponsor()
287+
resp = self._register(ca, sponsor_key, email)
288+
# DER bytes should parse as a valid X.509 certificate
289+
cert = x509.load_der_x509_certificate(resp.svid_certificate)
290+
# Subject CN should be the agent DID
291+
cn = cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0]
292+
assert cn.value == resp.agent_did
293+
# Should have SPIFFE SAN
294+
san = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName)
295+
uris = san.value.get_values_for_type(x509.UniformResourceIdentifier)
296+
assert any(resp.agent_did in uri for uri in uris)
297+
298+
def test_response_contains_ca_pem_certificate(self):
299+
ca, sponsor_key, email = _make_ca_with_sponsor()
300+
resp = self._register(ca, sponsor_key, email)
301+
assert resp.ca_certificate.startswith("-----BEGIN CERTIFICATE-----")
302+
# Should parse as valid PEM
303+
x509.load_pem_x509_certificate(resp.ca_certificate.encode())
304+
305+
def test_svid_certificate_signed_by_ca(self):
306+
ca, sponsor_key, email = _make_ca_with_sponsor()
307+
resp = self._register(ca, sponsor_key, email)
308+
svid_cert = x509.load_der_x509_certificate(resp.svid_certificate)
309+
# The CA public key should verify the SVID cert's signature
310+
ca.ca_public_key.verify(
311+
svid_cert.signature,
312+
svid_cert.tbs_certificate_bytes,
313+
)
314+
315+
def test_svid_certificate_not_ca(self):
316+
ca, sponsor_key, email = _make_ca_with_sponsor()
317+
resp = self._register(ca, sponsor_key, email)
318+
cert = x509.load_der_x509_certificate(resp.svid_certificate)
319+
bc = cert.extensions.get_extension_for_class(x509.BasicConstraints)
320+
assert bc.value.ca is False
321+
322+
def test_initial_trust_score_is_500(self):
323+
ca, sponsor_key, email = _make_ca_with_sponsor()
324+
resp = self._register(ca, sponsor_key, email)
325+
assert resp.initial_trust_score == 500
326+
327+
def test_trust_dimensions_present(self):
328+
ca, sponsor_key, email = _make_ca_with_sponsor()
329+
resp = self._register(ca, sponsor_key, email)
330+
assert "policy_compliance" in resp.trust_dimensions
331+
assert "security_posture" in resp.trust_dimensions
332+
assert len(resp.trust_dimensions) == 5
333+
334+
def test_tokens_have_correct_format(self):
335+
ca, sponsor_key, email = _make_ca_with_sponsor()
336+
resp = self._register(ca, sponsor_key, email)
337+
assert resp.access_token.startswith("agentmesh_access_")
338+
assert resp.refresh_token.startswith("agentmesh_refresh_")
339+
340+
def test_token_ttl_matches_config(self):
341+
ca, sponsor_key, email = _make_ca_with_sponsor()
342+
resp = self._register(ca, sponsor_key, email)
343+
assert resp.token_ttl_seconds == 15 * 60 # default 15 min
344+
345+
def test_custom_ttl(self):
346+
"""CA with custom TTL issues certs with matching duration."""
347+
sponsor_private, sponsor_public = _make_sponsor_keypair()
348+
reg = SponsorRegistry()
349+
reg.register_sponsor("s@corp.com", sponsor_public)
350+
ca = CertificateAuthority(default_ttl_minutes=5, sponsor_registry=reg)
351+
resp = self._register(ca, sponsor_private, "s@corp.com")
352+
assert resp.token_ttl_seconds == 5 * 60
353+
354+
def test_each_registration_gets_unique_did(self):
355+
ca, sponsor_key, email = _make_ca_with_sponsor()
356+
r1 = self._register(ca, sponsor_key, email, name="agent-1")
357+
r2 = self._register(ca, sponsor_key, email, name="agent-2")
358+
assert r1.agent_did != r2.agent_did
359+
360+
def test_each_registration_gets_unique_tokens(self):
361+
ca, sponsor_key, email = _make_ca_with_sponsor()
362+
r1 = self._register(ca, sponsor_key, email, name="agent-1")
363+
r2 = self._register(ca, sponsor_key, email, name="agent-2")
364+
assert r1.access_token != r2.access_token
365+
assert r1.refresh_token != r2.refresh_token
366+
367+
def test_svid_expiry_in_future(self):
368+
ca, sponsor_key, email = _make_ca_with_sponsor()
369+
resp = self._register(ca, sponsor_key, email)
370+
assert resp.svid_expires_at > datetime.now(timezone.utc)
371+
372+
def test_registration_with_capabilities(self):
373+
ca, sponsor_key, email = _make_ca_with_sponsor()
374+
resp = self._register(ca, sponsor_key, email, caps=["read:data", "write:logs"])
375+
assert resp.status == "success"
376+
377+
def test_registration_with_organization(self):
378+
ca, sponsor_key, email = _make_ca_with_sponsor()
379+
sig = _sign_registration(sponsor_key, "org-agent", email)
380+
request = RegistrationRequest(
381+
agent_name="org-agent",
382+
public_key=_make_agent_public_key(),
383+
sponsor_email=email,
384+
sponsor_signature=sig,
385+
organization="Contoso",
386+
organization_id="contoso-123",
387+
)
388+
resp = ca.register_agent(request)
389+
assert resp.status == "success"
390+
391+
392+
# ---------------------------------------------------------------------------
393+
# CA initialization tests
394+
# ---------------------------------------------------------------------------
395+
396+
397+
class TestCertificateAuthorityInit:
398+
"""Tests for CA construction and self-signed cert."""
399+
400+
def test_auto_generates_keypair(self):
401+
ca = CertificateAuthority()
402+
assert ca.ca_private_key is not None
403+
assert ca.ca_public_key is not None
404+
405+
def test_auto_generates_self_signed_cert(self):
406+
ca = CertificateAuthority()
407+
cert = ca.ca_certificate
408+
# Self-signed: subject == issuer
409+
assert cert.subject == cert.issuer
410+
cn = cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)[0]
411+
assert cn.value == "AgentMesh CA"
412+
413+
def test_ca_cert_is_ca(self):
414+
ca = CertificateAuthority()
415+
bc = ca.ca_certificate.extensions.get_extension_for_class(
416+
x509.BasicConstraints
417+
)
418+
assert bc.value.ca is True
419+
420+
def test_custom_keypair_accepted(self):
421+
key = ed25519.Ed25519PrivateKey.generate()
422+
ca = CertificateAuthority(ca_private_key=key)
423+
assert ca.ca_private_key is key
424+
425+
def test_default_sponsor_registry_is_empty(self):
426+
ca = CertificateAuthority()
427+
assert not ca.sponsor_registry.is_registered("anyone@example.com")
428+
429+
def test_issued_tokens_start_empty(self):
430+
ca = CertificateAuthority()
431+
assert len(ca._issued_refresh_tokens) == 0
432+
433+
434+
# ---------------------------------------------------------------------------
435+
# Token expiration edge cases
436+
# ---------------------------------------------------------------------------
437+
438+
439+
class TestRefreshTokenExpiration:
440+
"""Edge cases for refresh token timing."""
441+
442+
def _register_agent(self, ca, sponsor_key, email):
443+
sig = _sign_registration(sponsor_key, "test-agent", email)
444+
request = RegistrationRequest(
445+
agent_name="test-agent",
446+
public_key=_make_agent_public_key(),
447+
sponsor_email=email,
448+
sponsor_signature=sig,
449+
)
450+
return ca.register_agent(request)
451+
452+
def test_expired_token_rejected(self):
453+
"""Manually expire a token and verify it's rejected."""
454+
ca, sponsor_key, email = _make_ca_with_sponsor()
455+
reg = self._register_agent(ca, sponsor_key, email)
456+
457+
# Find the token hash and force-expire it
458+
import hashlib
459+
token_hash = hashlib.sha256(reg.refresh_token.encode()).hexdigest()
460+
stored_did, _ = ca._issued_refresh_tokens[token_hash]
461+
ca._issued_refresh_tokens[token_hash] = (
462+
stored_did,
463+
datetime.now(timezone.utc) - timedelta(hours=1),
464+
)
465+
466+
new_key = _make_agent_public_key()
467+
with pytest.raises(ValueError, match="Invalid or expired refresh token"):
468+
ca.rotate_credentials(reg.agent_did, reg.refresh_token, new_key)
469+
470+
def test_multiple_agents_independent_tokens(self):
471+
"""Each agent's tokens are independent."""
472+
ca, sponsor_key, email = _make_ca_with_sponsor()
473+
r1 = self._register_agent(ca, sponsor_key, email)
474+
r2 = self._register_agent(ca, sponsor_key, email)
475+
476+
# Rotate agent 1's creds
477+
new_key = _make_agent_public_key()
478+
ca.rotate_credentials(r1.agent_did, r1.refresh_token, new_key)
479+
480+
# Agent 2's token should still work
481+
new_key2 = _make_agent_public_key()
482+
rotated = ca.rotate_credentials(r2.agent_did, r2.refresh_token, new_key2)
483+
assert rotated.status == "success"

0 commit comments

Comments
 (0)