Skip to content

Commit 4c9207a

Browse files
isapegoivandasch
andauthored
GG-33939 IGNITE-15118 Implement handshake timeout (#55)
(cherry picked from commit de07126) Co-authored-by: Ivan Daschinsky <[email protected]>
1 parent 8fdc81c commit 4c9207a

File tree

15 files changed

+396
-84
lines changed

15 files changed

+396
-84
lines changed

.travis.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,4 @@ jobs:
5252
env: TOXENV=py39
5353

5454
install: pip install tox
55-
script: tox
55+
script: tox

examples/transactions.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,12 @@ def sync_example():
130130

131131

132132
if __name__ == '__main__':
133+
client = Client()
134+
with client.connect('127.0.0.1', 10800):
135+
if not client.protocol_context.is_transactions_supported():
136+
print("'Transactions' API is not supported by cluster. Finishing...")
137+
exit(0)
138+
133139
print("Starting sync example")
134140
sync_example()
135141

pygridgain/aio_client.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,9 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
6666
"""
6767
Initialize client.
6868
69+
For the use of the SSL-related parameters see
70+
https://docs.python.org/3/library/ssl.html#ssl-certificates.
71+
6972
:param compact_footer: (optional) use compact (True, recommended) or
7073
full (False) schema approach when serializing Complex objects.
7174
Default is to use the same approach the server is using (None).
@@ -74,7 +77,37 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
7477
:param partition_aware: (optional) try to calculate the exact data
7578
placement from the key before to issue the key operation to the
7679
server node, `True` by default,
77-
:param event_listeners: (optional) event listeners.
80+
:param event_listeners: (optional) event listeners,
81+
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
82+
with node. Default is 10.0 seconds,
83+
:param use_ssl: (optional) set to True if Ignite server uses SSL
84+
on its binary connector. Defaults to use SSL when username
85+
and password has been supplied, not to use SSL otherwise,
86+
:param ssl_version: (optional) SSL version constant from standard
87+
`ssl` module. Defaults to TLS v1.2,
88+
:param ssl_ciphers: (optional) ciphers to use. If not provided,
89+
`ssl` default ciphers are used,
90+
:param ssl_cert_reqs: (optional) determines how the remote side
91+
certificate is treated:
92+
93+
* `ssl.CERT_NONE` − remote certificate is ignored (default),
94+
* `ssl.CERT_OPTIONAL` − remote certificate will be validated,
95+
if provided,
96+
* `ssl.CERT_REQUIRED` − valid remote certificate is required,
97+
98+
:param ssl_keyfile: (optional) a path to SSL key file to identify
99+
local (client) party,
100+
:param ssl_keyfile_password: (optional) password for SSL key file,
101+
can be provided when key file is encrypted to prevent OpenSSL
102+
password prompt,
103+
:param ssl_certfile: (optional) a path to ssl certificate file
104+
to identify local (client) party,
105+
:param ssl_ca_certfile: (optional) a path to a trusted certificate
106+
or a certificate chain. Required to check the validity of the remote
107+
(server-side) certificate,
108+
:param username: (optional) user name to authenticate to Ignite
109+
cluster,
110+
:param password: (optional) password to authenticate to Ignite cluster.
78111
"""
79112
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
80113
self._registry_mux = asyncio.Lock()

pygridgain/client.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -346,6 +346,9 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
346346
"""
347347
Initialize client.
348348
349+
For the use of the SSL-related parameters see
350+
https://docs.python.org/3/library/ssl.html#ssl-certificates.
351+
349352
:param compact_footer: (optional) use compact (True, recommended) or
350353
full (False) schema approach when serializing Complex objects.
351354
Default is to use the same approach the server is using (None).
@@ -354,7 +357,41 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = True,
354357
:param partition_aware: (optional) try to calculate the exact data
355358
placement from the key before to issue the key operation to the
356359
server node, `True` by default,
357-
:param event_listeners: (optional) event listeners.
360+
:param event_listeners: (optional) event listeners,
361+
:param timeout: (optional) sets timeout (in seconds) for each socket
362+
operation including `connect`. 0 means non-blocking mode, which is
363+
virtually guaranteed to fail. Can accept integer or float value.
364+
Default is None (blocking mode),
365+
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
366+
with node. Default is 10.0 seconds,
367+
:param use_ssl: (optional) set to True if Ignite server uses SSL
368+
on its binary connector. Defaults to use SSL when username
369+
and password has been supplied, not to use SSL otherwise,
370+
:param ssl_version: (optional) SSL version constant from standard
371+
`ssl` module. Defaults to TLS v1.2,
372+
:param ssl_ciphers: (optional) ciphers to use. If not provided,
373+
`ssl` default ciphers are used,
374+
:param ssl_cert_reqs: (optional) determines how the remote side
375+
certificate is treated:
376+
377+
* `ssl.CERT_NONE` − remote certificate is ignored (default),
378+
* `ssl.CERT_OPTIONAL` − remote certificate will be validated,
379+
if provided,
380+
* `ssl.CERT_REQUIRED` − valid remote certificate is required,
381+
382+
:param ssl_keyfile: (optional) a path to SSL key file to identify
383+
local (client) party,
384+
:param ssl_keyfile_password: (optional) password for SSL key file,
385+
can be provided when key file is encrypted to prevent OpenSSL
386+
password prompt,
387+
:param ssl_certfile: (optional) a path to ssl certificate file
388+
to identify local (client) party,
389+
:param ssl_ca_certfile: (optional) a path to a trusted certificate
390+
or a certificate chain. Required to check the validity of the remote
391+
(server-side) certificate,
392+
:param username: (optional) user name to authenticate to Ignite
393+
cluster,
394+
:param password: (optional) password to authenticate to Ignite cluster.
358395
"""
359396
super().__init__(compact_footer, partition_aware, event_listeners, **kwargs)
360397

pygridgain/connection/aio_connection.py

Lines changed: 29 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
from collections import OrderedDict
1919
from typing import Union
2020

21-
from tzlocal import get_localzone
22-
2321
from pygridgain.constants import PROTOCOLS, PROTOCOL_BYTE_ORDER
2422
from pygridgain.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError
2523
from .bitmask_feature import BitmaskFeature
@@ -106,11 +104,13 @@ def __init__(self, client: 'AioClient', host: str, port: int, username: str = No
106104
:param client: Ignite client object,
107105
:param host: Ignite server node's host name or IP,
108106
:param port: Ignite server node's port number,
107+
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
108+
with node. Default is 10.0 seconds,
109109
:param use_ssl: (optional) set to True if Ignite server uses SSL
110110
on its binary connector. Defaults to use SSL when username
111111
and password has been supplied, not to use SSL otherwise,
112112
:param ssl_version: (optional) SSL version constant from standard
113-
`ssl` module. Defaults to TLS v1.1, as in Ignite 2.5,
113+
`ssl` module. Defaults to TLS v1.2,
114114
:param ssl_ciphers: (optional) ciphers to use. If not provided,
115115
`ssl` default ciphers are used,
116116
:param ssl_cert_reqs: (optional) determines how the remote side
@@ -153,7 +153,6 @@ async def connect(self):
153153
"""
154154
if self.alive:
155155
return
156-
self._closed = False
157156
await self._connect()
158157

159158
async def _connect(self):
@@ -164,27 +163,28 @@ async def _connect(self):
164163
detecting_protocol = True
165164
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
166165

167-
try:
168-
self._on_handshake_start()
169-
result = await self._connect_version()
170-
except HandshakeError as e:
171-
if e.expected_version in PROTOCOLS:
172-
self.client.protocol_context.version = e.expected_version
166+
while True:
167+
try:
168+
self._on_handshake_start()
173169
result = await self._connect_version()
174-
else:
170+
self._on_handshake_success(result)
171+
return
172+
except HandshakeError as e:
173+
if e.expected_version in PROTOCOLS:
174+
self.client.protocol_context.version = e.expected_version
175+
continue
176+
else:
177+
self._on_handshake_fail(e)
178+
raise e
179+
except AuthenticationError as e:
175180
self._on_handshake_fail(e)
176181
raise e
177-
except AuthenticationError as e:
178-
self._on_handshake_fail(e)
179-
raise e
180-
except Exception as e:
181-
self._on_handshake_fail(e)
182-
# restore undefined protocol version
183-
if detecting_protocol:
184-
self.client.protocol_context = None
185-
raise e
186-
187-
self._on_handshake_success(result)
182+
except Exception as e:
183+
self._on_handshake_fail(e)
184+
# restore undefined protocol version
185+
if detecting_protocol:
186+
self.client.protocol_context = None
187+
raise e
188188

189189
def process_connection_lost(self, err, reconnect=False):
190190
self.failed = True
@@ -213,11 +213,13 @@ async def _connect_version(self) -> Union[dict, OrderedDict]:
213213

214214
ssl_context = create_ssl_context(self.ssl_params)
215215
handshake_fut = self._loop.create_future()
216-
self._transport, _ = await self._loop.create_connection(
217-
lambda: BaseProtocol(self, handshake_fut),
218-
host=self.host, port=self.port, ssl=ssl_context)
219-
220-
hs_response = await handshake_fut
216+
self._closed = False
217+
self._transport, _ = await self._loop.create_connection(lambda: BaseProtocol(self, handshake_fut),
218+
host=self.host, port=self.port, ssl=ssl_context)
219+
try:
220+
hs_response = await asyncio.wait_for(handshake_fut, self.handshake_timeout)
221+
except asyncio.TimeoutError:
222+
raise ConnectionError('timed out')
221223

222224
if hs_response.op_code == 0:
223225
await self.close()

pygridgain/connection/connection.py

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from tzlocal import get_localzone
2222

2323
from pygridgain.constants import PROTOCOLS, DEFAULT_HOST, DEFAULT_PORT, PROTOCOL_BYTE_ORDER
24-
from pygridgain.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError
24+
from pygridgain.exceptions import HandshakeError, SocketError, connection_errors, AuthenticationError, ParameterError
2525
from .bitmask_feature import BitmaskFeature
2626

2727
from .handshake import HandshakeRequest, HandshakeResponse
@@ -36,8 +36,9 @@
3636

3737
class BaseConnection:
3838
def __init__(self, client, host: str = None, port: int = None, username: str = None, password: str = None,
39-
**ssl_params):
39+
handshake_timeout: float = 10.0, **ssl_params):
4040
self.client = client
41+
self.handshake_timeout = handshake_timeout
4142
self.host = host if host else DEFAULT_HOST
4243
self.port = port if port else DEFAULT_PORT
4344
self.username = username
@@ -46,6 +47,9 @@ def __init__(self, client, host: str = None, port: int = None, username: str = N
4647

4748
self.timezone = get_localzone().tzname(None)
4849

50+
if handshake_timeout <= 0.0:
51+
raise ParameterError("handshake_timeout should be positive")
52+
4953
check_ssl_params(ssl_params)
5054

5155
if self.username and self.password and 'use_ssl' not in ssl_params:
@@ -166,8 +170,9 @@ class Connection(BaseConnection):
166170
* binary protocol connector. Encapsulates handshake and failover reconnection.
167171
"""
168172

169-
def __init__(self, client: 'Client', host: str, port: int, timeout: float = None,
170-
username: str = None, password: str = None, **ssl_params):
173+
def __init__(self, client: 'Client', host: str, port: int, username: str = None, password: str = None,
174+
timeout: float = None, handshake_timeout: float = 10.0,
175+
**ssl_params):
171176
"""
172177
Initialize connection.
173178
@@ -181,11 +186,13 @@ def __init__(self, client: 'Client', host: str, port: int, timeout: float = None
181186
operation including `connect`. 0 means non-blocking mode, which is
182187
virtually guaranteed to fail. Can accept integer or float value.
183188
Default is None (blocking mode),
189+
:param handshake_timeout: (optional) sets timeout (in seconds) for performing handshake (connection)
190+
with node. Default is 10.0.
184191
:param use_ssl: (optional) set to True if GridGain server uses SSL
185192
on its binary connector. Defaults to use SSL when username
186193
and password has been supplied, not to use SSL otherwise,
187194
:param ssl_version: (optional) SSL version constant from standard
188-
`ssl` module. Defaults to TLS v1.1, as in GridGain 8.5,
195+
`ssl` module. Defaults to TLS v1.2,
189196
:param ssl_ciphers: (optional) ciphers to use. If not provided,
190197
`ssl` default ciphers are used,
191198
:param ssl_cert_reqs: (optional) determines how the remote side
@@ -211,7 +218,7 @@ def __init__(self, client: 'Client', host: str, port: int, timeout: float = None
211218
:param password: (optional) password to authenticate to GridGain
212219
cluster.
213220
"""
214-
super().__init__(client, host, port, username, password, **ssl_params)
221+
super().__init__(client, host, port, username, password, handshake_timeout, **ssl_params)
215222
self.timeout = timeout
216223
self._socket = None
217224

@@ -230,27 +237,29 @@ def connect(self):
230237
detecting_protocol = True
231238
self.client.protocol_context = ProtocolContext(max(PROTOCOLS), BitmaskFeature.all_supported())
232239

233-
try:
234-
self._on_handshake_start()
235-
result = self._connect_version()
236-
except HandshakeError as e:
237-
if e.expected_version in PROTOCOLS:
238-
self.client.protocol_context.version = e.expected_version
240+
while True:
241+
try:
242+
self._on_handshake_start()
239243
result = self._connect_version()
240-
else:
244+
self._socket.settimeout(self.timeout)
245+
self._on_handshake_success(result)
246+
return
247+
except HandshakeError as e:
248+
if e.expected_version in PROTOCOLS:
249+
self.client.protocol_context.version = e.expected_version
250+
continue
251+
else:
252+
self._on_handshake_fail(e)
253+
raise e
254+
except AuthenticationError as e:
241255
self._on_handshake_fail(e)
242256
raise e
243-
except AuthenticationError as e:
244-
self._on_handshake_fail(e)
245-
raise e
246-
except Exception as e:
247-
self._on_handshake_fail(e)
248-
# restore undefined protocol version
249-
if detecting_protocol:
250-
self.client.protocol_context = None
251-
raise e
252-
253-
self._on_handshake_success(result)
257+
except Exception as e:
258+
self._on_handshake_fail(e)
259+
# restore undefined protocol version
260+
if detecting_protocol:
261+
self.client.protocol_context = None
262+
raise e
254263

255264
def _connect_version(self) -> Union[dict, OrderedDict]:
256265
"""
@@ -259,7 +268,7 @@ def _connect_version(self) -> Union[dict, OrderedDict]:
259268
"""
260269

261270
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
262-
self._socket.settimeout(self.timeout)
271+
self._socket.settimeout(self.handshake_timeout)
263272
self._socket = wrap(self._socket, self.ssl_params)
264273
self._socket.connect((self.host, self.port))
265274

pygridgain/connection/protocol_context.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,9 @@ def __eq__(self, other):
4141
def __str__(self):
4242
return f'ProtocolContext(version={self._version}, features={self._features})'
4343

44+
def __repr__(self):
45+
return self.__str__()
46+
4447
def _ensure_consistency(self):
4548
if not self.is_feature_flags_supported():
4649
self._features = None

pygridgain/transaction.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525

2626
def _validate_int_enum_param(value: Union[int, IntEnum], cls: Type[IntEnum]):
27-
if value not in cls:
27+
if value not in set(v.value for v in cls): # Use this trick to disable warning on python 3.7
2828
raise ValueError(f'{value} not in {cls}')
2929
return value
3030

0 commit comments

Comments
 (0)