|
2 | 2 |
|
3 | 3 | import base58
|
4 | 4 | import cid
|
| 5 | +import varint |
5 | 6 |
|
6 | 7 | from ..codecs import CodecBase
|
7 | 8 | from . import LENGTH_PREFIXED_VAR_SIZE
|
| 9 | +from ..exceptions import BinaryParseError |
8 | 10 |
|
9 | 11 | SIZE = LENGTH_PREFIXED_VAR_SIZE
|
10 | 12 | IS_PATH = False
|
|
63 | 65 | "Qm": [46],
|
64 | 66 | }
|
65 | 67 |
|
# Maps a multiaddr protocol name to the multicodec its CIDv1 values are
# expected to carry; protocols absent from this table are not codec-checked.
PROTO_NAME_TO_CIDv1_CODEC = {
    # The "p2p" multiaddr protocol requires all keys to use the "libp2p-key" multicodec
    "p2p": "libp2p-key",
    "ipfs": "dag-pb",
}
|
70 | 72 |
|
71 | 73 |
|
| 74 | +def _is_binary_cidv0_multihash(buf: bytes) -> bool: |
| 75 | + """Check if the given bytes represent a CIDv0 multihash.""" |
| 76 | + try: |
| 77 | + # CIDv0 is just a base58btc encoded multihash |
| 78 | + decoded = base58.b58decode(base58.b58encode(buf).decode("ascii")) |
| 79 | + return len(decoded) == len(buf) and decoded == buf |
| 80 | + except Exception: |
| 81 | + return False |
| 82 | + |
| 83 | + |
class Codec(CodecBase):
    """Codec converting CID values between their string and binary forms."""

    SIZE = SIZE
    IS_PATH = IS_PATH

    def to_bytes(self, proto, value: str) -> bytes:
        """Convert a CID string to its varint-length-prefixed binary form.

        :param proto: Protocol the value belongs to (not consulted here).
        :param value: CID in string form; either a CIDv0 (bare base58btc
            multihash) or anything ``cid.make_cid`` accepts.
        :returns: ``varint(len(payload)) + payload`` where the payload is the
            raw multihash for CIDv0 input and the CID buffer otherwise.
        :raises ValueError: If *value* is empty or cannot be parsed.
        """
        if not value:
            raise ValueError("CID string cannot be empty")

        # NOTE(review): unlike `to_string`, no check is made here that a
        # CIDv1 uses the multicodec listed in PROTO_NAME_TO_CIDv1_CODEC, so a
        # value accepted here may be rejected when converted back. Confirm
        # whether that asymmetry is intended.

        # Recognise CIDv0 by its string shape (known base58btc prefix and
        # length). Merely checking that the string base58-decodes is not
        # enough: a base32 CIDv1 without the letter "l" also decodes.
        if len(value) in CIDv0_PREFIX_TO_LENGTH.get(value[0:2], ()):
            try:
                multihash = base58.b58decode(value)
            except ValueError as exc:
                raise ValueError(f"Invalid CID: {value}") from exc
            # NOTE(review): `to_string` does not strip this length prefix;
            # confirm the caller removes it before handing bytes back.
            return varint.encode(len(multihash)) + multihash

        # Anything else is handed to the CID parser.
        try:
            parsed = cid.make_cid(value)
        except ValueError as exc:
            raise ValueError(f"Invalid CID: {value}") from exc
        return varint.encode(len(parsed.buffer)) + parsed.buffer

    def to_string(self, proto, buf: bytes) -> str:
        """Convert a binary CID to its string representation.

        :param proto: Protocol the value belongs to; ``proto.name`` selects
            the multicodec a CIDv1 is required to use, if any.
        :param buf: Binary CID, or a bare CIDv0 multihash.
        :returns: A base58btc string for CIDv0 multihashes (and for CIDv1
            values of a codec-checked protocol whose multihash is CIDv0
            compatible), otherwise the base32 form of the CID.
        :raises ValueError: If *buf* is empty.
        :raises BinaryParseError: If parsing fails or the CID does not use
            the multicodec required for the protocol.
        """
        if not buf:
            raise ValueError("CID buffer cannot be empty")

        expected_codec = PROTO_NAME_TO_CIDv1_CODEC.get(proto.name)

        try:
            if _is_binary_cidv0_multihash(buf):  # CIDv0
                # "Implementations SHOULD display peer IDs using the first (raw
                # base58btc encoded multihash) format until the second format is
                # widely supported."
                #
                # The same encoding is used whether or not the protocol has an
                # expected codec, so no distinction is made here.
                return base58.b58encode(buf).decode("ascii")

            # CIDv1+
            parsed = cid.from_bytes(buf)

            # Ensure CID has correct codec for protocol
            if expected_codec and parsed.codec != expected_codec:
                raise ValueError(
                    '"{0}" multiaddr CIDs must use the "{1}" multicodec'.format(
                        proto.name, expected_codec
                    )
                )

            # Prefer the legacy base58btc rendering when the wrapped multihash
            # is CIDv0 compatible (see the quoted guidance above).
            if expected_codec and _is_binary_cidv0_multihash(parsed.multihash):
                return base58.b58encode(parsed.multihash).decode("ascii")

            return parsed.encode("base32").decode("ascii")
        except Exception as e:
            # Boundary handler: report every failure, including the codec
            # mismatch above, as a parse error carrying the offending bytes.
            raise BinaryParseError(str(e), buf, proto.name, e) from e
0 commit comments