Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python-client/pypegasus/base/ttypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -310,8 +310,8 @@ def __ne__(self, other):
class host_port_types(Enum):
kHostTypeInvalid = 0
kHostTypeIpv4 = 1
kHostTypeGroup = 2

kHostTypeIpv6 = 2
kHostTypeGroup = 3

class host_port:

Expand Down
187 changes: 167 additions & 20 deletions python-client/pypegasus/pgclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import os
import logging.config
import six
import ipaddress

from thrift.Thrift import TMessageType, TApplicationException
from twisted.internet import defer
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks, succeed, fail
from twisted.internet.defer import inlineCallbacks, succeed, fail, returnValue
from twisted.internet.protocol import ClientCreator
from twisted.names import client, dns

from pypegasus import rrdb, replication
from pypegasus.base.ttypes import *
Expand All @@ -50,7 +52,7 @@
DEFAULT_TIMEOUT = 2000 # ms
META_CHECK_INTERVAL = 2 # s
MAX_TIMEOUT_THRESHOLD = 5 # times

MAX_META_QUERY_THRESHOLD = 1 # times

class BaseSession(object):

Expand Down Expand Up @@ -245,20 +247,146 @@ class MetaSessionManager(SessionManager):
def __init__(self, table_name, timeout):
SessionManager.__init__(self, table_name, timeout)
self.addr_list = []

def add_meta_server(self, meta_addr):
rpc_addr = rpc_address()
if rpc_addr.from_string(meta_addr):
ip_port = meta_addr.split(':')
if not len(ip_port) == 2:
self.host_ports = []
self.query_times = 0

def is_valid_ip(self, address):
    """Return True if ``address`` is a literal IPv4 or IPv6 address.

    The string is parsed with ``socket.inet_pton`` for each address family
    in turn; a hostname, an empty string or a non-string value yields False.

    :param address: candidate address (no port, no brackets).
    :return: (bool) True when ``address`` parses as IPv4 or IPv6.
    """
    for family in (socket.AF_INET, socket.AF_INET6):
        try:
            socket.inet_pton(family, address)
        except (socket.error, TypeError, ValueError):
            # Not parseable in this family (or not a string at all);
            # fall through to the next family.
            continue
        return True
    return False

def get_host_type(self, ip_list):
has_ipv4 = False
has_ipv6 = False

for ip_str in ip_list:
try:
ip = ipaddress.ip_address(ip_str)
if ip.version == 4:
has_ipv4 = True
elif ip.version == 6:
has_ipv6 = True
except ValueError:
continue

if has_ipv4 and has_ipv6:
return host_port_types.kHostTypeGroup
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the server side, kHostTypeGroup means that the host_port class stores a list of addresses. However, in the Python client it is defined as a mix of IPv4 and IPv6, which is inconsistent with the server-side definition. What is the rationale behind this design?

Copy link
Contributor Author

@WJSGDBZ WJSGDBZ Mar 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@empiredan I have already fixed this so that it is compatible with the server-side definition.

elif has_ipv4:
return host_port_types.kHostTypeIpv4
elif has_ipv6:
return host_port_types.kHostTypeIpv6
else:
return host_port_types.kHostTypeInvalid

