Skip to content

Commit dc2407e

Browse files
committed
Add support for multiple endpoints
This commits adds a new `endpoints` parameter for the `BaseClient` class. This parameter, which must be used alternatively to `host` and `port`, can be set to a list of EtcdEndpoint, which is a simple data class with `address` and `port` attributes. A new decorator is present: `retry all hosts`. This decorator if applied to a function of a class descendant from `BaseClient` has the function calls retried against all the `endpoints` if an exception occours. If one of the tries returns with no errors the return value is propagated; if all the tries throws exceptions of the same type the first one is propagated; if they throw different exceptions an Etcd3Exception is thrown. All the failed tries are logged.
1 parent 0f6d32b commit dc2407e

File tree

4 files changed

+76
-18
lines changed

4 files changed

+76
-18
lines changed

etcd3/aio_client.py

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import six
1111
from aiohttp.client import _RequestContextManager
1212

13+
from .baseclient import retry_all_hosts
1314
from .baseclient import BaseClient
1415
from .baseclient import BaseModelizedStreamResponse
1516
from .baseclient import DEFAULT_VERSION
@@ -133,16 +134,17 @@ async def next(self):
133134

134135

135136
class AioClient(BaseClient):
136-
def __init__(self, host='127.0.0.1', port=2379, protocol='http',
137+
def __init__(self, host=None, port=None, endpoints=None, protocol='http',
137138
cert=(), verify=None,
138139
timeout=None, headers=None, user_agent=None, pool_size=30,
139140
username=None, password=None, token=None,
140141
server_version=DEFAULT_VERSION, cluster_version=DEFAULT_VERSION):
141-
super(AioClient, self).__init__(host=host, port=port, protocol=protocol,
142-
cert=cert, verify=verify,
143-
timeout=timeout, headers=headers, user_agent=user_agent, pool_size=pool_size,
144-
username=username, password=password, token=token,
145-
server_version=server_version, cluster_version=cluster_version)
142+
super(AioClient, self).__init__(
143+
host=host, port=port, endpoints=endpoints, protocol=protocol,
144+
cert=cert, verify=verify, timeout=timeout, headers=headers,
145+
user_agent=user_agent, pool_size=pool_size,
146+
username=username, password=password, token=token,
147+
server_version=server_version, cluster_version=cluster_version)
146148
self.ssl_context = None
147149
if self.cert:
148150
if verify is False:
@@ -225,6 +227,7 @@ async def _raise_for_status(resp):
225227
code = data.get('code')
226228
raise get_client_error(error, code, status, resp)
227229

230+
@retry_all_hosts
228231
def call_rpc(self, method, data=None, stream=False, encode=True, raw=False, **kwargs):
229232
"""
230233
call ETCDv3 RPC and return response object

etcd3/baseclient.py

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from .apis import MaintenanceAPI
1818
from .apis import WatchAPI
1919
from .errors import UnsupportedServerVersion
20+
from .errors import Etcd3Exception
2021
from .stateful import Lease
2122
from .stateful import Lock
2223
from .stateful import Txn
@@ -25,6 +26,8 @@
2526
from .swaggerdefs import get_spec
2627
from .utils import Etcd3Warning
2728
from .utils import log
29+
from .utils import check_param
30+
from .utils import EtcdEndpoint
2831
from .version import __version__
2932

3033

@@ -48,15 +51,54 @@ def __iter__(self):
4851
DEFAULT_VERSION = '3.3.0'
4952

5053

54+
def retry_all_hosts(func):
55+
def wrapper(self, *args, **kwargs):
56+
errors = []
57+
got_result = False
58+
for i in range(len(self.endpoints)):
59+
endpoint = self.endpoints.pop(0)
60+
self.endpoints.append(endpoint)
61+
self.host = endpoint.host
62+
self.port = endpoint.port
63+
try:
64+
ret = func(self, *args, **kwargs)
65+
got_result = True
66+
break
67+
except Exception as e:
68+
errors.append(e)
69+
log.warning('Failed to call %s(args: %s, kwargs: %s) on '
70+
'endpoint %s' %
71+
(func.__name__, args, kwargs, endpoint))
72+
if not got_result:
73+
exception_types = [x.__class__ for x in errors]
74+
if len(set(exception_types)) == 1:
75+
log.error('Failed to call %s(args: %s, kwargs: %s) on all '
76+
'endpoints: %s. Got errors: %s' %
77+
(func.__name__, args, kwargs, self.endpoints, errors))
78+
raise errors[0]
79+
else:
80+
raise Etcd3Exception(
81+
'Failed to call %s(args: %s, kwargs: %s) on all '
82+
'endpoints: %s. Got errors: %s' %
83+
(func.__name__, args, kwargs, self.endpoints, errors))
84+
# elif len(errors) > 0:
85+
# log.warning('Got errors %s, retried successfully')
86+
return ret
87+
return wrapper
88+
89+
5190
class BaseClient(AuthAPI, ClusterAPI, KVAPI, LeaseAPI, MaintenanceAPI,
5291
WatchAPI, ExtraAPI, LockAPI):
53-
def __init__(self, host='127.0.0.1', port=2379, protocol='http',
54-
cert=(), verify=None,
55-
timeout=None, headers=None, user_agent=None, pool_size=30,
92+
@check_param(at_most_one_of=['port', 'endpoints'], at_least_one_of=['port', 'endpoints'])
93+
@check_param(at_most_one_of=['host', 'endpoints'], at_least_one_of=['host', 'endpoints'])
94+
def __init__(self, host=None, port=None, endpoints=None, protocol='http', cert=(),
95+
verify=None, timeout=None, headers=None, user_agent=None, pool_size=30,
5696
username=None, password=None, token=None,
5797
server_version=DEFAULT_VERSION, cluster_version=DEFAULT_VERSION):
58-
self.host = host
59-
self.port = port
98+
if host is not None:
99+
self.endpoints = ([EtcdEndpoint(host, port)])
100+
else:
101+
self.endpoints = endpoints
60102
self.cert = cert
61103
self.protocol = protocol
62104
if cert:
@@ -79,6 +121,7 @@ def __init__(self, host='127.0.0.1', port=2379, protocol='http',
79121
self._get_prefix()
80122
self.api_spec = SwaggerSpec(get_spec(self.server_version))
81123

124+
@retry_all_hosts
82125
def _retrieve_version(self): # pragma: no cover
83126
try:
84127
import requests

etcd3/client.py

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import six
99

1010
from .baseclient import BaseClient
11+
from .baseclient import retry_all_hosts
1112
from .baseclient import BaseModelizedStreamResponse
1213
from .baseclient import DEFAULT_VERSION
1314
from .errors import Etcd3Exception
@@ -86,8 +87,8 @@ def iter_response(resp):
8687

8788

8889
class Client(BaseClient):
89-
def __init__(self, host='127.0.0.1', port=2379, protocol='http',
90-
cert=(), verify=None,
90+
def __init__(self, host=None, port=None, endpoints=None,
91+
protocol='http', cert=(), verify=None,
9192
timeout=None, headers=None, user_agent=None, pool_size=30,
9293
username=None, password=None, token=None, max_retries=0,
9394
server_version=DEFAULT_VERSION, cluster_version=DEFAULT_VERSION):
@@ -100,11 +101,12 @@ def __init__(self, host='127.0.0.1', port=2379, protocol='http',
100101
which we retry a request, import urllib3's ``Retry`` class and pass
101102
that instead.
102103
"""
103-
super(Client, self).__init__(host=host, port=port, protocol=protocol,
104-
cert=cert, verify=verify,
105-
timeout=timeout, headers=headers, user_agent=user_agent, pool_size=pool_size,
106-
username=username, password=password, token=token,
107-
server_version=server_version, cluster_version=cluster_version)
104+
super(Client, self).__init__(
105+
host=host, port=port, endpoints=endpoints, protocol=protocol,
106+
cert=cert, verify=verify, timeout=timeout, headers=headers,
107+
user_agent=user_agent, pool_size=pool_size,
108+
username=username, password=password, token=token,
109+
server_version=server_version, cluster_version=cluster_version)
108110
self._session = requests.session()
109111
self._session.cert = self.cert
110112
self._session.verify = self.verify
@@ -164,6 +166,7 @@ def _post(self, url, data=None, json=None, **kwargs):
164166
"""
165167
return self._session.post(url, data=data, json=json, **kwargs)
166168

169+
@retry_all_hosts
167170
def call_rpc(self, method, data=None, stream=False, encode=True, raw=False, **kwargs): # TODO: add modelize param
168171
"""
169172
call ETCDv3 RPC and return response object

etcd3/utils.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -382,3 +382,12 @@ def find_executable(executable, path=None): # pragma: no cover
382382
f = os.path.join(p, execname)
383383
if os.path.isfile(f):
384384
return f
385+
386+
387+
class EtcdEndpoint():
388+
def __init__(self, host='127.0.0.1', port=2379):
389+
self.host = host
390+
self.port = port
391+
392+
def __repr__(self):
393+
return "EtcdEndpoint(host=%s, port=%s)" % (self.host, self.port)

0 commit comments

Comments
 (0)