22asynchronous client
33"""
44
5- import aiohttp
65import json
7- import six
86import ssl
97import warnings
8+
9+ import aiohttp
10+ import six
1011from aiohttp .client import _RequestContextManager
1112
1213from .baseclient import BaseClient
1314from .baseclient import BaseModelizedStreamResponse
15+ from .baseclient import DEFAULT_VERSION
1416from .errors import Etcd3Exception
1517from .errors import Etcd3StreamError
1618from .errors import get_client_error
1719from .utils import iter_json_string , Etcd3Warning
1820
1921
2022class ModelizedResponse (object ):
21- def __init__ (self , method , resp , decode = True ):
23+ def __init__ (self , client , method , resp , decode = True ):
24+ self .client = client
2225 self ._coro = resp
2326 self ._method = method
2427 self ._resp = None
2528 self ._decode = decode
2629
2730 async def __modelize (self ):
2831 self ._resp = await self ._coro
29- await AioClient ._raise_for_status (self ._resp )
32+ await self . client ._raise_for_status (self ._resp )
3033 data = await self ._resp .json ()
31- return AioClient ._modelizeResponseData (self ._method , data , self ._decode )
34+ return self . client ._modelizeResponseData (self ._method , data , self ._decode )
3235
3336 def __await__ (self ):
3437 return self .__modelize ().__await__ ()
@@ -39,10 +42,11 @@ class ModelizedStreamResponse(BaseModelizedStreamResponse):
3942 Model of a stream response
4043 """
4144
42- def __init__ (self , method , resp , decode = True ):
45+ def __init__ (self , client , method , resp , decode = True ):
4346 """
4447 :param resp: aiohttp.ClientResponse
4548 """
49+ self .client = client
4650 self .resp = resp
4751 self .decode = decode
4852 self .method = method
@@ -75,17 +79,20 @@ async def __aiter__(self):
7579 async def __anext__ (self ):
7680 if isinstance (self .resp , _RequestContextManager ):
7781 self .resp = await self .resp
78- await AioClient ._raise_for_status (self .resp )
82+ await self . client ._raise_for_status (self .resp )
7983 data = await self .resp_iter .next ()
8084 data = json .loads (str (data , encoding = 'utf-8' ))
8185 if data .get ('error' ): # pragma: no cover
8286 # {"error":{"grpc_code":14,"http_code":503,"message":"rpc error: code = Unavailable desc = transport is closing","http_status":"Service Unavailable"}}
8387 err = data .get ('error' )
8488 raise get_client_error (err .get ('message' ), code = err .get ('code' ), status = err .get ('http_code' ))
85- return AioClient ._modelizeResponseData (self .method , data , decode = self .decode )
89+ r = self .client ._modelizeResponseData (self .method , data , decode = self .decode )
90+ if r .result :
91+ r = r .result
92+ return r
8693
8794
88- class ResponseIter ():
95+ class ResponseIter (object ):
8996 """
9097 yield response content by every json object
9198 we don't yield by line, because the content of etcd's gRPC-JSON-Gateway stream response
@@ -126,14 +133,16 @@ async def next(self):
126133
127134
128135class AioClient (BaseClient ):
129- def __init__ (self , host = 'localhost ' , port = 2379 , protocol = 'http' ,
136+ def __init__ (self , host = '127.0.0.1 ' , port = 2379 , protocol = 'http' ,
130137 cert = (), verify = None ,
131138 timeout = None , headers = None , user_agent = None , pool_size = 30 ,
132- username = None , password = None , token = None ):
139+ username = None , password = None , token = None ,
140+ server_version = DEFAULT_VERSION , cluster_version = DEFAULT_VERSION ):
133141 super (AioClient , self ).__init__ (host = host , port = port , protocol = protocol ,
134142 cert = cert , verify = verify ,
135143 timeout = timeout , headers = headers , user_agent = user_agent , pool_size = pool_size ,
136- username = username , password = password , token = token )
144+ username = username , password = password , token = token ,
145+ server_version = server_version , cluster_version = cluster_version )
137146 self .ssl_context = None
138147 if self .cert :
139148 if verify is False :
@@ -151,8 +160,7 @@ def __init__(self, host='localhost', port=2379, protocol='http',
151160 # the ssl problem is a pain in the ass, seems i can never get it right
152161 # https://github.com/requests/requests/issues/1847
153162 # https://stackoverflow.com/questions/44316292/ssl-sslerror-tlsv1-alert-protocol-version
154- self .ssl_context = ssl_context = ssl .SSLContext ()
155- ssl_context .protocol = ssl .PROTOCOL_TLS
163+ self .ssl_context = ssl_context = ssl .SSLContext (protocol = ssl .PROTOCOL_TLS )
156164 if not hasattr (ssl , 'PROTOCOL_TLSv1_1' ): # should support TLSv1.2 to pass the test
157165 warnings .warn (Etcd3Warning ("the openssl version of your python is too old to support TLSv1.1+,"
158166 "please upgrade you python" ))
@@ -174,13 +182,11 @@ async def __aenter__(self):
174182 async def __aexit__ (self , exc_type , exc_val , exc_tb ):
175183 await self .close ()
176184
177- @classmethod
178- def _modelizeResponse (cls , method , resp , decode = True ):
179- return ModelizedResponse (method , resp , decode )
185+ def _modelizeResponse (self , method , resp , decode = True ):
186+ return ModelizedResponse (self , method , resp , decode )
180187
181- @classmethod
182- def _modelizeStreamResponse (cls , method , resp , decode = True ):
183- return ModelizedStreamResponse (method , resp , decode )
188+ def _modelizeStreamResponse (self , method , resp , decode = True ):
189+ return ModelizedStreamResponse (self , method , resp , decode )
184190
185191 def _get (self , url , ** kwargs ):
186192 r"""
@@ -212,7 +218,7 @@ async def _raise_for_status(resp):
212218 try :
213219 data = await resp .json ()
214220 except Exception :
215- error = resp .content
221+ error = resp ._content or resp . reason
216222 code = 2
217223 else :
218224 error = data .get ('error' )
0 commit comments