def resolve_all_ips(self, hostname):
    """Resolve ``hostname`` to all of its IPv4 and IPv6 addresses.

    An A query and an AAAA query are issued through the Twisted resolver.
    A failure of one query is tolerated as long as the other one yields at
    least one address, so a host that only resolves in one address family
    is still usable. If no address was obtained and at least one query
    failed, the returned Deferred fails with the first failure.

    :param hostname: DNS name to resolve.
    :return: (Deferred) fires with a list of IP strings, IPv4 addresses
             first, then IPv6 addresses.
    """

    def extract_ips(result, record_type):
        # Twisted query results are (answers, authority, additional).
        answers, _, _ = result
        ips = []
        for answer in answers:
            payload = answer.payload
            if record_type == dns.A and isinstance(payload, dns.Record_A):
                ips.append(payload.dottedQuad())
            elif record_type == dns.AAAA and isinstance(payload, dns.Record_AAAA):
                raw = payload.address
                if not isinstance(raw, bytes):
                    ips.append(str(raw))
                elif len(raw) == 16:
                    ips.append(socket.inet_ntop(socket.AF_INET6, raw))
                else:
                    logger.error('Invalid IPv6 bytes length: %s', len(raw))
        return ips

    def lookup(record_type):
        d = resolver.query(dns.Query(hostname, record_type, dns.IN))
        return d.addCallback(extract_ips, record_type)

    def merge(outcomes):
        # outcomes is a list of (succeeded, value-or-Failure) pairs.
        ips = []
        failures = []
        for succeeded, value in outcomes:
            if succeeded:
                ips.extend(value)
            else:
                failures.append(value)

        if failures and not ips:
            # Nothing usable was resolved: propagate the first error.
            return failures[0]

        for failure in failures:
            logger.warning('partial DNS failure for %s: %s',
                           hostname, failure.value)
        return ips

    resolver = client.getResolver()

    # query A record (ipv4) and AAAA record (ipv6), then merge both results
    lookups = [lookup(dns.A), lookup(dns.AAAA)]
    return defer.DeferredList(lookups, consumeErrors=True).addCallback(merge)

@inlineCallbacks
def add_meta_server(self, meta_addr):
    """Register one meta server given as ``host:port``.

    ``host`` may be an IPv4 literal, an IPv6 literal (optionally wrapped in
    square brackets) or a hostname. A hostname is resolved to all of its
    addresses and remembered in ``self.host_ports`` so that it can be
    re-resolved later; every resulting (ip, port) pair is appended to
    ``self.addr_list``.

    :param meta_addr: (str) address in ``host:port`` form.
    :return: (Deferred) fires with True when the address was accepted,
             False when it is malformed or cannot be resolved.
    """
    if not isinstance(meta_addr, str):
        logger.error("meta_addr must be a string: %s", meta_addr)
        returnValue(False)

    # Split on the LAST colon: an IPv6 literal contains colons itself, so
    # splitting on the first one would cut the address apart.
    host, sep, port_str = meta_addr.rpartition(':')
    if not sep or not host:
        logger.error("invalid address format (expected host:port): %s", meta_addr)
        returnValue(False)

    # Accept the bracketed IPv6 notation "[::1]:34601".
    if host.startswith('[') and host.endswith(']'):
        host = host[1:-1]

    try:
        port = int(port_str)
    except ValueError:
        logger.error("invalid port in meta address: %s", meta_addr)
        returnValue(False)

    ips = []
    if self.is_valid_ip(host):
        ips = [host]
    else:
        try:
            ips = yield self.resolve_all_ips(host)
            hp = host_port()
            if hp.from_string(meta_addr):
                hp.type = self.get_host_type(ips)
                self.host_ports.append(hp)
                logger.info("resolved hostname %s to IP type:%s, addr:%s", hp.host, hp.type, ips)
        except Exception as e:
            logger.error("failed to resolve hostname %s: %s", host, e)
            returnValue(False)

    self.addr_list.extend((ip, port) for ip in ips)

    returnValue(True)

