Skip to content

Commit cbe39af

Browse files
committed
fix: fix many typecheck errors
1 parent e01dbd3 commit cbe39af

File tree

16 files changed

+874
-319
lines changed

16 files changed

+874
-319
lines changed

multiaddr/codecs/__init__.py

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,44 @@
11
import importlib
2+
from typing import Dict, Union, Any
23

34

45
# These are special sizes
56
LENGTH_PREFIXED_VAR_SIZE = -1
67

78

8-
class NoneCodec:
9-
SIZE = 0
10-
IS_PATH = False
9+
class CodecBase:
10+
SIZE: int
11+
IS_PATH: bool
1112

13+
def to_string(self, proto: Any, buf: bytes) -> str:
14+
raise NotImplementedError
1215

13-
CODEC_CACHE = {}
16+
def to_bytes(self, proto: Any, string: str) -> bytes:
17+
raise NotImplementedError
1418

1519

16-
def codec_by_name(name):
17-
if name is None: # Special “do nothing – expect nothing” pseudo-codec
18-
return NoneCodec
20+
class NoneCodec(CodecBase):
21+
SIZE: int = 0
22+
IS_PATH: bool = False
23+
24+
def to_string(self, proto: Any, buf: bytes) -> str:
25+
return ""
26+
27+
def to_bytes(self, proto: Any, string: str) -> bytes:
28+
return b""
29+
30+
31+
CODEC_CACHE: Dict[str, CodecBase] = {}
32+
33+
34+
def codec_by_name(name: Union[str, None]) -> CodecBase:
35+
if name is None: # Special "do nothing – expect nothing" pseudo-codec
36+
return NoneCodec()
1937
codec = CODEC_CACHE.get(name)
20-
if not codec:
21-
codec = CODEC_CACHE[name] = importlib.import_module(".{0}".format(name), __name__)
38+
if codec is None:
39+
module = importlib.import_module(f".{name}", __name__)
40+
codec_class = getattr(module, "Codec")
41+
assert codec_class is not None, f"Codec {name} not found"
42+
codec = codec_class()
43+
CODEC_CACHE[name] = codec
2244
return codec

multiaddr/codecs/cid.py

Lines changed: 72 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,17 @@
11
import base58
22
import cid
3+
from typing import Dict, List
4+
from ..codecs import CodecBase
35

46
from . import LENGTH_PREFIXED_VAR_SIZE
57

6-
78
SIZE = LENGTH_PREFIXED_VAR_SIZE
89
IS_PATH = False
910

1011

