11from __future__ import annotations
22
3+ import ipaddress
34from typing import TYPE_CHECKING , Any , Optional
45
56from graphene import Boolean , Field , Int , List , ObjectType , String
67from infrahub_sdk .utils import extract_fields_first_node , is_valid_uuid
78
89from infrahub .core .constants import InfrahubKind
910from infrahub .core .manager import NodeManager
10- from infrahub .core .utils import collapse_ipv6
1111
1212if TYPE_CHECKING :
1313 from graphql import GraphQLResolveInfo
@@ -30,6 +30,72 @@ class NodeEdges(ObjectType):
3030 edges = Field (List (of_type = NodeEdge , required = True ), required = False )
3131
3232
33+ def _collapse_ipv6 (s : str ) -> str :
34+ """Collapse an ipv6 address, ipv6 network, or a partial ipv6 address in extended format, into its collapsed form.
35+ Raises an error if input does not resemble an IPv6 address in extended format. It means this function also raises
36+ an error if input string is the start of an IPv6 address in collapsed format.
37+ """
38+
39+ try :
40+ return str (ipaddress .IPv6Address (s ))
41+ except ipaddress .AddressValueError :
42+ pass
43+
44+ try :
45+ return ipaddress .IPv6Network (s ).with_prefixlen
46+ except ipaddress .AddressValueError :
47+ pass
48+
49+ # Input string might be an incomplete address in IPv6 format,
50+ # in which case we would like the collapsed form equivalent of this incomplete address for matching purposes.
51+ # To get it, we first try to pad the incomplete address with zeros, then we retrieve the collapsed form
52+ # of the full address, and we remove extra "::" or ":0" at the end of it.
53+
54+ error_message = "Input string does not match IPv6 extended format"
55+
56+ # Input string cannot be an IPv6 in extended format if it contains ":"
57+ if "::" in s :
58+ raise ValueError (error_message )
59+
60+ # Add padding to complete the address if needed
61+ segments = s .split (":" )
62+
63+ if len (segments ) == 0 :
64+ raise ValueError (error_message )
65+
66+ # If any of the non-last segments has less than 4 characters it means we deal with
67+ # a IPv6 collapsed form or an invalid address
68+ for segment in segments [:- 1 ]:
69+ if len (segment ) != 4 :
70+ raise ValueError (error_message )
71+
72+ # Add 0 padding to last segment
73+ if len (segments [- 1 ]) > 4 :
74+ raise ValueError (error_message )
75+
76+ segments [- 1 ] += "0" * (4 - len (segments [- 1 ]))
77+
78+ # Complete the address to have 8 segments by padding with zeros
79+ while len (segments ) < 8 :
80+ segments .append ("0000" )
81+
82+ # Create a full IPv6 address from the partial input
83+ full_address = ":" .join (segments )
84+
85+ # Create an IPv6Address object for validation and to build IPv6 collapsed form.
86+ ipv6_address = ipaddress .IPv6Address (full_address )
87+
88+ compressed_address = ipv6_address .compressed
89+
90+ # We padded with zeros so address might endswith "::" or ":0".
91+ if compressed_address .endswith (("::" , ":0" )):
92+ return compressed_address [:- 2 ]
93+
94+ # Otherwise, it means 8th segment of ipv6 address was not full and not composed of 0 only
95+ # e.g. 2001:0db8:0000:0000:0000:0000:03
96+ return compressed_address
97+
98+
3399async def search_resolver (
34100 root : dict , # pylint: disable=unused-argument
35101 info : GraphQLResolveInfo ,
@@ -52,8 +118,8 @@ async def search_resolver(
52118 else :
53119 try :
54120 # Convert any IPv6 address, network or partial address to collapsed format as it might be stored in db.
55- q = collapse_ipv6 (q )
56- except ValueError :
121+ q = _collapse_ipv6 (q )
122+ except ( ValueError , ipaddress . AddressValueError ) :
57123 pass
58124
59125 result .extend (
0 commit comments