Replies: 2 comments
-
py-multiaddr vs js-multiaddr Implementation ComparisonOverviewThis document compares the Python ( Core Implementation Differences1. Type System
2. Protocol Handling
3. Data Access Methods
Feature Comparison1. Core Features (Both Implement)✅ Both Implement:
2. Additional Features in js-multiaddr🔍 js-multiaddr Only: DNS Resolution SupportThe JavaScript implementation includes built-in DNS resolution capabilities, particularly for DNSADDR protocol. js-multiaddr Example: // Resolving a DNSADDR multiaddr
const ma = multiaddr('/dnsaddr/bootstrap.libp2p.io')
const resolved = await ma.resolve()
// Returns array of resolved multiaddrs Proposed py-multiaddr Implementation: # New file: multiaddr/resolvers/dns.py
from typing import List, Optional
import aiodns
from ..multiaddr import Multiaddr
class DNSResolver:
def __init__(self):
self.resolver = aiodns.DNSResolver()
async def resolve(self, ma: Multiaddr) -> List[Multiaddr]:
"""Resolve a DNSADDR multiaddr to its actual addresses."""
if not ma.protocols()[0].name == 'dnsaddr':
return [ma]
# Get the hostname
hostname = ma.value_for_protocol('dnsaddr')
if not hostname:
return [ma]
# Query DNS TXT records
try:
txt_records = await self.resolver.query(hostname, 'TXT')
results = []
for record in txt_records:
# Parse TXT record and create new multiaddr
for addr in record.text:
if addr.startswith('dnsaddr='):
results.append(Multiaddr(addr[8:]))
return results
except Exception as e:
raise DNSResolutionError(f"Failed to resolve {hostname}: {str(e)}") Thin Waist Address Validationjs-multiaddr includes explicit validation for thin waist addresses (ip4/ip6 + tcp/udp combinations). js-multiaddr Example: const ma = multiaddr('/ip4/127.0.0.1/tcp/4001')
ma.isThinWaistAddress() // true
const ma2 = multiaddr('/ip4/127.0.0.1/tcp/4001/ws')
ma2.isThinWaistAddress() // false Proposed py-multiaddr Implementation: # Add to multiaddr/multiaddr.py
def is_thin_waist(self) -> bool:
"""Check if this multiaddr is a thin waist address.
A thin waist address must start with ip4/ip6 followed by tcp/udp.
Additional protocols can be added after the thin waist.
"""
protocols = list(self.protocols())
if len(protocols) < 2:
return False
# Check first protocol is ip4/ip6
if protocols[0].code not in (4, 41): # ip4 or ip6
return False
# Check second protocol is tcp/udp
if protocols[1].code not in (6, 273): # tcp or udp
return False
return True
def get_thin_waist(self) -> Optional['Multiaddr']:
"""Extract the thin waist portion of this multiaddr if it exists."""
if not self.is_thin_waist():
return None
protocols = list(self.protocols())
return self.split(2)[0] # Return first two protocols Protocol Utility Methodsjs-multiaddr provides more extensive protocol-related utility methods. js-multiaddr Example: const ma = multiaddr('/ip4/127.0.0.1/tcp/4001')
// Protocol information methods
ma.protoCodes() // [4, 6]
ma.protoNames() // ['ip4', 'tcp']
ma.tuples() // [[4, <Buffer>], [6, <Buffer>]]
ma.stringTuples() // [[4, '127.0.0.1'], [6, '4001']]
// Protocol validation and extraction
ma.isThinWaistAddress() // Checks if address follows thin waist principle
ma.getPeerId() // Returns peer ID if present
ma.getPath() // Returns path if present
// Protocol manipulation
const addr = multiaddr('/ip4/0.0.0.0/tcp/8080/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC')
addr.decapsulateCode(421) // Returns: Multiaddr('/ip4/0.0.0.0/tcp/8080') Proposed py-multiaddr Implementation: # Add to multiaddr/multiaddr.py
# Protocol information methods
def proto_codes(self) -> List[int]:
"""Return the protocol codes in order."""
return [p.code for p in self.protocols()]
def proto_names(self) -> List[str]:
"""Return the protocol names in order."""
return [p.name for p in self.protocols()]
def tuples(self) -> List[Tuple[int, bytes]]:
"""Return the protocol code/value pairs."""
return [(p.code, self.value_for_protocol(p))
for p in self.protocols()
if self.value_for_protocol(p) is not None]
def string_tuples(self) -> List[Tuple[int, str]]:
"""Return the protocol code/value pairs with string values.
Example:
>>> addr = Multiaddr('/ip4/127.0.0.1/tcp/4001')
>>> addr.string_tuples()
[(4, '127.0.0.1'), (6, '4001')]
"""
return [(p.code, str(self.value_for_protocol(p)))
for p in self.protocols()
if self.value_for_protocol(p) is not None]
# Protocol validation and extraction
def is_thin_waist(self) -> bool:
"""Check if this multiaddr is a thin waist address.
A thin waist address must start with ip4/ip6 followed by tcp/udp.
Additional protocols can be added after the thin waist.
Example:
>>> addr1 = Multiaddr('/ip4/127.0.0.1/tcp/80')
>>> addr1.is_thin_waist() # True
>>> addr2 = Multiaddr('/ip4/127.0.0.1/tcp/80/udp/53')
>>> addr2.is_thin_waist() # False
"""
protocols = list(self.protocols())
if len(protocols) < 2:
return False
# Check first protocol is ip4/ip6
if protocols[0].code not in (4, 41): # ip4 or ip6
return False
# Check second protocol is tcp/udp
if protocols[1].code not in (6, 273): # tcp or udp
return False
return True
def get_peer_id(self) -> Optional[str]:
"""Get the P2P peer ID if present.
Example:
>>> addr = Multiaddr('/ip4/127.0.0.1/tcp/80/p2p/QmHash')
>>> addr.get_peer_id() # Returns: 'QmHash'
"""
for proto in self.protocols():
if proto.name == 'p2p':
return proto.value
return None
def get_path(self) -> Optional[str]:
"""Get the path if present.
Example:
>>> addr = Multiaddr('/ip4/127.0.0.1/tcp/80/unix/tmp/p2p.sock')
>>> addr.get_path() # Returns: '/tmp/p2p.sock'
"""
for proto in self.protocols():
if proto.name == 'unix':
return proto.value
return None
# Protocol manipulation
def decapsulate_code(self, code: int) -> 'Multiaddr':
"""Remove the last occurrence of a protocol with the given code.
This is a more reliable version of decapsulate when targeting a specific
protocol code. The last index of the code will be removed from the Multiaddr,
and a new instance will be returned. If the code is not present, the original
Multiaddr is returned.
Example:
>>> addr = Multiaddr('/ip4/0.0.0.0/tcp/8080/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC')
>>> addr.decapsulate_code(421) # 421 is the p2p protocol code
Multiaddr('/ip4/0.0.0.0/tcp/8080')
"""
protocols = list(self.protocols())
# Find the last occurrence of the protocol code
for i in range(len(protocols) - 1, -1, -1):
if protocols[i].code == code:
# Split at this index and return the first part
return self.split(i)[0]
# If code not found, return a copy of the original
return Multiaddr(self) Comprehensive Error Handlingjs-multiaddr has more specific error types and better error messages. js-multiaddr Example: try {
const ma = multiaddr('/invalid/addr')
} catch (e) {
// e is a specific error type with detailed message
console.error(e.message) // "Protocol not found: invalid"
} Proposed py-multiaddr Implementation: # New file: multiaddr/exceptions.py
class MultiaddrError(Exception):
"""Base class for all multiaddr errors."""
pass
class ProtocolNotFoundError(MultiaddrError):
"""Raised when a protocol is not found in the registry."""
def __init__(self, protocol: str):
super().__init__(f"Protocol not found: {protocol}")
self.protocol = protocol
class InvalidMultiaddrError(MultiaddrError):
"""Raised when a multiaddr string is invalid."""
def __init__(self, addr: str, reason: str):
super().__init__(f"Invalid multiaddr '{addr}': {reason}")
self.addr = addr
self.reason = reason
class ProtocolValueError(MultiaddrError):
"""Raised when a protocol value is invalid."""
def __init__(self, protocol: str, value: str, reason: str):
super().__init__(
f"Invalid value for protocol {protocol}: {value} - {reason}"
)
self.protocol = protocol
self.value = value
self.reason = reason Protocol Information Accessjs-multiaddr provides more comprehensive access to protocol information. js-multiaddr Example: const ma = multiaddr('/ip4/127.0.0.1/tcp/4001')
ma.protos() // Returns detailed protocol information
// [
// {code: 4, size: 32, name: 'ip4', resolvable: false},
// {code: 6, size: 16, name: 'tcp', resolvable: false}
// ] Proposed py-multiaddr Implementation: # Add to multiaddr/protocols.py
@dataclass
class ProtocolInfo:
"""Detailed information about a protocol."""
code: int
size: int
name: str
resolvable: bool = False
path: bool = False
# Add to multiaddr/multiaddr.py
def protocol_info(self) -> List[ProtocolInfo]:
"""Return detailed information about each protocol in this multiaddr."""
return [
ProtocolInfo(
code=p.code,
size=p.size,
name=p.name,
resolvable=getattr(p, 'resolvable', False),
path=getattr(p, 'path', False)
)
for p in self.protocols()
] Missing Tests in py-multiaddrjs-multiaddr has a comprehensive test suite that covers various aspects of multiaddr functionality. Here are the key tests that should be added to py-multiaddr: 1. Basic Construction Tests: def test_construction():
# Create multiaddr
udp_addr = Multiaddr('/ip4/127.0.0.1/udp/1234')
assert is_multiaddr(udp_addr) is True
# Clone multiaddr
udp_addr_clone = Multiaddr(udp_addr)
assert udp_addr_clone != udp_addr
assert udp_addr_clone.bytes == udp_addr.bytes
# Reconstruct with bytes
assert Multiaddr(udp_addr.bytes).bytes != udp_addr.bytes # Different objects
assert Multiaddr(udp_addr.bytes).bytes == udp_addr.bytes # Same content
# Reconstruct with string
assert Multiaddr(udp_addr.to_string()).bytes != udp_addr.bytes
assert Multiaddr(udp_addr.to_string()).bytes == udp_addr.bytes
# Empty construct
assert Multiaddr('').to_string() == '/'
# Invalid inputs
with pytest.raises(ValueError):
Multiaddr({}) # Invalid dict
with pytest.raises(ValueError):
Multiaddr([]) # Invalid list
with pytest.raises(ValueError):
Multiaddr(138) # Invalid number
def test_construction_edge_cases():
# Null/None handling
assert Multiaddr(None).to_string() == '/'
assert Multiaddr().to_string() == '/'
# Empty string variations
assert Multiaddr('').to_string() == '/'
assert Multiaddr('/').to_string() == '/'
# Whitespace handling
with pytest.raises(ValueError):
Multiaddr(' /ip4/127.0.0.1') # Leading whitespace
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1 ') # Trailing whitespace
# Special characters
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1/tcp/80/') # Trailing slash
with pytest.raises(ValueError):
Multiaddr('ip4/127.0.0.1') # Missing leading slash 2. Protocol Manipulation Tests: def test_protocol_manipulation():
# Basic protocol operations
udp_addr = Multiaddr('/ip4/127.0.0.1/udp/1234')
assert udp_addr.proto_codes() == [4, 273] # ip4, udp
assert udp_addr.proto_names() == ['ip4', 'udp']
# Encapsulation
udp_addr2 = udp_addr.encapsulate('/udp/5678')
assert udp_addr2.to_string() == '/ip4/127.0.0.1/udp/1234/udp/5678'
# Decapsulation
assert udp_addr2.decapsulate('/udp').to_string() == '/ip4/127.0.0.1/udp/1234'
assert udp_addr2.decapsulate('/ip4').to_string() == '/'
# Decapsulate code
p2p_addr = Multiaddr('/ip4/0.0.0.0/tcp/8080/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC')
assert p2p_addr.decapsulate_code(421).to_string() == '/ip4/0.0.0.0/tcp/8080'
def test_protocol_manipulation_edge_cases():
# Empty encapsulation/decapsulation
addr = Multiaddr('/ip4/127.0.0.1')
assert addr.encapsulate('').to_string() == addr.to_string()
assert addr.decapsulate('').to_string() == addr.to_string()
# Self encapsulation
addr = Multiaddr('/ip4/127.0.0.1')
assert addr.encapsulate(addr).to_string() == '/ip4/127.0.0.1/ip4/127.0.0.1'
# Multiple decapsulation
addr = Multiaddr('/ip4/127.0.0.1/tcp/80/tcp/443')
assert addr.decapsulate('/tcp').to_string() == '/ip4/127.0.0.1/tcp/80'
# Non-existent protocol decapsulation
addr = Multiaddr('/ip4/127.0.0.1')
assert addr.decapsulate('/tcp').to_string() == addr.to_string()
# Protocol code edge cases
addr = Multiaddr('/ip4/127.0.0.1/tcp/80')
with pytest.raises(ValueError):
addr.decapsulate_code(-1) # Invalid protocol code
with pytest.raises(ValueError):
addr.decapsulate_code(999999) # Non-existent protocol code 3. Protocol Variant Tests: def test_protocol_variants():
# IP4
ip4_addr = Multiaddr('/ip4/127.0.0.1')
assert ip4_addr.to_string() == '/ip4/127.0.0.1'
# IP6
ip6_addr = Multiaddr('/ip6/2001:8a0:7ac5:4201:3ac9:86ff:fe31:7095')
assert ip6_addr.to_string() == '/ip6/2001:8a0:7ac5:4201:3ac9:86ff:fe31:7095'
# IP4 + TCP
tcp_addr = Multiaddr('/ip4/127.0.0.1/tcp/5000')
assert tcp_addr.to_string() == '/ip4/127.0.0.1/tcp/5000'
def test_protocol_variants_edge_cases():
# IP4 edge cases
with pytest.raises(ValueError):
Multiaddr('/ip4/256.0.0.1') # Invalid IP4
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0') # Incomplete IP4
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1.1') # Extra octet
# IP6 edge cases
with pytest.raises(ValueError):
Multiaddr('/ip6/2001:8a0:7ac5:4201:3ac9:86ff:fe31:7095:1234') # Extra segment
with pytest.raises(ValueError):
Multiaddr('/ip6/2001:8a0:7ac5:4201:3ac9:86ff:fe31') # Incomplete IP6
# TCP/UDP port edge cases
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1/tcp/70000') # Port too large
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1/tcp/-1') # Negative port
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1/tcp/abc') # Non-numeric port 4. Thin Waist Tests: def test_thin_waist():
# Valid thin waist addresses
valid_addr1 = Multiaddr('/ip4/127.0.0.1/tcp/80')
assert valid_addr1.is_thin_waist() is True
valid_addr2 = Multiaddr('/ip6/::1/udp/53')
assert valid_addr2.is_thin_waist() is True
# Invalid thin waist addresses
invalid_addr1 = Multiaddr('/ip4/127.0.0.1/tcp/80/udp/53')
assert invalid_addr1.is_thin_waist() is False
invalid_addr2 = Multiaddr('/tcp/80')
assert invalid_addr2.is_thin_waist() is False
def test_thin_waist_edge_cases():
# Empty address
assert Multiaddr('').is_thin_waist() is False
# Single protocol
assert Multiaddr('/ip4/127.0.0.1').is_thin_waist() is False
assert Multiaddr('/tcp/80').is_thin_waist() is False
# Wrong order
assert Multiaddr('/tcp/80/ip4/127.0.0.1').is_thin_waist() is False
# Multiple network protocols
assert Multiaddr('/ip4/127.0.0.1/ip6/::1/tcp/80').is_thin_waist() is False
# Multiple transport protocols
assert Multiaddr('/ip4/127.0.0.1/tcp/80/tcp/443').is_thin_waist() is False
# Non-standard protocols
assert Multiaddr('/ip4/127.0.0.1/ws').is_thin_waist() is False
assert Multiaddr('/ip4/127.0.0.1/wss').is_thin_waist() is False 5. Protocol Information Tests: def test_protocol_information():
addr = Multiaddr('/ip4/127.0.0.1/tcp/80/p2p/QmHash')
# Get peer ID
assert addr.get_peer_id() == 'QmHash'
# Get path
unix_addr = Multiaddr('/ip4/127.0.0.1/tcp/80/unix/tmp/p2p.sock')
assert unix_addr.get_path() == '/tmp/p2p.sock'
# Protocol tuples
assert addr.tuples() == [(4, b'127.0.0.1'), (6, b'80'), (421, b'QmHash')]
assert addr.string_tuples() == [(4, '127.0.0.1'), (6, '80'), (421, 'QmHash')]
def test_protocol_information_edge_cases():
# Empty address
addr = Multiaddr('')
assert addr.proto_codes() == []
assert addr.proto_names() == []
assert addr.tuples() == []
assert addr.string_tuples() == []
# Missing peer ID
addr = Multiaddr('/ip4/127.0.0.1/tcp/80')
assert addr.get_peer_id() is None
# Invalid peer ID format
addr = Multiaddr('/ip4/127.0.0.1/tcp/80/p2p/invalid')
assert addr.get_peer_id() == 'invalid' # Should still return the value
# Multiple paths
addr = Multiaddr('/ip4/127.0.0.1/tcp/80/unix/path1/unix/path2')
assert addr.get_path() == '/path2' # Should return last path
# Protocol with no value
addr = Multiaddr('/ip4/127.0.0.1/tcp/80/ws')
assert addr.tuples() == [(4, b'127.0.0.1'), (6, b'80'), (477, b'')] 6. Error Handling Tests: def test_error_handling():
# Invalid protocol codes
with pytest.raises(ValueError):
Multiaddr('/invalid/127.0.0.1')
# Invalid protocol values
with pytest.raises(ValueError):
Multiaddr('/ip4/256.0.0.1') # Invalid IP
# Invalid decapsulation
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1').decapsulate('/')
def test_error_handling_edge_cases():
# Protocol code validation
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1').decapsulate_code(-1)
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1').decapsulate_code(999999)
# Protocol value validation
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1/tcp/abc') # Non-numeric port
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1/tcp/-1') # Negative port
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1/tcp/70000') # Port too large
# Protocol order validation
with pytest.raises(ValueError):
Multiaddr('/tcp/80/ip4/127.0.0.1') # Wrong order
# Protocol combination validation
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1/ip4/192.168.0.1') # Multiple IP4
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1/tcp/80/tcp/443') # Multiple TCP
# Protocol value format validation
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1/tcp/80/p2p/') # Empty peer ID
with pytest.raises(ValueError):
Multiaddr('/ip4/127.0.0.1/tcp/80/unix/') # Empty path These tests cover the core functionality of multiaddr and would help ensure that py-multiaddr maintains compatibility with js-multiaddr. The tests are organized into logical groups and cover:
Each test group now includes both basic functionality tests and edge cases to ensure robust implementation. ConclusionThe JavaScript implementation ( Key areas where
These enhancements would make the Python implementation more robust, easier to use, and more aligned with the JavaScript implementation's capabilities. They would also make it easier to work with multiaddrs in a type-safe and error-resistant way. |
Beta Was this translation helpful? Give feedback.
-
I'm working on DNS resolution support here multiformats/py-multiaddr#68 |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
py-multiaddr Specification Compliance Analysis
Overview
This document analyzes the compliance of py-multiaddr with the libp2p addressing specifications. The analysis is based on the implementation in
multiaddr/multiaddr.py
and related files.Core Requirements Compliance
1. Basic Multiaddr Format
✅ Compliant
2. Protocol Support
✅ Compliant
3. Binary Format
✅ Compliant
4. String Format
✅ Compliant
Feature Implementation
1. Core Operations
✅ Implemented
encapsulate()
- Adds one multiaddr to anotherdecapsulate()
- Removes a multiaddr from anothersplit()
- Splits a multiaddr into componentsjoin()
- Combines multiple multiaddrs2. Protocol Operations
✅ Implemented
3. Data Access
✅ Implemented
to_bytes()
- Binary representation__str__()
- String representationprotocols()
- Protocol listvalue_for_protocol()
- Protocol value extractionMissing or Incomplete Features
1. DNS Resolution
❌ Not Implemented
Implementation Details:
DNS Resolution System:
multiaddr/resolvers/
to handle DNS resolutionResolver
class with async resolution methodsRequired Changes:
Integration Points:
resolve()
method toMultiaddr
class2. Thin Waist Address Validation
❌ Not Implemented
Implementation Details:
Thin Waist Definition:
{ip4,ip6}/{tcp,udp}
Required Changes:
Validation Rules:
Integration Points:
Multiaddr
constructorencapsulate()
methodRecommendations
High Priority
Medium Priority
Low Priority
Conclusion
The py-multiaddr implementation follows the core libp2p addressing specifications well, but lacks some advanced features present in other implementations. The core functionality is solid, but there's room for improvement in terms of additional features like DNS resolution and thin waist address validation.
Beta Was this translation helpful? Give feedback.
All reactions