1112
# Spec: https://github.com/libp2p/specs/blob/master/peer-ids/peer-ids.md#string-representation
12-
CIDv0_PREFIX_TO_LENGTH = {
13-
# base58btc prefixes for valid lengths 1 – 42 with the identity hash function
13+
CIDv0_PREFIX_TO_LENGTH: Dict[str, List[int]] = {
14+
# base58btc prefixes for valid lengths 1 – 42 with the identity "hash" function
1415
'12': [5, 12, 19, 23, 30, 41, 52, 56],
1516
'13': [9, 16, 34, 45],
1617
'14': [27, 38, 49, 60],
@@ -62,71 +63,82 @@
6263
'Qm': [46],
6364
}
6465

65-
PROTO_NAME_TO_CIDv1_CODEC = {
66-
# The p2p multiaddr protocol requires all keys to use the libp2p-key multicodec
66+
PROTO_NAME_TO_CIDv1_CODEC: Dict[str, str] = {
67+
# The "p2p" multiaddr protocol requires all keys to use the "libp2p-key" multicodec
6768
"p2p": "libp2p-key",
6869
}
6970

7071

71-
def to_bytes(proto, string):
72-
expected_codec = PROTO_NAME_TO_CIDv1_CODEC.get(proto.name)
73-
74-
if len(string) in CIDv0_PREFIX_TO_LENGTH.get(string[0:2], ()): # CIDv0
75-
# Upgrade the wire (binary) representation of any received CIDv0 string
76-
# to CIDv1 if we can determine which multicodec value to use
77-
if expected_codec:
78-
return cid.make_cid(1, expected_codec, base58.b58decode(string)).buffer
79-
80-
return base58.b58decode(string)
81-
else: # CIDv1+
82-
parsed = cid.from_string(string)
83-
84-
# Ensure CID has correct codec for protocol
85-
if expected_codec and parsed.codec != expected_codec:
86-
raise ValueError("“{0}” multiaddr CIDs must use the “{1}” multicodec"
87-
.format(proto.name, expected_codec))
88-
89-
return parsed.buffer
90-
91-
92-
def _is_binary_cidv0_multihash(buf):
72+
class Codec(CodecBase):
73+
SIZE = SIZE
74+
IS_PATH = IS_PATH
75+
76+
def to_bytes(self, proto, string):
77+
expected_codec = PROTO_NAME_TO_CIDv1_CODEC.get(proto.name)
78+
79+
if len(string) in CIDv0_PREFIX_TO_LENGTH.get(string[0:2], ()): # CIDv0
80+
# Upgrade the wire (binary) representation of any received CIDv0 string
81+
# to CIDv1 if we can determine which multicodec value to use
82+
if expected_codec:
83+
cid_obj = cid.make_cid(1, expected_codec, base58.b58decode(string))
84+
assert isinstance(cid_obj.buffer, bytes)
85+
return cid_obj.buffer
86+
87+
return base58.b58decode(string)
88+
else: # CIDv1+
89+
parsed = cid.from_string(string)
90+
91+
# Ensure CID has correct codec for protocol
92+
if expected_codec and parsed.codec != expected_codec:
93+
raise ValueError(
94+
'"{0}" multiaddr CIDs must use the "{1}" multicodec'.format(
95+
proto.name, expected_codec
96+
)
97+
)
98+
99+
return parsed.buffer
100+
101+
def to_string(self, proto, buf):
102+
expected_codec = PROTO_NAME_TO_CIDv1_CODEC.get(proto.name)
103+
104+
if _is_binary_cidv0_multihash(buf): # CIDv0
105+
if not expected_codec:
106+
# Simply encode as base58btc as there is nothing better to do
107+
return base58.b58encode(buf).decode('ascii')
108+
109+
# "Implementations SHOULD display peer IDs using the first (raw
110+
# base58btc encoded multihash) format until the second format is
111+
# widely supported."
112+
#
113+
# In the future the following line should instead convert the multihash
114+
# to CIDv1 and with the `expected_codec` and wrap it in base32:
115+
# return cid.make_cid(1, expected_codec, buf).encode("base32").decode("ascii")
116+
return base58.b58encode(buf).decode("ascii")
117+
else: # CIDv1+
118+
parsed = cid.from_bytes(buf)
119+
120+
# Ensure CID has correct codec for protocol
121+
if expected_codec and parsed.codec != expected_codec:
122+
raise ValueError(
123+
'"{0}" multiaddr CIDs must use the "{1}" multicodec'.format(
124+
proto.name, expected_codec
125+
)
126+
)
127+
128+
# "Implementations SHOULD display peer IDs using the first (raw
129+
# base58btc encoded multihash) format until the second format is
130+
# widely supported."
131+
if expected_codec and _is_binary_cidv0_multihash(parsed.multihash):
132+
return base58.b58encode(parsed.multihash).decode("ascii")
133+
134+
return parsed.encode("base32").decode("ascii")
135+
136+
137+
def _is_binary_cidv0_multihash(buf: bytes) -> bool:
93138
if buf.startswith(b"\x12\x20") and len(buf) == 34: # SHA2-256
94139
return True
95140

96141
if (buf[0] == 0x00 and buf[1] in range(43)) and len(buf) == (buf[1] + 2): # Identity hash
97142
return True
98143

99144
return False
100-
101-
102-
def to_string(proto, buf):
103-
expected_codec = PROTO_NAME_TO_CIDv1_CODEC.get(proto.name)
104-
105-
if _is_binary_cidv0_multihash(buf): # CIDv0
106-
if not expected_codec:
107-
# Simply encode as base58btc as there is nothing better to do
108-
return base58.b58encode(buf).decode('ascii')
109-
110-
# “Implementations SHOULD display peer IDs using the first (raw
111-
# base58btc encoded multihash) format until the second format is
112-
# widely supported.”
113-
#
114-
# In the future the following line should instead convert the multihash
115-
# to CIDv1 and with the `expected_codec` and wrap it in base32:
116-
# return cid.make_cid(1, expected_codec, buf).encode("base32").decode("ascii")
117-
return base58.b58encode(buf).decode("ascii")
118-
else: # CIDv1+
119-
parsed = cid.from_bytes(buf)
120-
121-
# Ensure CID has correct codec for protocol
122-
if expected_codec and parsed.codec != expected_codec:
123-
raise ValueError("“{0}” multiaddr CIDs must use the “{1}” multicodec"
124-
.format(proto.name, expected_codec))
125-
126-
# “Implementations SHOULD display peer IDs using the first (raw
127-
# base58btc encoded multihash) format until the second format is
128-
# widely supported.”
129-
if expected_codec and _is_binary_cidv0_multihash(parsed.multihash):
130-
return base58.b58encode(parsed.multihash).decode("ascii")
131-
132-
return parsed.encode("base32").decode("ascii")

multiaddr/codecs/domain.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,30 @@
11
import idna
22

3-
from . import LENGTH_PREFIXED_VAR_SIZE
3+
from ..codecs import CodecBase
4+
from ..exceptions import BinaryParseError
45

56

6-
SIZE = LENGTH_PREFIXED_VAR_SIZE
7+
SIZE = -1
78
IS_PATH = False
89

910

11+
class Codec(CodecBase):
12+
SIZE = SIZE
13+
IS_PATH = IS_PATH
14+
15+
def to_bytes(self, proto, string):
16+
return string.encode('utf-8')
17+
18+
def to_string(self, proto, buf):
19+
try:
20+
string = buf.decode("utf-8")
21+
for label in string.split("."):
22+
idna.check_label(label)
23+
return string
24+
except (ValueError, UnicodeDecodeError) as e:
25+
raise BinaryParseError(str(e), buf, proto)
26+
27+
1028
def to_bytes(proto, string):
1129
return idna.uts46_remap(string).encode("utf-8")
1230

multiaddr/codecs/fspath.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,27 @@
1-
import os
1+
from ..codecs import CodecBase
22

3-
from . import LENGTH_PREFIXED_VAR_SIZE
43

5-
6-
SIZE = LENGTH_PREFIXED_VAR_SIZE
4+
SIZE = -1
75
IS_PATH = True
86

97

10-
def to_bytes(proto, string):
11-
return os.fsencode(string)
8+
class Codec(CodecBase):
9+
SIZE = SIZE
10+
IS_PATH = IS_PATH
1211

12+
def to_bytes(self, proto, string):
13+
if len(string) == 0:
14+
raise ValueError("{0} value must not be empty".format(proto.name))
15+
# Remove leading slash unless the path is just '/'
16+
if string != '/' and string.startswith('/'):
17+
string = string[1:]
18+
return string.encode('utf-8')
1319

14-
def to_string(proto, buf):
15-
return os.fsdecode(buf)
20+
def to_string(self, proto, buf):
21+
if len(buf) == 0:
22+
raise ValueError("invalid length (should be > 0)")
23+
string = buf.decode('utf-8')
24+
# Always add a single leading slash
25+
if not string.startswith('/'):
26+
string = '/' + string
27+
return string

multiaddr/codecs/ip4.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
import netaddr
2+
from ..codecs import CodecBase
23

34

45
SIZE = 32
56
IS_PATH = False
67

78

8-
def to_bytes(proto, string):
9-
return netaddr.IPAddress(string, version=4).packed
9+
class Codec(CodecBase):
10+
SIZE = SIZE
11+
IS_PATH = IS_PATH
1012

13+
def to_bytes(self, proto, string):
14+
return netaddr.IPAddress(string, version=4).packed
1115

12-
def to_string(proto, buf):
13-
return str(netaddr.IPAddress(int.from_bytes(buf, byteorder='big'), version=4))
16+
def to_string(self, proto, buf):
17+
return str(netaddr.IPAddress(int.from_bytes(buf, byteorder='big'), version=4))

multiaddr/codecs/ip6.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,17 @@
11
import netaddr
2+
from ..codecs import CodecBase
23

34

45
SIZE = 128
56
IS_PATH = False
67

78

8-
def to_bytes(proto, string):
9-
return netaddr.IPAddress(string, version=6).packed
9+
class Codec(CodecBase):
10+
SIZE = SIZE
11+
IS_PATH = IS_PATH
1012

13+
def to_bytes(self, proto, string):
14+
return netaddr.IPAddress(string, version=6).packed
1115

12-
def to_string(proto, buf):
13-
return str(netaddr.IPAddress(int.from_bytes(buf, byteorder='big'), version=6))
16+
def to_string(self, proto, buf):
17+
return str(netaddr.IPAddress(int.from_bytes(buf, byteorder='big'), version=6))

0 commit comments

Comments
 (0)