AutoTLS Integration Action Plan for py-libp2p #1006
AutoTLS Improvements Based on go-libp2p Implementation
This document analyzes the go-libp2p AutoTLS implementation and suggests improvements for the Python implementation based on their approach.
Analysis of go-libp2p AutoTLS Implementation
Key Findings
1. Certificate Storage Architecture
go-libp2p Approach:
Storage Structure (certmagic):
Key Characteristics:
2. Identity Key Management
go-libp2p Approach:

    const identityKeyFile = "identity.key"

    // Load or generate identity
    privKey, err := LoadIdentity(identityKeyFile)

    // LoadIdentity loads the key if the file exists, or generates a new one if it does not
    func LoadIdentity(keyPath string) (crypto.PrivKey, error) {
        _, err := os.Stat(keyPath)
        if err == nil {
            return ReadIdentity(keyPath)
        }
        if os.IsNotExist(err) {
            return GenerateIdentity(keyPath)
        }
        // Any other stat error (for example, permission denied) goes back to the caller
        return nil, err
    }

    // File permissions: 0400 (read-only for owner)
    err = os.WriteFile(path, bytes, 0400)

Key Characteristics:
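A rough Python counterpart of the load-or-generate pattern above, as a minimal sketch only. It assumes the identity is stored as a raw 32-byte seed; the real py-libp2p key serialization would replace that, and `load_or_generate_identity` and `KEY_SIZE` are hypothetical names. The permission handling assumes a POSIX system.

```python
import os
from pathlib import Path

KEY_SIZE = 32  # assumption: a raw 32-byte seed stands in for the real key encoding


def load_or_generate_identity(key_path: Path) -> bytes:
    """Return the key material at key_path, creating it on first use."""
    try:
        return key_path.read_bytes()
    except FileNotFoundError:
        pass  # fall through and generate a new key

    seed = os.urandom(KEY_SIZE)
    # O_EXCL makes creation fail if another process created the file first.
    # Mode 0o400 makes the file read-only for the owner from the moment it exists.
    fd = os.open(key_path, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o400)
    with os.fdopen(fd, "wb") as f:
        f.write(seed)
    return seed
```

Creating the file with its final mode avoids a window in which the key is briefly readable by other users, which a write-then-chmod sequence would leave open.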
3. Certificate Manager Lifecycle
go-libp2p Approach:

    certManager, err := p2pforge.NewP2PForgeCertMgr(
        p2pforge.WithCAEndpoint(p2pforge.DefaultCATestEndpoint),
        p2pforge.WithCertificateStorage(&certmagic.FileStorage{Path: "p2p-forge-certs"}),
        p2pforge.WithLogger(rawLogger.Sugar().Named("autotls")),
        p2pforge.WithUserAgent(userAgent),
        p2pforge.WithRegistrationDelay(10*time.Second),
        p2pforge.WithOnCertLoaded(func() {
            certLoaded <- true
        }),
    )
    certManager.Start()
    defer certManager.Stop()

    // Provide host to cert manager - triggers AutoTLS flow
    certManager.ProvideHost(h)

Key Characteristics:
4. Network Reachability Integration
go-libp2p Approach:
Key Characteristics:
5. Certificate Usage Pattern
go-libp2p Approach:
Suggested Improvements for Python Implementation
1. Structured Certificate Storage
Current Python Implementation:
Suggested Improvement:

    # Proposed structure
    # AUTOTLS_STORAGE_DIR is ~/.libp2p/autotls, or ./autotls-certs as a fallback
    AUTOTLS_STORAGE_DIR/
    ├── certificates/
    │   └── {b36_peerid}/
    │       ├── cert.pem
    │       └── key.pem
    ├── acme/
    │   ├── account.json    # ACME account info
    │   └── orders/         # Order state
    └── metadata.json       # Certificate metadata (expiry, domain, etc.)

Benefits:
Implementation Notes:
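One way to keep the proposed directory layout in a single place is a small path helper. This is a sketch, not existing py-libp2p code: `StorageLayout` and its method names are hypothetical, and the alphanumeric check on the peer ID label is an added precaution against path traversal.

```python
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class StorageLayout:
    """Computes the paths of the proposed AUTOTLS_STORAGE_DIR layout."""

    root: Path

    def cert_dir(self, b36_peerid: str) -> Path:
        # Reject anything that is not a plain label, such as "../escape".
        if not (b36_peerid.isascii() and b36_peerid.isalnum()):
            raise ValueError(f"invalid b36 peer ID label: {b36_peerid!r}")
        return self.root / "certificates" / b36_peerid

    def cert_path(self, b36_peerid: str) -> Path:
        return self.cert_dir(b36_peerid) / "cert.pem"

    def key_path(self, b36_peerid: str) -> Path:
        return self.cert_dir(b36_peerid) / "key.pem"

    @property
    def acme_account_path(self) -> Path:
        return self.root / "acme" / "account.json"

    @property
    def orders_dir(self) -> Path:
        return self.root / "acme" / "orders"

    @property
    def metadata_path(self) -> Path:
        return self.root / "metadata.json"

    def ensure_dirs(self, b36_peerid: str) -> None:
        """Create the directories needed to store one peer's certificate."""
        for directory in (self.cert_dir(b36_peerid), self.orders_dir):
            # mode applies to the leaf directory only; missing parents get
            # default permissions, and the process umask still applies.
            directory.mkdir(parents=True, exist_ok=True, mode=0o700)
```

With `layout = StorageLayout(Path.home() / ".libp2p" / "autotls")`, the rest of the code can ask for `layout.cert_path(b36_peerid)` instead of assembling paths by hand.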
2. Certificate Manager Class
Current Python Implementation:
Suggested Improvement:

    class AutoTLSCertManager:
        """Manages AutoTLS certificate lifecycle, similar to go-libp2p's certManager."""

        def __init__(
            self,
            storage_path: Path | None = None,
            ca_endpoint: str = ACME_STAGING_ENDPOINT,
            on_cert_loaded: Callable[[], None] | None = None,
        ):
            self.storage_path = storage_path or self._default_storage_path()
            self.ca_endpoint = ca_endpoint
            self.on_cert_loaded = on_cert_loaded
            # b36_peerid -> (cert_path, key_path)
            self._cert_cache: dict[str, tuple[Path, Path]] = {}

        def start(self) -> None:
            """Initialize certificate manager and load existing certificates."""
            self.storage_path.mkdir(parents=True, exist_ok=True)
            self._load_existing_certificates()

        def stop(self) -> None:
            """Clean shutdown of certificate manager."""
            # Cleanup if needed
            pass

        def get_certificate_paths(self, peer_id: ID) -> tuple[Path, Path] | None:
            """Get certificate and key paths for a peer ID."""
            b36_peerid = compute_b36_peer_id(peer_id)
            return self._cert_cache.get(b36_peerid)

        async def ensure_certificate(
            self,
            peer_id: ID,
            host: BasicHost,
            public_ip: str,
            port: int,
        ) -> tuple[Path, Path]:
            """Ensure certificate exists, fetching if necessary."""
            b36_peerid = compute_b36_peer_id(peer_id)
            # Check cache first
            if paths := self.get_certificate_paths(peer_id):
                if self._is_certificate_valid(paths[0]):
                    return paths
            # Fetch new certificate
            return await self._fetch_certificate(peer_id, host, public_ip, port)

        def _default_storage_path(self) -> Path:
            """Get default storage path (home directory or current dir)."""
            home = Path.home()
            if home.exists():
                return home / ".libp2p" / "autotls"
            return Path("autotls-certs")

Benefits:
3. Certificate Metadata Storage
Current Python Implementation:
Suggested Improvement:

    @dataclass
    class CertificateMetadata:
        """Metadata for an AutoTLS certificate."""

        b36_peerid: str
        domain: str
        cert_path: Path
        key_path: Path
        issued_at: datetime
        expires_at: datetime
        issuer: str  # "Let's Encrypt" or "Let's Encrypt (Staging)"
        serial_number: str | None = None

        def is_valid(self) -> bool:
            """Check if certificate is still valid."""
            now = datetime.now(timezone.utc)
            return self.issued_at <= now < self.expires_at

        def is_expiring_soon(self, threshold_hours: int = 24) -> bool:
            """Check if certificate expires within threshold."""
            now = datetime.now(timezone.utc)
            return (self.expires_at - now).total_seconds() < (threshold_hours * 3600)

    class AutoTLSCertManager:
        def _load_metadata(self) -> dict[str, CertificateMetadata]:
            """Load certificate metadata from storage."""
            metadata_file = self.storage_path / "metadata.json"
            if not metadata_file.exists():
                return {}
            # Load and deserialize metadata
            # ...

        def _save_metadata(self, metadata: CertificateMetadata) -> None:
            """Save certificate metadata to storage."""
            # Update metadata.json
            # ...

Benefits:
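The elided load and save steps could look roughly like the following. This is a sketch under assumptions: `metadata.json` is taken to be a JSON object keyed by b36 peer ID, paths and timestamps are stored as strings, and the helper names (`save_metadata`, `load_metadata`, `_encode`, `_decode`) are hypothetical. The dataclass is repeated without its methods so the block runs on its own.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path
from typing import Optional


@dataclass
class CertificateMetadata:
    """Same fields as the dataclass proposed above."""

    b36_peerid: str
    domain: str
    cert_path: Path
    key_path: Path
    issued_at: datetime
    expires_at: datetime
    issuer: str
    serial_number: Optional[str] = None


def _encode(meta: CertificateMetadata) -> dict:
    # JSON has no path or datetime types, so both are stored as strings.
    return {
        "b36_peerid": meta.b36_peerid,
        "domain": meta.domain,
        "cert_path": str(meta.cert_path),
        "key_path": str(meta.key_path),
        "issued_at": meta.issued_at.isoformat(),
        "expires_at": meta.expires_at.isoformat(),
        "issuer": meta.issuer,
        "serial_number": meta.serial_number,
    }


def _decode(raw: dict) -> CertificateMetadata:
    return CertificateMetadata(
        b36_peerid=raw["b36_peerid"],
        domain=raw["domain"],
        cert_path=Path(raw["cert_path"]),
        key_path=Path(raw["key_path"]),
        issued_at=datetime.fromisoformat(raw["issued_at"]),
        expires_at=datetime.fromisoformat(raw["expires_at"]),
        issuer=raw["issuer"],
        serial_number=raw.get("serial_number"),
    )


def save_metadata(path: Path, entries: dict) -> None:
    """Write all entries (b36_peerid -> CertificateMetadata) to path."""
    payload = {peer: _encode(meta) for peer, meta in entries.items()}
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps(payload, indent=2))
    # Rename over the old file so readers never see a half-written document
    # (atomic on POSIX when both paths are on the same filesystem).
    tmp.replace(path)


def load_metadata(path: Path) -> dict:
    """Read entries back; a missing file means no certificates yet."""
    if not path.exists():
        return {}
    raw = json.loads(path.read_text())
    return {peer: _decode(entry) for peer, entry in raw.items()}
```

Storing timezone-aware timestamps in ISO 8601 form keeps the `is_valid` comparison against `datetime.now(timezone.utc)` well defined after a reload.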
4. Automatic Certificate Renewal
Current Python Implementation:
Suggested Improvement:

    class AutoTLSCertManager:
        async def _renewal_worker(self) -> None:
            """Background task to check and renew expiring certificates."""
            while self._running:
                await trio.sleep(3600)  # Check every hour
                metadata = self._load_metadata()
                now = datetime.now(timezone.utc)
                for b36_peerid, cert_meta in metadata.items():
                    if cert_meta.is_expiring_soon(threshold_hours=24 * 7):  # 7 days
                        logger.info(f"Certificate for {b36_peerid} expiring soon, renewing...")
                        # Trigger renewal
                        # ...

        def start(self) -> None:
            """Initialize and start background tasks."""
            self._running = True
            self._load_existing_certificates()
            # Start renewal worker in background
            # ...

Benefits:
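The renewal decision inside the worker loop can be separated from the sleeping and the I/O, which makes it testable without trio. The sketch below shows only that decision; `certs_due_for_renewal` is a hypothetical helper, and the 7-day default mirrors the threshold used above.

```python
from datetime import datetime, timedelta


def certs_due_for_renewal(
    expiries: dict,
    now: datetime,
    threshold: timedelta = timedelta(days=7),
) -> list:
    """Return the b36 peer IDs whose certificates expire within threshold.

    expiries maps b36_peerid -> expires_at. Already expired certificates
    have a negative remaining lifetime, so they are selected as well.
    """
    return sorted(
        peer for peer, expires_at in expiries.items() if expires_at - now < threshold
    )
```

The worker would call this once per wake-up with `datetime.now(timezone.utc)` and start a renewal for each returned peer ID.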
5. Network Reachability Integration
Current Python Implementation:
Suggested Improvement:

    class AutoTLSCertManager:
        def __init__(
            self,
            # ... (other parameters elided)
            require_public_reachability: bool = True,
        ):
            self.require_public_reachability = require_public_reachability
            self._reachability_listener: Callable[[bool], None] | None = None

        def set_reachability_listener(
            self,
            listener: Callable[[bool], None],
        ) -> None:
            """Set callback for reachability changes."""
            self._reachability_listener = listener

        async def ensure_certificate(
            self,
            peer_id: ID,
            host: BasicHost,
            public_ip: str,
            port: int,
        ) -> tuple[Path, Path] | None:
            """Ensure certificate exists, but only if publicly reachable."""
            if self.require_public_reachability:
                if not await self._check_public_reachability(host):
                    logger.warning(
                        "Node not publicly reachable, skipping AutoTLS certificate request"
                    )
                    return None
            # Proceed with certificate fetch
            # ...

        async def _check_public_reachability(self, host: BasicHost) -> bool:
            """Check if host is publicly reachable via AutoNAT or Identify."""
            # Use AutoNAT to determine reachability
            # Or check observed addresses from Identify protocol
            # ...

Benefits:
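Before involving AutoNAT or Identify, a cheap local precheck can rule out addresses that can never be publicly reachable. The sketch below uses only the standard `ipaddress` module; `is_publicly_routable` is a hypothetical helper. It is a necessary condition rather than a reachability test: a globally routable address can still sit behind a firewall.

```python
import ipaddress


def is_publicly_routable(ip: str) -> bool:
    """Return True if ip parses as a globally routable IPv4 or IPv6 address."""
    try:
        addr = ipaddress.ip_address(ip)
    except ValueError:
        # Not an IP address at all (for example, a hostname or empty string).
        return False
    # is_global is False for private, loopback, link-local and similar ranges.
    return addr.is_global
```

`ensure_certificate` could return early when this check fails for `public_ip`, and only then fall back to the more expensive protocol-level checks.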
6. Improved Certificate Loading
Current Python Implementation:
Suggested Improvement:

    class AutoTLSCertManager:
        def _load_existing_certificates(self) -> None:
            """Load and validate existing certificates from storage."""
            metadata = self._load_metadata()
            for b36_peerid, cert_meta in metadata.items():
                # Validate certificate file exists and is readable
                if not cert_meta.cert_path.exists():
                    logger.warning(f"Certificate file missing for {b36_peerid}")
                    continue
                if not cert_meta.key_path.exists():
                    logger.warning(f"Key file missing for {b36_peerid}")
                    continue
                # Validate certificate is still valid
                if not cert_meta.is_valid():
                    logger.info(f"Certificate expired for {b36_peerid}, will renew")
                    continue
                # Verify certificate matches peer ID
                if not self._verify_certificate_peer_id(cert_meta.cert_path, b36_peerid):
                    logger.warning(f"Certificate peer ID mismatch for {b36_peerid}")
                    continue
                # Cache valid certificate
                self._cert_cache[b36_peerid] = (cert_meta.cert_path, cert_meta.key_path)
                logger.info(f"Loaded valid certificate for {b36_peerid}")

        def _verify_certificate_peer_id(
            self,
            cert_path: Path,
            expected_b36_peerid: str,
        ) -> bool:
            """Verify certificate DNS name matches expected peer ID."""
            # Load certificate
            # Extract DNS names from SAN extension
            # Check if domain matches *.{b36_peerid}.libp2p.direct
            # ...

Benefits:
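The last step of `_verify_certificate_peer_id`, matching a DNS name against `*.{b36_peerid}.libp2p.direct`, is plain string handling and can be sketched independently of certificate parsing. The helper names below are hypothetical, and the list of DNS names is assumed to have been extracted from the SAN extension already (for example with the `cryptography` package).

```python
from typing import Iterable, Optional

FORGE_DOMAIN = "libp2p.direct"  # domain taken from the pattern above


def b36_peerid_from_dns_name(
    dns_name: str, forge_domain: str = FORGE_DOMAIN
) -> Optional[str]:
    """Return the label from '*.<label>.<forge_domain>', or None if no match."""
    name = dns_name.rstrip(".").lower()  # DNS names are case-insensitive
    prefix = "*."
    suffix = "." + forge_domain
    if not (name.startswith(prefix) and name.endswith(suffix)):
        return None
    label = name[len(prefix):-len(suffix)]
    # Exactly one non-empty alphanumeric label is allowed between the
    # wildcard and the forge domain; a dot here would mean extra labels.
    if not label or not label.isascii() or not label.isalnum():
        return None
    return label


def san_matches_peer(dns_names: Iterable[str], expected_b36_peerid: str) -> bool:
    """True if any SAN DNS name is the wildcard name for the expected peer."""
    expected = expected_b36_peerid.lower()
    return any(b36_peerid_from_dns_name(name) == expected for name in dns_names)
```

Requiring the exact suffix and a single label prevents names such as `*.k51abc.evil.com` or `*.a.k51abc.libp2p.direct` from being accepted.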
7. Configuration Management
Current Python Implementation:
Suggested Improvement:

    @dataclass
    class AutoTLSConfig:
        """Configuration for AutoTLS certificate manager."""

        storage_path: Path
        ca_endpoint: str = ACME_STAGING_ENDPOINT
        user_agent: str = "py-libp2p/autotls"
        registration_delay: float = 10.0  # seconds
        require_public_reachability: bool = True
        renewal_threshold_hours: int = 24 * 7  # 7 days
        enable_auto_renewal: bool = True

        @classmethod
        def from_file(cls, config_path: Path) -> "AutoTLSConfig":
            """Load configuration from JSON file."""
            # ...

        def to_file(self, config_path: Path) -> None:
            """Save configuration to JSON file."""
            # ...

        @classmethod
        def default(cls) -> "AutoTLSConfig":
            """Get default configuration."""
            return cls(
                storage_path=Path.home() / ".libp2p" / "autotls",
                ca_endpoint=ACME_STAGING_ENDPOINT,
            )

Benefits:
8. Integration with BasicHost
Current Python Implementation:
Suggested Improvement:

    class BasicHost:
        def __init__(
            self,
            # ... (other parameters elided)
            autotls_cert_manager: AutoTLSCertManager | None = None,
        ):
            self._autotls_cert_manager = autotls_cert_manager
            if autotls_cert_manager:
                autotls_cert_manager.start()

        async def initiate_autotls_procedure(self) -> None:
            """Initiate AutoTLS procedure using certificate manager."""
            if not self._autotls_cert_manager:
                logger.warning("AutoTLS certificate manager not configured")
                return
            # Check if certificate already exists
            cert_paths = self._autotls_cert_manager.get_certificate_paths(self.get_id())
            if cert_paths and self._autotls_cert_manager._is_certificate_valid(cert_paths[0]):
                logger.info("AutoTLS certificate already exists and is valid")
                return
            # Extract public IP
            public_ip, port = self._extract_public_ip_and_port()
            if not public_ip:
                raise RuntimeError("No public IP address found")
            # Fetch certificate
            cert_path, key_path = await self._autotls_cert_manager.ensure_certificate(
                peer_id=self.get_id(),
                host=self,
                public_ip=public_ip,
                port=port,
            )
            # Update paths for TLS transport
            libp2p.utils.paths.AUTOTLS_CERT_PATH = cert_path
            libp2p.utils.paths.AUTOTLS_KEY_PATH = key_path

Benefits:
Summary of Suggested Changes
High Priority
Medium Priority
Low Priority (Nice to Have)
Migration Path
Phase 1: Add Certificate Manager (Non-Breaking)
Phase 2: Integrate Certificate Manager
Phase 3: Add Advanced Features
Benefits of These Improvements
Comparison Table
These improvements would bring the Python implementation much closer to the production-grade approach used in go-libp2p while maintaining compatibility with the current working implementation.
Thanks for identifying this critical issue! The placeholder peer ID problem is indeed breaking correctness - ping messages going to the
Your Ed25519 approach was conceptually perfect, but the ACME limitation is unfortunate. Since we're stuck with RSA for AutoTLS certificates, I think we should implement the approach you originally suggested: extracting the b36_peerid from the certificate's DNS name.
How go-libp2p Handles This
Looking at go-libp2p's implementation:
Proposed Solution for Python
Since Python's SSL doesn't request client certificates without a trusted CA root, and AutoTLS certs don't have libp2p extensions, we should:
For the server side (inbound), we could also explore:
However, even with client certs, AutoTLS certificates won't have libp2p extensions, so we'd still need the DNS extraction approach.
This would:
Should I start implementing the b36_peerid extraction approach, or do you want to discuss the trade-offs on the call first?
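For reference, turning an extracted b36_peerid label back into peer ID bytes could look like the sketch below. It rests on an assumption about the encoding that should be checked against the libp2p peer ID specification and py-libp2p's own helpers: the label is taken to be a multibase base36 string (prefix `k`) wrapping a CIDv1 with the `libp2p-key` codec (0x72), whose multihash is the peer ID. All function and constant names are hypothetical.

```python
BASE36_ALPHABET = "0123456789abcdefghijklmnopqrstuvwxyz"
MULTIBASE_BASE36_PREFIX = "k"  # assumption: lowercase base36 multibase prefix
CID_VERSION_1 = 0x01
LIBP2P_KEY_CODEC = 0x72  # assumption: multicodec code for libp2p-key


def base36_decode(text: str) -> bytes:
    """Decode a lowercase base36 string into bytes."""
    number = 0
    for char in text:
        # str.index raises ValueError for characters outside the alphabet.
        number = number * 36 + BASE36_ALPHABET.index(char)
    body = number.to_bytes((number.bit_length() + 7) // 8, "big")
    # Leading '0' characters stand for leading zero bytes.
    leading_zeros = len(text) - len(text.lstrip("0"))
    return b"\x00" * leading_zeros + body


def peer_id_bytes_from_b36(b36_peerid: str) -> bytes:
    """Return the multihash bytes (the peer ID) inside a b36 CID string."""
    if not b36_peerid.startswith(MULTIBASE_BASE36_PREFIX):
        raise ValueError("expected a multibase base36 string starting with 'k'")
    cid = base36_decode(b36_peerid[1:])
    # Both header values are below 0x80, so each is a single varint byte.
    if len(cid) < 3 or cid[0] != CID_VERSION_1 or cid[1] != LIBP2P_KEY_CODEC:
        raise ValueError("not a CIDv1 with the libp2p-key codec")
    return cid[2:]
```

The returned bytes would then be handed to whatever peer ID constructor py-libp2p provides, replacing the placeholder peer ID on the connection.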
AutoTLS Integration Action Plan for py-libp2p
Related Issue: #555
Hi everyone
This is a proposed action plan for adding AutoTLS support to py-libp2p to make TLS configuration seamless, automated, and aligned with what’s already available in go-libp2p and js-libp2p.
The goal is to introduce a modular AutoTLS layer that supports both ephemeral self-signed certificates (for quick dev and test setups) and ACME/Let’s Encrypt certificates for production environments.
Proposed File & Module Structure
Core Functional Components
1. AutoTLS Manager (manager.py)
2. ACME Client (acme_client.py)
3. Cache Layer (cache.py)
4. Configuration (config.py)
Centralizes all environment variables and defaults.
Allows users to opt in to ACME or stay with ephemeral mode.
Supports flags like:
5. Transport Integration (transport.py)
Testing Strategy
Unit Tests
Integration Tests
Edge Cases
Summary of the Plan
libp2p/security/tls/ layer.
This plan is designed to deliver a complete and well-structured AutoTLS feature set while staying fully aligned with libp2p’s security design principles and the approaches used in other language implementations.
cc @seetadev @acul71 @pacrob @yashksaini-coder (collaborator)