22import socket
33import ssl # Python's ssl module implements TLS (despite the name)
44import grpc
5- from typing import Optional , Callable
5+ from typing import Optional
66from hiero_sdk_python .account .account_id import AccountId
77from hiero_sdk_python .channels import _Channel
88from hiero_sdk_python .address_book .node_address import NodeAddress
@@ -91,7 +91,7 @@ def __init__(self, account_id: AccountId, address: str, address_book: NodeAddres
9191 self ._address : _ManagedNodeAddress = _ManagedNodeAddress ._from_string (address )
9292 self ._verify_certificates : bool = True
9393 self ._root_certificates : Optional [bytes ] = None
94- self ._authority_override : Optional [str ] = self . _determine_authority_override ()
94+ self ._node_pem_cert : Optional [bytes ] = None
9595
9696 def _close (self ):
9797 """
@@ -115,13 +115,23 @@ def _get_channel(self):
115115 return self ._channel
116116
117117 if self ._address ._is_transport_security ():
118+ if self ._root_certificates :
119+ # Use the certificate that is provided
120+ self ._node_pem_cert = self ._root_certificates
121+ else :
122+ # Fetch pem_cert for the node
123+ self ._node_pem_cert = self ._fetch_server_certificate_pem ()
124+
125+ if not self ._node_pem_cert :
126+ raise ValueError ("No certificate available." )
127+
118128 # Validate certificate if verification is enabled
119129 if self ._verify_certificates :
120- self ._validate_tls_certificate_with_trust_manager ()
130+ self ._validate_tls_certificate_with_trust_manager ()
121131
122132 options = self ._build_channel_options ()
123133 credentials = grpc .ssl_channel_credentials (
124- root_certificates = self ._root_certificates ,
134+ root_certificates = self ._node_pem_cert ,
125135 private_key = None ,
126136 certificate_chain = None ,
127137 )
@@ -141,7 +151,9 @@ def _apply_transport_security(self, enabled: bool):
141151 return
142152 if not enabled and not self ._address ._is_transport_security ():
143153 return
154+
144155 self ._close ()
156+
145157 if enabled :
146158 self ._address = self ._address ._to_secure ()
147159 else :
@@ -154,39 +166,48 @@ def _set_root_certificates(self, root_certificates: Optional[bytes]):
154166 self ._root_certificates = root_certificates
155167 if self ._channel and self ._address ._is_transport_security ():
156168 self ._close ()
169+
157170 def _set_verify_certificates (self , verify : bool ):
158171 """
159172 Set whether TLS certificates should be verified.
160173 """
161174 if self ._verify_certificates == verify :
162175 return
176+
163177 self ._verify_certificates = verify
178+
164179 if verify and self ._channel and self ._address ._is_transport_security ():
165180 # Force channel recreation to ensure certificates are revalidated.
166181 self ._close ()
167182
168- def _determine_authority_override (self ) -> Optional [str ]:
169- """
170- Determine the hostname to use for TLS authority override.
171- """
172- if not self ._address_book or not self ._address_book ._addresses : # pylint: disable=protected-access
173- return None
174- for endpoint in self ._address_book ._addresses : # pylint: disable=protected-access
175- domain = endpoint .get_domain_name ()
176- if domain :
177- return domain
178- return None
179-
180183 def _build_channel_options (self ):
181184 """
182185 Build gRPC channel options for TLS connections.
186+
187+ The options `grpc.default_authority` and `grpc.ssl_target_name_override`
188+ are intentionally set to a fixed value ("127.0.0.1") to bypass standard
189+ TLS hostname verification.
190+
191+ This is REQUIRED because Hedera nodes are connected to via IP addresses
192+ from the address book, while their TLS certificates are not issued for
193+ those IPs. As a result, standard hostname verification would fail even
194+ for legitimate nodes.
195+
196+ Although hostname verification is disabled, transport security is NOT
197+ weakened. Instead of relying on hostnames, the SDK validates the server
198+ by performing certificate hash pinning. This guarantees the client is
199+ communicating with the correct Hedera node regardless of the hostname
200+ or IP address used to connect.
183201 """
184- if not self ._authority_override :
185- return None
186- host = self ._address ._get_host ()
187- if host == self ._authority_override :
188- return None
189- return [('grpc.ssl_target_name_override' , self ._authority_override )]
202+ options = [
203+ ("grpc.default_authority" , "127.0.0.1" ),
204+ ("grpc.ssl_target_name_override" , "127.0.0.1" ),
205+ ("grpc.keepalive_time_ms" , 100000 ),
206+ ("grpc.keepalive_timeout_ms" , 10000 ),
207+ ("grpc.keepalive_permit_without_calls" , 1 )
208+ ]
209+
210+ return options
190211
191212 def _validate_tls_certificate_with_trust_manager (self ):
192213 """
@@ -197,9 +218,7 @@ def _validate_tls_certificate_with_trust_manager(self):
197218 Note: If verification is enabled but no cert hash is available (e.g., in unit tests
198219 without address books), validation is skipped rather than raising an error.
199220 """
200- if not self ._address ._is_transport_security ():
201- return
202- if not self ._verify_certificates :
221+ if not self ._address ._is_transport_security () or not self ._verify_certificates :
203222 return
204223
205224 cert_hash = None
@@ -214,10 +233,7 @@ def _validate_tls_certificate_with_trust_manager(self):
214233
215234 # Create trust manager and validate certificate
216235 trust_manager = _HederaTrustManager (cert_hash , self ._verify_certificates )
217-
218- # Fetch server certificate and validate
219- pem_cert = self ._fetch_server_certificate_pem ()
220- trust_manager .check_server_trusted (pem_cert )
236+ trust_manager .check_server_trusted (self ._node_pem_cert )
221237
222238 @staticmethod
223239 def _normalize_cert_hash (cert_hash : bytes ) -> str :
@@ -228,6 +244,7 @@ def _normalize_cert_hash(cert_hash: bytes) -> str:
228244 decoded = cert_hash .decode ('utf-8' ).strip ().lower ()
229245 if decoded .startswith ("0x" ):
230246 decoded = decoded [2 :]
247+
231248 return decoded
232249 except UnicodeDecodeError :
233250 return cert_hash .hex ()
@@ -239,12 +256,22 @@ def _fetch_server_certificate_pem(self) -> bytes:
239256 Returns:
240257 bytes: PEM-encoded certificate bytes
241258 """
259+ if not self ._address_book :
260+ return None
261+
242262 host = self ._address ._get_host ()
243263 port = self ._address ._get_port ()
244- server_hostname = self . _authority_override or host
264+ server_hostname = host
245265
246266 # Create TLS context that accepts any certificate (we validate hash ourselves)
247267 context = ssl .create_default_context ()
268+ # Restrict SSL/TLS versions to TLSv1.2+ only for security
269+ if hasattr (context , 'minimum_version' ) and hasattr (ssl , 'TLSVersion' ):
270+ context .minimum_version = ssl .TLSVersion .TLSv1_2
271+ else :
272+ # Backwards compatibility for Python <3.7 that lacks minimum_version
273+ context .options |= ssl .OP_NO_TLSv1 | ssl .OP_NO_TLSv1_1
274+
248275 context .check_hostname = False
249276 context .verify_mode = ssl .CERT_NONE
250277
@@ -254,4 +281,4 @@ def _fetch_server_certificate_pem(self) -> bytes:
254281
255282 # Convert DER to PEM format (matching Java's PEM encoding)
256283 pem_cert = ssl .DER_cert_to_PEM_cert (der_cert ).encode ('utf-8' )
257- return pem_cert
284+ return pem_cert
0 commit comments