ip, port = ip_port[0], int(ip_port[1])
self.addr_list.append((ip, port))
@inlineCallbacks
def resolve_host(self, err):
    """Re-resolve every remembered hostname and refresh the address list.

    Used as an errback on the meta query; ``err`` is not inspected. When
    the freshly resolved address list is non-empty and differs from
    ``self.addr_list``, sessions whose address is no longer present are
    removed from ``self.session_dict`` and closed, and ``self.addr_list``
    is replaced by the new list.

    NOTE(review): the new list is built from ``self.host_ports`` only, so
    entries of ``self.addr_list`` that do not come from a remembered
    hostname are not carried over when the list is replaced -- confirm this
    is intended.

    :param err: failure passed in by the errback chain (unused).
    :return: (Deferred) always fires with None.
    """
    if not self.host_ports:
        returnValue(None)

    resolved_addrs = []
    for hp in self.host_ports:
        host, port = hp.to_host_port()
        try:
            ips = yield self.resolve_all_ips(host)
            hp.type = self.get_host_type(ips)
            resolved_addrs.extend((ip, port) for ip in ips)
            logger.info("resolved hostname %s to IP type:%s, addr:%s", hp.host, hp.type, ips)
        except Exception as e:
            # Keep going: other hostnames may still resolve.
            logger.error("failed to resolve hostname %s: %s", host, e)

    # Nothing resolved, or nothing changed: keep the current state.
    if not resolved_addrs or resolved_addrs == self.addr_list:
        returnValue(None)

    stale_sessions = []
    # Iterate over a snapshot of the keys because entries are popped below.
    for rpc_addr in list(self.session_dict):
        ip, port = rpc_addr.to_ip_port()
        if (ip, port) in resolved_addrs:
            continue
        stale_sessions.append(self.session_dict.pop(rpc_addr))
        logger.info("removed stale server: %s:%s", ip, port)

    self.addr_list = resolved_addrs

    for session in stale_sessions:
        if session:
            reactor.callFromThread(session.close)

    returnValue(None)

@inlineCallbacks
def query_one(self, session):
Expand All @@ -276,8 +404,15 @@ def got_results(self, res):
self.name, result)
return result

logger.error('query partition info err. table: %s err: %s',
self.name, res)
# all meta server queries failed, maybe need to re-resolve hostname
# to get new IP addresses
self.query_times += 1
logger.error('query partition info err. table: %s, query_times: %d, err: %s',
self.name, self.query_times, res)

if self.query_times >= MAX_META_QUERY_THRESHOLD:
self.query_times = 0
raise RuntimeError("all meta server queries failed, try to re-resolve hostname")

def query(self):
ds = []
Expand All @@ -301,6 +436,7 @@ def query(self):

dlist = defer.DeferredList(ds, consumeErrors=True)
dlist.addCallback(self.got_results)
dlist.addErrback(self.resolve_host)
return dlist


Expand All @@ -323,7 +459,7 @@ def update_cfg(self, resp):
if resp.__class__.__name__ != "query_cfg_response":
logger.error('table: %s, query_cfg_response is error',
self.name)
return None
return defer.succeed(None)

self.query_cfg_response = resp
self.app_id = self.query_cfg_response.app_id
Expand Down Expand Up @@ -641,23 +777,34 @@ def __init__(self, meta_addrs=None, table_name='',
self.name = table_name
self.table = Table(table_name, self, timeout)
self.meta_session_manager = MetaSessionManager(table_name, timeout)
if isinstance(meta_addrs, list):
for meta_addr in meta_addrs:
self.meta_session_manager.add_meta_server(meta_addr)
self.initial_meta = False
self.meta_addrs = meta_addrs if isinstance(meta_addrs, list) else []
PegasusHash.populate_table()
self.timeout_times = 0
self.update_partition = False
self.timer = reactor.callLater(META_CHECK_INTERVAL, self.check_state)

@inlineCallbacks
def init(self):
    """
    Initialize the client before you can use it.

    On the first call every configured meta address is registered with the
    meta session manager; afterwards the partition configuration is
    queried and handed to ``self.table.update_cfg``.

    :return: (Deferred) fires with the result of the meta query after it
             has been processed by ``update_cfg``.
    :raises Exception: when none of the meta addresses could be registered.
    """
    if not self.initial_meta:
        self.initial_meta = True
        pending = [self.meta_session_manager.add_meta_server(addr)
                   for addr in self.meta_addrs]
        outcomes = yield defer.DeferredList(pending, consumeErrors=True)
        # Each outcome is (succeeded, value). A failed Deferred carries a
        # (truthy) Failure as its value, so both fields must be checked.
        if not any(ok and added for ok, added in outcomes):
            # Allow a later init() call to try registering again.
            self.initial_meta = False
            raise Exception("all meta servers failed to initialize")

    dlist = self.meta_session_manager.query()
    dlist.addCallback(self.table.update_cfg)
    results = yield dlist
    returnValue(results)

def close(self):
"""
Expand Down
Loading