@bomanaps

## PR #876 Review: Enhanced Swarm with Retry Logic and Multi-Connection Support

**PR URL:** #876

### Executive Summary

This PR enhances the libp2p Swarm networking component with retry logic (exponential backoff) and multi-connection support (connection pooling plus load balancing). It addresses issue #874 while maintaining full backward compatibility.

**Recommendation: ✅ APPROVE**

### Key Features

- ✅ Retry logic with exponential backoff
- ✅ Multi-connection support
- ✅ Backward compatibility
### Technical Implementation

#### Configuration Classes

```python
@dataclass
class RetryConfig:
    max_retries: int = 3
    initial_delay: float = 0.1
    max_delay: float = 30.0
    backoff_multiplier: float = 2.0
    jitter_factor: float = 0.1


@dataclass
class ConnectionConfig:
    max_connections_per_peer: int = 3
    connection_timeout: float = 30.0
    enable_connection_pool: bool = True
    load_balancing_strategy: str = "round_robin"
```

#### Connection Pool

```python
class ConnectionPool:
    def __init__(self, max_connections_per_peer: int = 3):
        self.peer_connections: dict[ID, list[ConnectionInfo]] = {}
        self._round_robin_index: dict[ID, int] = {}

    def get_connection(self, peer_id: ID, strategy: str = "round_robin") -> INetConn | None:
        # Implements round-robin and least-loaded strategies
        ...
```
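For intuition, the delay schedule implied by `RetryConfig` would look something like the following (a minimal sketch assuming the obvious field semantics; the PR's actual retry helper may differ):

```python
import random

def compute_retry_delay(config: RetryConfig, attempt: int) -> float:
    """Delay before retry `attempt` (0-based): capped exponential backoff with jitter."""
    delay = min(config.initial_delay * (config.backoff_multiplier ** attempt), config.max_delay)
    # Spread delays by +/- jitter_factor to avoid synchronized retries
    jitter = delay * config.jitter_factor * (2 * random.random() - 1)
    return max(0.0, delay + jitter)
```

With the defaults, successive attempts are delayed roughly 0.1 s, 0.2 s, and 0.4 s, plus or minus 10% jitter.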
### Code Quality Assessment

#### ✅ Strengths

#### Resource Limits

| Resource | Limit | Enforcement | Risk |
|---|---|---|---|
| Per-peer connections | ✅ Configurable (3) | ✅ Automatic trimming | Low |
| Retry attempts | ✅ Configurable (3) | ✅ Hard limit | Low |
| Total connections | ❌ None | ❌ None | Medium |
| Memory usage | ❌ None | ❌ None | Medium |
### Testing Results

- ✅ All tests pass: 43/43 network tests (including 13 new)
- ✅ Type checking: mypy validation passes
- ✅ Examples run: all features work as expected
- ✅ Backward compatibility: original functionality preserved
### Usage Examples

**Basic Usage**

```python
from libp2p import new_swarm

swarm = new_swarm()  # Uses default configuration
```

**Custom Configuration**

```python
from libp2p.network.swarm import RetryConfig, ConnectionConfig

retry_config = RetryConfig(max_retries=5, initial_delay=0.05)
connection_config = ConnectionConfig(
    max_connections_per_peer=5,
    load_balancing_strategy="least_loaded",
)
swarm = new_swarm(retry_config=retry_config, connection_config=connection_config)
```

**Backward Compatibility**

```python
connection_config = ConnectionConfig(enable_connection_pool=False)
swarm = new_swarm(connection_config=connection_config)  # Original behavior
```
### Recommendations for Future Iterations

#### 1. Add Configuration Validation

Problem: The current implementation accepts invalid configuration values without validation, which can lead to runtime errors or unexpected behavior.

Solution: Add comprehensive validation via a `__post_init__` method on the dataclasses.
```python
from dataclasses import dataclass
from typing import Literal


@dataclass
class RetryConfig:
    max_retries: int = 3
    base_delay: float = 1.0
    max_delay: float = 30.0
    jitter: float = 0.1

    def __post_init__(self) -> None:
        # Validate retry count
        if self.max_retries < 0:
            raise ValueError("max_retries must be >= 0")
        if self.max_retries > 100:
            raise ValueError("max_retries cannot exceed 100")
        # Validate delay parameters
        if self.base_delay <= 0:
            raise ValueError("base_delay must be > 0")
        if self.max_delay <= 0:
            raise ValueError("max_delay must be > 0")
        if self.base_delay > self.max_delay:
            raise ValueError("base_delay cannot exceed max_delay")
        # Validate jitter
        if not 0 <= self.jitter <= 1:
            raise ValueError("jitter must be between 0 and 1")


@dataclass
class ConnectionConfig:
    max_connections_per_peer: int = 3
    connection_timeout: float = 30.0
    load_balancing_strategy: Literal["round_robin", "least_loaded"] = "round_robin"

    def __post_init__(self) -> None:
        # Validate connection limits
        if self.max_connections_per_peer < 1:
            raise ValueError("max_connections_per_peer must be >= 1")
        if self.max_connections_per_peer > 10:
            raise ValueError("max_connections_per_peer cannot exceed 10")
        # Validate timeout
        if self.connection_timeout <= 0:
            raise ValueError("connection_timeout must be > 0")
        if self.connection_timeout > 300:  # 5 minutes max
            raise ValueError("connection_timeout cannot exceed 300 seconds")
        # Validate strategy
        if self.load_balancing_strategy not in ("round_robin", "least_loaded"):
            raise ValueError("load_balancing_strategy must be 'round_robin' or 'least_loaded'")
```
#### 2. Improve Connection Cleanup

Problem: The current cleanup uses `spawn_system_task`, which doesn't provide proper error handling or coordination.

Solution: Use a nursery for coordinated cleanup with proper error handling and timeouts.
```python
from contextlib import asynccontextmanager
from typing import List

import trio


class ConnectionPool:
    async def _cleanup_connections(self, connections_to_remove: List[ConnectionInfo]) -> None:
        """Clean up multiple connections with proper coordination and error handling."""

        @asynccontextmanager
        async def cleanup_nursery():
            """Context manager for coordinated connection cleanup."""
            async with trio.open_nursery() as nursery:
                yield nursery
                # The nursery waits for all cleanup tasks to complete on exit

        async with cleanup_nursery() as nursery:
            # Start cleanup tasks with individual error handling
            for conn_info in connections_to_remove:
                nursery.start_soon(
                    self._close_connection_with_timeout,
                    conn_info.connection,
                )

    async def _close_connection_with_timeout(self, connection: INetConn) -> None:
        """Close a single connection with timeout and error handling."""
        try:
            # Use a timeout to prevent hanging
            with trio.move_on_after(5.0):  # 5 second timeout
                await connection.close()
        except Exception as e:
            # Log but don't propagate, so other cleanup tasks are unaffected
            self.logger.warning(f"Failed to close connection: {e}")

    async def trim_connections(self, peer_id: ID) -> None:
        """Enhanced connection trimming with proper cleanup."""
        if peer_id not in self.peer_connections:
            return

        connections = self.peer_connections[peer_id]
        if len(connections) <= self.max_connections_per_peer:
            return

        # Sort by creation time (oldest first) for LRU behavior
        connections.sort(key=lambda c: c.created_at)

        # Determine which connections to remove
        excess_count = len(connections) - self.max_connections_per_peer
        connections_to_remove = connections[:excess_count]

        # Remove from tracking immediately
        self.peer_connections[peer_id] = connections[excess_count:]

        # Clean up the removed connections
        await self._cleanup_connections(connections_to_remove)
```
#### 3. Add Global Resource Limits

Problem: The current implementation has no global limits, potentially allowing unbounded memory usage.

Solution: Add comprehensive resource management with global limits and monitoring.
```python
import time
from dataclasses import dataclass, field
from typing import Dict, Set


@dataclass
class GlobalResourceLimits:
    max_total_connections: int = 1000
    max_peers_with_connections: int = 500
    max_memory_usage_mb: int = 512
    connection_cleanup_interval: float = 60.0  # seconds


@dataclass
class ConnectionPool:
    # ... existing fields ...

    # Global resource management
    global_limits: GlobalResourceLimits = field(default_factory=GlobalResourceLimits)
    _total_connections: int = 0
    _peers_with_connections: Set[ID] = field(default_factory=set)
    _last_cleanup_time: float = field(default_factory=time.time)

    def _check_global_limits(self, peer_id: ID) -> bool:
        """Check if adding a connection would exceed global limits."""
        # Check the total connections limit
        if self._total_connections >= self.global_limits.max_total_connections:
            return False
        # Check the peer count limit (only if this is a new peer)
        if (peer_id not in self._peers_with_connections and
                len(self._peers_with_connections) >= self.global_limits.max_peers_with_connections):
            return False
        return True

    def _update_global_counts(self, peer_id: ID, delta: int) -> None:
        """Update global connection and peer counts."""
        self._total_connections += delta
        if peer_id in self.peer_connections:
            if len(self.peer_connections[peer_id]) == 0:
                self._peers_with_connections.discard(peer_id)
            else:
                self._peers_with_connections.add(peer_id)

    async def add_connection(self, peer_id: ID, connection: INetConn) -> bool:
        """Add a connection with global limit checking."""
        if not self._check_global_limits(peer_id):
            await connection.close()
            return False
        # Add to the peer-specific pool
        success = await super().add_connection(peer_id, connection)
        if success:
            self._update_global_counts(peer_id, 1)
        return success

    async def remove_connection(self, peer_id: ID, connection: INetConn) -> bool:
        """Remove a connection and update global counts."""
        success = await super().remove_connection(peer_id, connection)
        if success:
            self._update_global_counts(peer_id, -1)
        return success

    async def periodic_cleanup(self) -> None:
        """Periodic cleanup to enforce global limits."""
        current_time = time.time()
        if current_time - self._last_cleanup_time < self.global_limits.connection_cleanup_interval:
            return
        self._last_cleanup_time = current_time

        # Enforce the global connection limit
        if self._total_connections > self.global_limits.max_total_connections:
            await self._enforce_global_connection_limit()
        # Enforce the peer count limit
        if len(self._peers_with_connections) > self.global_limits.max_peers_with_connections:
            await self._enforce_peer_count_limit()

    async def _enforce_global_connection_limit(self) -> None:
        """Remove excess connections to meet the global limit."""
        excess = self._total_connections - self.global_limits.max_total_connections

        # Collect all connections sorted by creation time (oldest first)
        all_connections = []
        for peer_id, connections in self.peer_connections.items():
            for conn_info in connections:
                all_connections.append((peer_id, conn_info))
        all_connections.sort(key=lambda x: x[1].created_at)

        # Remove the oldest connections
        connections_to_remove = all_connections[:excess]
        for peer_id, conn_info in connections_to_remove:
            await self.remove_connection(peer_id, conn_info.connection)
```
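Something still has to call `periodic_cleanup` regularly; one option (a hypothetical driver task, not part of the PR) is a simple timer loop:

```python
import trio

async def run_cleanup_loop(pool: ConnectionPool) -> None:
    """Invoke the pool's periodic cleanup on its configured interval."""
    while True:
        await trio.sleep(pool.global_limits.connection_cleanup_interval)
        await pool.periodic_cleanup()
```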
#### 4. Add Automatic Stream Count Tracking

Problem: The least-loaded strategy relies on `stream_count`, but it isn't automatically updated when streams are created or closed.

Solution: Integrate stream count tracking into the connection lifecycle.
```python
import weakref
from typing import Dict
from weakref import WeakSet


class ConnectionPool:
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Track active streams per connection
        self._connection_streams: Dict[INetConn, WeakSet] = {}
        self._connection_refs: Dict[INetConn, weakref.ref] = {}

    def _track_connection(self, connection: INetConn) -> None:
        """Start tracking a connection for stream counting."""
        # Create a weak reference to avoid memory leaks
        conn_ref = weakref.ref(connection, self._on_connection_garbage_collected)
        self._connection_refs[connection] = conn_ref
        self._connection_streams[connection] = WeakSet()

    def _on_connection_garbage_collected(self, conn_ref: weakref.ref) -> None:
        """Clean up when a connection is garbage collected."""
        # Find the connection in our tracking
        for conn, ref in list(self._connection_refs.items()):
            if ref is conn_ref:
                del self._connection_refs[conn]
                if conn in self._connection_streams:
                    del self._connection_streams[conn]
                break

    def _get_stream_count(self, connection: INetConn) -> int:
        """Get the current stream count for a connection."""
        if connection not in self._connection_streams:
            return 0
        return len(self._connection_streams[connection])

    def _increment_stream_count(self, connection: INetConn, stream: INetStream) -> None:
        """Track a new stream on a connection."""
        if connection not in self._connection_streams:
            self._track_connection(connection)
        self._connection_streams[connection].add(stream)

    def _decrement_stream_count(self, connection: INetConn, stream: INetStream) -> None:
        """Remove stream tracking when it's closed."""
        if connection in self._connection_streams:
            self._connection_streams[connection].discard(stream)

    def get_connection(self, peer_id: ID, strategy: str = "round_robin") -> INetConn | None:
        """Enhanced connection selection with accurate stream counting."""
        if peer_id not in self.peer_connections:
            return None
        connections = self.peer_connections[peer_id]
        if not connections:
            return None

        if strategy == "round_robin":
            # Existing round-robin logic
            conn_info = connections[self._round_robin_indices[peer_id]]
            self._round_robin_indices[peer_id] = (
                self._round_robin_indices[peer_id] + 1
            ) % len(connections)
            return conn_info.connection
        elif strategy == "least_loaded":
            # Use the actual stream count instead of an unreliable attribute
            conn_info = min(
                connections,
                key=lambda c: self._get_stream_count(c.connection),
            )
            return conn_info.connection
        else:
            raise ValueError(f"Unknown load balancing strategy: {strategy}")


class EnhancedSwarm:
    async def new_stream(self, peer_id: ID) -> INetStream:
        """Create a new stream with automatic stream count tracking."""
        # Get a connection using the enhanced pool
        connection = self.connection_pool.get_connection(
            peer_id, self.connection_config.load_balancing_strategy
        )
        if not connection:
            # Create a new connection if needed
            connection = await self._create_connection(peer_id)

        # Create the stream
        stream = await connection.open_stream()

        # Track the stream for load balancing
        self.connection_pool._increment_stream_count(connection, stream)

        # Set up cleanup when the stream closes
        original_close = stream.close

        async def tracked_close():
            try:
                await original_close()
            finally:
                self.connection_pool._decrement_stream_count(connection, stream)

        stream.close = tracked_close
        return stream
```
#### 5. Add Comprehensive Monitoring and Metrics

Problem: There is no visibility into connection pool performance and resource usage.

Solution: Add metrics collection and monitoring capabilities.
```python
from collections import Counter
from dataclasses import dataclass
from typing import Dict


@dataclass
class ConnectionPoolMetrics:
    total_connections: int = 0
    total_peers: int = 0
    connections_per_peer: Counter | None = None
    load_balancing_decisions: Counter | None = None
    connection_creation_time: float = 0.0
    connection_cleanup_time: float = 0.0

    def __post_init__(self) -> None:
        if self.connections_per_peer is None:
            self.connections_per_peer = Counter()
        if self.load_balancing_decisions is None:
            self.load_balancing_decisions = Counter()


class ConnectionPool:
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.metrics = ConnectionPoolMetrics()

    def get_metrics(self) -> Dict:
        """Get current pool metrics."""
        return {
            "total_connections": self._total_connections,
            "total_peers": len(self._peers_with_connections),
            "connections_per_peer": dict(self.metrics.connections_per_peer),
            "load_balancing_decisions": dict(self.metrics.load_balancing_decisions),
            "avg_connections_per_peer": (
                self._total_connections / len(self._peers_with_connections)
                if self._peers_with_connections else 0
            ),
            "memory_usage_estimate_mb": self._estimate_memory_usage(),
        }

    def _estimate_memory_usage(self) -> float:
        """Estimate memory usage in MB."""
        # Rough estimate: 1 KB per connection plus per-peer overhead
        base_usage = self._total_connections * 1024  # bytes
        overhead = len(self._peers_with_connections) * 512  # bytes
        return (base_usage + overhead) / (1024 * 1024)  # convert to MB
```
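With that in place, an operator could poll the pool, for example:

```python
# Example usage of the metrics sketch above
metrics = pool.get_metrics()
print(
    f"connections={metrics['total_connections']} "
    f"avg_per_peer={metrics['avg_connections_per_peer']:.1f} "
    f"est_mem={metrics['memory_usage_estimate_mb']:.2f} MB"
)
```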
### Risk Assessment

Low risk:

- Backward compatibility, opt-in features, comprehensive testing, per-peer limits

Medium risk:

- Performance overhead, memory usage, missing global resource limits

Mitigation:

- Monitor performance in production; implement global limits in future iterations

### Conclusion

This is a high-quality PR that significantly improves libp2p networking capabilities. It is well designed, thoroughly tested, and maintains backward compatibility. It is ready for merge, with minor improvements to be addressed in future iterations.

Key benefits: improved reliability, better performance, enhanced fault tolerance, and an easy migration path.

Recommendation: ✅ APPROVE

Reviewer: AI Assistant & acul71
Status: Approved with minor recommendations
---
## Analysis: 1:1 Mapping Issue in Python libp2p PR #876

### Executive Summary

Python libp2p PR #876 implements support for multiple connections per peer, but it maintains a 1:1 mapping in the core connections dictionary while hiding the multiple connections in a separate connection pool. This architectural approach is inconsistent with the Go and JavaScript reference implementations, which support multiple connections per peer directly in their core APIs.

### Current Python PR Architecture

#### ❌ Problem: 1:1 Mapping in Core API

```python
class Swarm(Service, INetworkService):
    # ❌ Still maintains 1:1 mapping for backward compatibility
    connections: dict[ID, INetConn]  # Single connection per peer

    # ✅ Multiple connections hidden in a separate pool
    connection_pool: ConnectionPool | None

    async def dial_peer(self, peer_id: ID) -> INetConn:
        # ❌ Returns a single connection
        return self.connections[peer_id]

    async def new_stream(self, peer_id: ID) -> INetStream:
        # ❌ Uses the connection pool internally, but the API is inconsistent
        connection = self.connection_pool.get_connection(peer_id)
        return await connection.new_stream()
```

#### 🔍 Issues with Current Approach
### Reference Implementations Analysis

#### ✅ Go Implementation (Correct)

```go
type Network interface {
    // ✅ Returns multiple connections per peer
    ConnsToPeer(p peer.ID) []Conn

    // ✅ Direct access to connection arrays
    DialPeer(ctx context.Context, p peer.ID) (Conn, error)
}

// Usage
conns := network.ConnsToPeer(peerID) // Get all connections
conn := selectBestConnection(conns)  // Load balancing at API level
```

#### ✅ JavaScript Implementation (Correct)

```typescript
class DefaultConnectionManager implements ConnectionManager {
  // ✅ Core data structure supports multiple connections per peer
  private readonly connections: PeerMap<Connection[]>

  // ✅ Direct API access to multiple connections
  getConnections (peerId?: PeerId): Connection[] {
    if (peerId != null) {
      return this.connections.get(peerId) ?? [] // Returns an array
    }

    // Return all connections from all peers
    let conns: Connection[] = []
    for (const c of this.connections.values()) {
      conns = conns.concat(c)
    }
    return conns
  }

  // ✅ Get the entire connections map
  getConnectionsMap (): PeerMap<Connection[]> {
    return this.connections
  }
}
```

### libp2p Specifications Compliance

#### ✅ Official Support for Multiple Connections

From the libp2p specification:

> libp2p is pragmatic. It seeks to be usable in as many settings as possible, to be modular and flexible to fit various use cases, and to force as few choices as possible. Thus the libp2p network layer provides what we're loosely referring to as "multi-multiplexing":
> - can multiplex multiple listen network interfaces
> - can multiplex multiple transport protocols
> - **can multiplex multiple connections per peer** ✅ OFFICIAL SPEC
> - can multiplex multiple client protocols
> - can multiplex multiple streams per protocol, per connection

#### 📋 What the Specs Don't Define

The specifications do NOT define:
### Architectural Comparison

### Required Architectural Changes

#### 🔧 1. Update the Core Data Structure

```python
class Swarm(Service, INetworkService):
    # ✅ Change from 1:1 to 1:many mapping
    connections: dict[ID, list[INetConn]]  # Multiple connections per peer

    # ❌ Remove the connection pool (integrate it into the main API)
    # connection_pool: ConnectionPool | None
```

#### 🔧 2. Update API Methods

```python
class Swarm(Service, INetworkService):
    def get_connections(self, peer_id: ID | None = None) -> list[INetConn]:
        """Get connections for a peer (like JS getConnections, Go ConnsToPeer)."""
        if peer_id is not None:
            return self.connections.get(peer_id, [])
        # Return all connections
        all_conns = []
        for conns in self.connections.values():
            all_conns.extend(conns)
        return all_conns

    def get_connections_map(self) -> dict[ID, list[INetConn]]:
        """Get the whole connections map (like JS getConnectionsMap)."""
        return self.connections

    async def dial_peer(self, peer_id: ID) -> list[INetConn]:
        """Return all connections to a peer (like Go/JS)."""
        # Implementation with load balancing
        connections = self.get_connections(peer_id)
        if not connections:
            # Dial if no connections exist
            connections = await self._dial_new_connections(peer_id)
        return connections

    async def close_peer(self, peer_id: ID) -> None:
        """Close all connections to a peer."""
        connections = self.get_connections(peer_id)
        for conn in connections:
            await conn.close()
        self.connections.pop(peer_id, None)
```

#### 🔧 3. Integrate Load Balancing at the API Level

```python
class Swarm(Service, INetworkService):
    async def new_stream(self, peer_id: ID) -> INetStream:
        """Create a stream with load balancing at the API level."""
        connections = self.get_connections(peer_id)
        if not connections:
            # Dial if no connections exist
            connections = await self.dial_peer(peer_id)
        # Apply the load balancing strategy at the interface level
        connection = self._select_connection(connections, self.load_balancing_strategy)
        return await connection.new_stream()

    def _select_connection(self, connections: list[INetConn], strategy: str) -> INetConn:
        """Select a connection based on the load balancing strategy."""
        if strategy == "round_robin":
            return self._round_robin_select(connections)
        elif strategy == "least_loaded":
            return self._least_loaded_select(connections)
        else:
            return connections[0]  # Default
```

#### 🔧 4. Maintain Backward Compatibility

```python
class Swarm(Service, INetworkService):
    def get_connection(self, peer_id: ID) -> INetConn | None:
        """Get a single connection for backward compatibility."""
        conns = self.get_connections(peer_id)
        return conns[0] if conns else None

    # Legacy property for backward compatibility
    @property
    def connections_legacy(self) -> dict[ID, INetConn]:
        """Legacy 1:1 mapping for backward compatibility."""
        legacy_conns = {}
        for peer_id, conns in self.connections.items():
            if conns:
                legacy_conns[peer_id] = conns[0]
        return legacy_conns
```

### Benefits of Architectural Changes

#### ✅ 1. API Consistency with Reference Implementations
#### ✅ 2. Direct Access to Multiple Connections

```python
# ✅ Direct access (new)
connections = swarm.get_connections(peer_id)
for conn in connections:
    ...  # Work with each connection

# ❌ Hidden access (current)
connection = swarm.connection_pool.get_connection(peer_id)  # Only one
```

#### ✅ 3. Load Balancing at the API Level

```python
# ✅ API-level load balancing (new)
stream = await swarm.new_stream(peer_id)  # Automatically selects the best connection

# ❌ Pool-level load balancing (current)
connection = swarm.connection_pool.get_connection(peer_id)  # Hidden logic
stream = await connection.new_stream()
```

#### ✅ 4. Better Resource Management

```python
# ✅ Clear resource management (new)
all_connections = swarm.get_connections()          # All connections
peer_connections = swarm.get_connections(peer_id)  # Connections to one peer

# ❌ Confusing resource management (current)
# Who manages what? Main dict vs. pool?
```

### Implementation Strategy

- Phase 1: Core data structure update
- Phase 2: API method updates
- Phase 3: Connection pool integration
- Phase 4: Backward compatibility
### Conclusion

The current Python PR #876 implements multiple-connections-per-peer support but uses an architecturally inconsistent approach compared to the Go and JavaScript reference implementations.

Key issues:

Required changes:

Benefits:

The PR should be restructured to follow the architectural patterns established by the Go and JavaScript implementations while preserving the innovative load balancing features.
---
## Connection Health Monitoring Enhancements for Python libp2p

### Executive Summary

While the current PR #876 successfully implements multiple-connections-per-peer support, it lacks sophisticated connection health monitoring capabilities. This document outlines proposed enhancements to bring Python libp2p's connection health monitoring up to the level of the Go and JavaScript reference implementations.

### Current Status

#### ✅ What Exists Now

#### ❌ What's Missing (Enhanced Features Needed)

### Proposed Enhanced Features

#### 🔧 1. Connection Health Metrics Data Structure

```python
@dataclass
class ConnectionHealth:
    """Enhanced connection health tracking."""

    # Basic metrics
    established_at: float
    last_used: float
    last_ping: float
    ping_latency: float

    # Performance metrics
    stream_count: int
    total_bytes_sent: int
    total_bytes_received: int

    # Health indicators
    failed_streams: int
    ping_success_rate: float
    health_score: float  # 0.0 to 1.0

    # Timestamps
    last_successful_operation: float
    last_failed_operation: float

    # Connection quality metrics
    average_stream_lifetime: float
    connection_stability: float  # Based on disconnection frequency

    def update_health_score(self) -> None:
        """Calculate the overall health score from the metrics."""
        # Weighted scoring algorithm
        latency_score = max(0, 1.0 - (self.ping_latency / 1000.0))  # Normalize to 1 s
        success_score = self.ping_success_rate
        stability_score = self.connection_stability
        self.health_score = (
            latency_score * 0.4 +
            success_score * 0.4 +
            stability_score * 0.2
        )
```

#### 🔧 2. Enhanced Connection Configuration

```python
@dataclass
class ConnectionConfig:
    """Enhanced configuration for multi-connection support."""

    # Existing settings
    max_connections_per_peer: int = 3
    connection_timeout: float = 30.0
    load_balancing_strategy: str = "round_robin"  # or "least_loaded", "health_based", "latency_based"

    # New health monitoring settings
    health_check_interval: float = 60.0  # seconds
    ping_timeout: float = 5.0  # seconds
    min_health_threshold: float = 0.3  # 0.0 to 1.0
    min_connections_per_peer: int = 1

    # Health scoring weights
    latency_weight: float = 0.4
    success_rate_weight: float = 0.4
    stability_weight: float = 0.2

    # Connection replacement thresholds
    max_ping_latency: float = 1000.0  # milliseconds
    min_ping_success_rate: float = 0.7  # 70%
    max_failed_streams: int = 5
```

#### 🔧 3. Proactive Health Monitoring Service

```python
class ConnectionHealthMonitor(Service):
"""Service for monitoring connection health."""
def __init__(self, swarm: Swarm, config: ConnectionConfig):
self.swarm = swarm
self.config = config
self.health_data: dict[ID, dict[INetConn, ConnectionHealth]] = {}
self._monitoring_task: trio.Task | None = None
async def run(self) -> None:
"""Start health monitoring."""
self._monitoring_task = trio.lowlevel.spawn_system_task(
self._monitor_connections
)
await self.manager.wait_finished()
async def _monitor_connections(self) -> None:
"""Periodically monitor all connection health."""
while True:
try:
await trio.sleep(self.config.health_check_interval)
for peer_id, connections in list(self.swarm.connections.items()):
for conn in list(connections):
await self._check_connection_health(peer_id, conn)
except trio.Cancelled:
break
except Exception as e:
logger.error(f"Health monitoring error: {e}")
async def _check_connection_health(self, peer_id: ID, conn: INetConn) -> None:
"""Check individual connection health."""
try:
# Measure ping latency
start_time = trio.current_time()
ping_success = await self._ping_connection(conn)
latency = (trio.current_time() - start_time) * 1000 # Convert to ms
# Update health data
if peer_id not in self.health_data:
self.health_data[peer_id] = {}
if conn not in self.health_data[peer_id]:
self.health_data[peer_id][conn] = ConnectionHealth(
established_at=trio.current_time(),
last_used=trio.current_time(),
last_ping=trio.current_time(),
ping_latency=latency,
stream_count=len(conn.get_streams()),
total_bytes_sent=0, # TODO: Implement byte counting
total_bytes_received=0, # TODO: Implement byte counting
failed_streams=0, # TODO: Track from stream creation failures
ping_success_rate=1.0 if ping_success else 0.0,
health_score=1.0,
last_successful_operation=trio.current_time(),
last_failed_operation=0.0,
average_stream_lifetime=0.0,
connection_stability=1.0
)
else:
health = self.health_data[peer_id][conn]
health.last_ping = trio.current_time()
health.ping_latency = latency
health.stream_count = len(conn.get_streams())
# Update success rate (rolling average)
if ping_success:
health.ping_success_rate = min(1.0, health.ping_success_rate + 0.1)
else:
health.ping_success_rate = max(0.0, health.ping_success_rate - 0.2)
health.update_health_score()
# Check if connection needs replacement
if self._should_replace_connection(peer_id, conn):
await self._replace_unhealthy_connection(peer_id, conn)
except Exception as e:
logger.error(f"Error checking connection health: {e}")
async def _ping_connection(self, conn: INetConn) -> bool:
"""Ping a connection to check health."""
try:
# Use a simple ping protocol or reuse existing DHT ping
# For now, we'll use a simple stream creation test
stream = await conn.new_stream()
await stream.close()
return True
except Exception:
return False
def _should_replace_connection(self, peer_id: ID, conn: INetConn) -> bool:
"""Determine if connection should be replaced."""
if peer_id not in self.health_data or conn not in self.health_data[peer_id]:
return False
health = self.health_data[peer_id][conn]
return (
health.health_score < self.config.min_health_threshold or
health.ping_latency > self.config.max_ping_latency or
health.ping_success_rate < self.config.min_ping_success_rate or
health.failed_streams > self.config.max_failed_streams
)
async def _replace_unhealthy_connection(self, peer_id: ID, old_conn: INetConn) -> None:
"""Replace unhealthy connection with a new one."""
try:
logger.info(f"Replacing unhealthy connection for peer {peer_id}")
# Close unhealthy connection
await old_conn.close()
# Remove from swarm connections
self.swarm.connections[peer_id].remove(old_conn)
# Remove health data
if peer_id in self.health_data and old_conn in self.health_data[peer_id]:
del self.health_data[peer_id][old_conn]
# Dial new connection if needed
min_conns = self.config.min_connections_per_peer
if len(self.swarm.connections[peer_id]) < min_conns:
new_conns = await self.swarm.dial_peer(peer_id)
logger.info(f"Added {len(new_conns)} new connections for peer {peer_id}")
except Exception as e:
logger.error(f"Error replacing connection: {e}") 🔧 4. Health-Aware Load Balancingclass Swarm(Service, INetworkService):
    def _select_connection(self, connections: list[INetConn], peer_id: ID) -> INetConn:
        """Enhanced connection selection with health awareness."""
        if not connections:
            raise ValueError("No connections available")

        strategy = self.connection_config.load_balancing_strategy

        if strategy == "health_based" and hasattr(self, 'health_monitor'):
            # Select the connection with the highest health score
            return max(connections, key=lambda c:
                       self.health_monitor.get_connection_health(peer_id, c).health_score)
        elif strategy == "latency_based" and hasattr(self, 'health_monitor'):
            # Select the connection with the lowest latency
            return min(connections, key=lambda c:
                       self.health_monitor.get_connection_health(peer_id, c).ping_latency)
        elif strategy == "least_loaded":
            # Select the connection with the fewest streams
            return min(connections, key=lambda c: len(c.get_streams()))
        elif strategy == "round_robin":
            # Simple round-robin selection
            if peer_id not in self._round_robin_index:
                self._round_robin_index[peer_id] = 0
            index = self._round_robin_index[peer_id] % len(connections)
            self._round_robin_index[peer_id] += 1
            return connections[index]
        else:
            # Default to the first connection
            return connections[0]
```

#### 🔧 5. Health Metrics Collection and Reporting

```python
class ConnectionHealthReporter:
"""Collect and report connection health metrics."""
def __init__(self, health_monitor: ConnectionHealthMonitor):
self.health_monitor = health_monitor
def get_peer_health_summary(self, peer_id: ID) -> dict:
"""Get health summary for a specific peer."""
if peer_id not in self.health_monitor.health_data:
return {}
connections = self.health_monitor.health_data[peer_id]
if not connections:
return {}
# Aggregate health metrics across all connections
total_health_score = sum(conn.health.health_score for conn in connections.values())
avg_latency = sum(conn.health.ping_latency for conn in connections.values()) / len(connections)
avg_success_rate = sum(conn.health.ping_success_rate for conn in connections.values()) / len(connections)
return {
"peer_id": str(peer_id),
"connection_count": len(connections),
"average_health_score": total_health_score / len(connections),
"average_latency_ms": avg_latency,
"average_success_rate": avg_success_rate,
"total_streams": sum(conn.health.stream_count for conn in connections.values()),
"unhealthy_connections": sum(1 for conn in connections.values()
if conn.health.health_score < 0.5)
}
def get_global_health_summary(self) -> dict:
"""Get global health summary across all peers."""
all_peers = list(self.health_monitor.health_data.keys())
if not all_peers:
return {}
peer_summaries = [self.get_peer_health_summary(peer_id) for peer_id in all_peers]
return {
"total_peers": len(all_peers),
"total_connections": sum(ps["connection_count"] for ps in peer_summaries),
"average_peer_health": sum(ps["average_health_score"] for ps in peer_summaries) / len(all_peers),
"peers_with_issues": sum(1 for ps in peer_summaries if ps["unhealthy_connections"] > 0),
"peer_details": peer_summaries
}
def export_health_metrics(self, format: str = "json") -> str:
"""Export health metrics in various formats."""
summary = self.get_global_health_summary()
if format == "json":
import json
return json.dumps(summary, indent=2)
elif format == "prometheus":
return self._format_prometheus_metrics(summary)
else:
raise ValueError(f"Unsupported format: {format}")
def _format_prometheus_metrics(self, summary: dict) -> str:
"""Format metrics for Prometheus monitoring."""
metrics = []
metrics.append(f"# HELP libp2p_peers_total Total number of peers")
metrics.append(f"# TYPE libp2p_peers_total gauge")
metrics.append(f"libp2p_peers_total {summary['total_peers']}")
metrics.append(f"# HELP libp2p_connections_total Total number of connections")
metrics.append(f"# TYPE libp2p_connections_total gauge")
metrics.append(f"libp2p_connections_total {summary['total_connections']}")
metrics.append(f"# HELP libp2p_average_peer_health Average health score across all peers")
metrics.append(f"# TYPE libp2p_average_peer_health gauge")
metrics.append(f"libp2p_average_peer_health {summary['average_peer_health']}")
return "\n".join(metrics) Implementation StrategyPhase 1: Core Health Metrics (High Priority)
### Implementation Strategy

- Phase 1: Core health metrics (high priority)
- Phase 2: Proactive monitoring (medium priority)
- Phase 3: Advanced features (low priority)
### Benefits

🎯 1. Production reliability

🎯 2. Performance optimization

🎯 3. Operational visibility

🎯 4. Compliance with reference implementations

### Conclusion

The current PR #876 provides a solid foundation for multiple-connections-per-peer support, but adding comprehensive connection health monitoring would significantly enhance its production readiness and bring it in line with the reference implementations. These enhancements would transform Python libp2p from a basic multi-connection implementation into a production-grade, self-healing networking layer with sophisticated health monitoring and automatic failover capabilities.

Recommendation: Implement these enhancements in a follow-up PR after the current architectural refactoring is merged and stable.
---
## TODO Analysis: Swarm.py Issues

Last review: August 2025

### Overview

This document analyzes two TODO issues in `libp2p/network/swarm.py`:

- Line 70: Connection and peer ID 1:1 mapping limitation (should be addressed)
- Line 292: Listener nursery I/O agnostic API issue (should be dismissed)

### Issue 1: Connection and Peer ID 1:1 Mapping Limitation
#### Issue Location

File: `libp2p/network/swarm.py:70`

TODO comment:

> # TODO: Connection and `peer_id` are 1-1 mapping in our implementation, whereas in Go one `peer_id` may point to multiple connections.

#### Current Problem

The Swarm class uses a simple dictionary mapping:
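As shown earlier in this thread, that mapping is:

```python
class Swarm(Service, INetworkService):
    connections: dict[ID, INetConn]  # one connection per peer
```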
Limitations:

#### Evidence: Partial Implementation Already Exists

Circuit Relay v2 NAT module (`libp2p/relay/circuit_v2/nat.py:240`)

DCUtR protocol (`libp2p/relay/circuit_v2/dcutr.py:504-521`)

The system is already designed for multiple connections, but the core Swarm implementation hasn't been updated.
#### Comparative Analysis: Other Implementations

Go implementation: ✅ fully implemented

JavaScript implementation: ✅ fully implemented

libp2p specifications: multiple connections per peer are supported

#### Solution: PR #743 Is Implementing This

Status: open PR addressing Issue 1

URL: #743

What it implements:

Alignment with research:

Recommendation: SUPPORT THIS PR. It directly addresses the core TODO issue and aligns with all other libp2p implementations.
#### Required Changes (if PR not merged)

- `libp2p/network/swarm.py`: core implementation
- `libp2p/network/connection/swarm_connection.py`: connection management
- `libp2p/abc.py`: interface updates

### Issue 2: Listener Nursery I/O Agnostic API

Should be dismissed (remove the TODO).

#### Issue Location
File: `libp2p/network/swarm.py:292`

TODO comment:

> # TODO: `listener.listen` is not bounded with nursery. If we want to be I/O agnostic, we should change the API.

#### Deep Analysis: Why Issue 2 Is NOT Optimal to Implement

Answer: Issue 2 should be DISMISSED due to fundamental architectural constraints.

#### Critical Implementation Constraints
1. The TCPListener nursery dependency is fundamental

The problem: `trio.serve_tcp()` requires a `TaskStatus` from `nursery.start()`; the `serve_tcp` function must report back to the nursery when it starts.
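A minimal sketch of that coupling (a simplified illustration, not the actual py-libp2p listener code):

```python
import trio

async def start_listener(handler, port: int, nursery: trio.Nursery) -> None:
    # trio.serve_tcp() runs until cancelled, so it must be spawned as a task;
    # nursery.start() blocks until serve_tcp reports via TaskStatus that the
    # socket is actually listening.
    await nursery.start(trio.serve_tcp, handler, port)
```

Because the listener cannot start without a Trio nursery, abstracting the nursery away would require rewriting the transport layer.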
2. Swarm nursery lifecycle management is complex

The problem:

3. The notification system uses the nursery

The problem:

#### Files That Would Need MAJOR Changes
Core implementation files (complex changes):

- `libp2p/transport/tcp/tcp.py`: complete rewrite; `trio.serve_tcp()` without nursery coordination
- `libp2p/network/swarm.py`: major refactoring
- `libp2p/abc.py`: interface redesign

Test and factory files (minor changes):

- `tests/core/transport/test_tcp.py`: remove the nursery parameter
- `tests/utils/factories.py`: update factory calls
- `libp2p/relay/circuit_v2/transport.py`: simple parameter removal

#### Why Implementation Is NOT Optimal
- Option A: Complete nursery abstraction (impossible)
- Option B: Hybrid approach (complex)
- Option C: Framework-specific implementation (recommended)

#### Conclusion

Issue 2 should be DISMISSED because:

Recommendation: Focus on Issue 1 (multiple connections), which provides real value and is achievable, while accepting Issue 2 as a Trio-specific limitation rather than a critical TODO.
### Combined Impact and Recommendations

#### Priority Assessment

- Issue 1 (multiple connections): HIGH priority ⬆️
- Issue 2 (I/O agnostic API): LOW priority ⬇️

#### Implementation Order

1. Focus on Issue 1 (multiple connections)
2. Dismiss Issue 2 (I/O agnostic API)

#### Migration Strategy
#### Files That Would Need Updates

For Issue 1 (multiple connections), recommended:

- `libp2p/network/swarm.py`: core implementation
- `libp2p/network/connection/swarm_connection.py`: connection management
- `libp2p/abc.py`: interface updates
- `tests/core/network/test_swarm.py`: test updates
- `tests/core/host/test_*.py`: host tests

For Issue 2 (I/O agnostic API): not recommended.
### Conclusion

The two TODO issues differ sharply in importance for the libp2p Python implementation.

Key findings:

Recommendation: Focus on Issue 1 (multiple connections), which provides real value and is achievable, while accepting Issue 2 as a Trio-specific limitation rather than a critical TODO. This approach will unlock the full potential of the networking layer and achieve feature parity with other libp2p implementations without attempting an impossible architectural rewrite.