Skip to content

Commit 522c73e

Browse files
committed
Added ping background coroutine to check the connection
1 parent 7d34c96 commit 522c73e

File tree

6 files changed

+111
-29
lines changed

6 files changed

+111
-29
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ New features:
1717
request.
1818
* Added `Connection.sql` method to execute SQL statements for Tarantool 2
1919
(see asynctnt docs for details).
20+
* Added internal background coroutine with pings periodically a Tarantool
21+
instance to check if it is alive and to refresh schema if it is changed
22+
(default period is 5 seconds and is configured by `Connection.ping_timeout`
23+
parameter).
2024

2125
Changes:
2226
* Iteration over TarantoolTuple results in iterating over a raw tuple by

asynctnt/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,4 @@
33
Iterator, Response, TarantoolTuple, PushIterator
44
)
55

6-
__version__ = '1.0b2'
6+
__version__ = '1.0b3'

asynctnt/_testbase.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,7 @@ async def tnt_connect(self, *,
195195
fetch_schema=True,
196196
auto_refetch_schema=False,
197197
connect_timeout=None, reconnect_timeout=1/3,
198+
ping_timeout=0,
198199
request_timeout=None, encoding='utf-8',
199200
initial_read_buffer_size=None):
200201
self._conn = asynctnt.Connection(
@@ -207,6 +208,7 @@ async def tnt_connect(self, *,
207208
connect_timeout=connect_timeout,
208209
reconnect_timeout=reconnect_timeout,
209210
request_timeout=request_timeout,
211+
ping_timeout=ping_timeout,
210212
encoding=encoding,
211213
initial_read_buffer_size=initial_read_buffer_size,
212214
loop=self.loop)

asynctnt/connection.py

Lines changed: 68 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,11 @@ class Connection:
3131
'_host', '_port', '_username', '_password',
3232
'_fetch_schema', '_auto_refetch_schema', '_initial_read_buffer_size',
3333
'_encoding', '_connect_timeout', '_reconnect_timeout',
34-
'_request_timeout', '_loop', '_state', '_state_prev',
34+
'_request_timeout', '_ping_timeout', '_loop', '_state', '_state_prev',
3535
'_transport', '_protocol', '_db',
36-
'_disconnect_waiter', '_reconnect_coro',
37-
'_connect_lock', '_disconnect_lock'
36+
'_disconnect_waiter', '_reconnect_task',
37+
'_connect_lock', '_disconnect_lock',
38+
'_ping_task', '__create_task'
3839
)
3940

4041
def __init__(self, *,
@@ -44,9 +45,10 @@ def __init__(self, *,
4445
password=None,
4546
fetch_schema=True,
4647
auto_refetch_schema=True,
47-
connect_timeout=3,
48-
request_timeout=-1,
48+
connect_timeout=3.,
49+
request_timeout=-1.,
4950
reconnect_timeout=1. / 3.,
51+
ping_timeout=5.,
5052
encoding=None,
5153
initial_read_buffer_size=None,
5254
loop=None):
@@ -99,6 +101,12 @@ def __init__(self, *,
99101
:param reconnect_timeout:
100102
Time in seconds to wait before automatic reconnect
101103
(set to ``0`` or ``None`` to disable auto reconnect)
104+
:param ping_timeout:
105+
If specified (default is 5 seconds) a background task
106+
will be created which will ping Tarantool instance
107+
periodically to check if it is alive and update schema
108+
if it is changed
109+
(set to ``0`` or ``None`` to disable this task)
102110
:param encoding:
103111
The encoding to use for all strings
104112
encoding and decoding (default is ``utf-8``)
@@ -128,6 +136,7 @@ def __init__(self, *,
128136
self._connect_timeout = connect_timeout
129137
self._reconnect_timeout = reconnect_timeout or 0
130138
self._request_timeout = request_timeout
139+
self._ping_timeout = ping_timeout or 0
131140

132141
self._loop = loop or asyncio.get_event_loop()
133142

@@ -138,9 +147,16 @@ def __init__(self, *,
138147
self._state = ConnectionState.DISCONNECTED
139148
self._state_prev = ConnectionState.DISCONNECTED
140149
self._disconnect_waiter = None
141-
self._reconnect_coro = None
150+
self._reconnect_task = None
142151
self._connect_lock = asyncio.Lock(loop=self._loop)
143152
self._disconnect_lock = asyncio.Lock(loop=self._loop)
153+
self._ping_task = None
154+
155+
if False and hasattr(self._loop, 'create_task'):
156+
self.__create_task = self._loop.create_task
157+
else:
158+
self.__create_task = functools.partial(asyncio.ensure_future,
159+
loop=self._loop)
144160

145161
def _set_state(self, new_state):
146162
if self._state != new_state:
@@ -168,22 +184,32 @@ def connection_lost(self, exc):
168184
# should not reconnect, close everything
169185
self.close()
170186

187+
async def _ping_task_func(self):
188+
while self._state == ConnectionState.CONNECTED:
189+
try:
190+
await self.ping(timeout=2.0)
191+
except asyncio.CancelledError:
192+
break
193+
except Exception as e:
194+
pass
195+
196+
await asyncio.sleep(self._ping_timeout, loop=self._loop)
197+
171198
def _start_reconnect(self, return_exceptions=False):
172199
if self._state in [ConnectionState.CONNECTING,
173200
ConnectionState.RECONNECTING]:
174201
logger.debug('%s Cannot start reconnect: already reconnecting',
175202
self.fingerprint)
176203
return
177204

178-
if self._reconnect_coro:
205+
if self._reconnect_task:
179206
return
180207

181208
logger.info('%s Started reconnecting', self.fingerprint)
182209
self._set_state(ConnectionState.RECONNECTING)
183210

184-
self._reconnect_coro = asyncio.ensure_future(
185-
self._connect(return_exceptions=return_exceptions),
186-
loop=self._loop
211+
self._reconnect_task = self.__create_task(
212+
self._connect(return_exceptions=return_exceptions)
187213
)
188214

189215
def protocol_factory(self, connected_fut, cls=protocol.Protocol):
@@ -211,7 +237,7 @@ async def _connect(self, return_exceptions=True):
211237
}
212238

213239
if self._state in ignore_states:
214-
self._reconnect_coro = None
240+
self._reconnect_task = None
215241
return
216242

217243
self._set_state(ConnectionState.CONNECTING)
@@ -222,6 +248,9 @@ async def full_connect():
222248

223249
if self._host.startswith('unix/'):
224250
unix_path = self._port
251+
assert isinstance(unix_path, str), \
252+
'port must be a str instance for ' \
253+
'unix socket'
225254
assert unix_path, \
226255
'No unix file path specified'
227256
assert os.path.exists(unix_path), \
@@ -271,8 +300,12 @@ async def full_connect():
271300
self._transport = tr
272301
self._protocol = pr
273302
self._db = self._protocol.get_common_db()
274-
self._reconnect_coro = None
303+
self._reconnect_task = None
275304
self._normalize_api()
305+
306+
if self._ping_timeout:
307+
self._ping_task = \
308+
self.__create_task(self._ping_task_func())
276309
return
277310
except TarantoolDatabaseError as e:
278311
skip_errors = {
@@ -287,7 +320,7 @@ async def full_connect():
287320
continue
288321

289322
if return_exceptions:
290-
self._reconnect_coro = None
323+
self._reconnect_task = None
291324
raise e
292325

293326
logger.exception(e)
@@ -298,15 +331,15 @@ async def full_connect():
298331
return # no reconnect, no return_exceptions
299332
except asyncio.CancelledError:
300333
logger.debug("connect is cancelled")
301-
self._reconnect_coro = None
334+
self._reconnect_task = None
302335
raise
303336
except Exception as e:
304337
if self._reconnect_timeout > 0:
305338
await self._wait_reconnect(e)
306339
continue
307340

308341
if return_exceptions:
309-
self._reconnect_coro = None
342+
self._reconnect_task = None
310343
raise e
311344

312345
logger.exception(e)
@@ -341,9 +374,13 @@ async def disconnect(self):
341374
self._set_state(ConnectionState.DISCONNECTING)
342375

343376
logger.info('%s Disconnecting...', self.fingerprint)
344-
if self._reconnect_coro:
345-
self._reconnect_coro.cancel()
346-
self._reconnect_coro = None
377+
if self._reconnect_task:
378+
self._reconnect_task.cancel()
379+
self._reconnect_task = None
380+
381+
if self._ping_task and not self._ping_task.done():
382+
self._ping_task.cancel()
383+
self._ping_task = None
347384

348385
self._db = _DbMock()
349386
if self._transport:
@@ -373,9 +410,13 @@ def close(self):
373410
self._set_state(ConnectionState.DISCONNECTING)
374411
logger.info('%s Disconnecting...', self.fingerprint)
375412

376-
if self._reconnect_coro:
377-
self._reconnect_coro.cancel()
378-
self._reconnect_coro = None
413+
if self._reconnect_task and not self._reconnect_task.done():
414+
self._reconnect_task.cancel()
415+
self._reconnect_task = None
416+
417+
if self._ping_task and not self._ping_task.done():
418+
self._ping_task.cancel()
419+
self._ping_task = None
379420

380421
if self._transport:
381422
self._transport.close()
@@ -549,7 +590,7 @@ async def refetch_schema(self):
549590
"""
550591
await self._protocol.refetch_schema()
551592

552-
def ping(self, *, timeout=-1) -> _MethodRet:
593+
def ping(self, *, timeout=-1.0) -> _MethodRet:
553594
"""
554595
Ping request coroutine
555596
@@ -560,7 +601,7 @@ def ping(self, *, timeout=-1) -> _MethodRet:
560601
return self._db.ping(timeout=timeout)
561602

562603
def call16(self, func_name, args=None, *,
563-
timeout=-1, push_subscribe=False) -> _MethodRet:
604+
timeout=-1.0, push_subscribe=False) -> _MethodRet:
564605
"""
565606
Call16 request coroutine. It is a call with an old behaviour
566607
(return result of a Tarantool procedure is wrapped into a tuple,
@@ -578,7 +619,7 @@ def call16(self, func_name, args=None, *,
578619
push_subscribe=push_subscribe)
579620

580621
def call(self, func_name, args=None, *,
581-
timeout=-1, push_subscribe=False) -> _MethodRet:
622+
timeout=-1.0, push_subscribe=False) -> _MethodRet:
582623
"""
583624
Call request coroutine. It is a call with a new behaviour
584625
(return result of a Tarantool procedure is not wrapped into
@@ -611,7 +652,7 @@ def call(self, func_name, args=None, *,
611652
timeout=timeout, push_subscribe=push_subscribe)
612653

613654
def eval(self, expression, args=None, *,
614-
timeout=-1, push_subscribe=False) -> _MethodRet:
655+
timeout=-1.0, push_subscribe=False) -> _MethodRet:
615656
"""
616657
Eval request coroutine.
617658
@@ -716,7 +757,7 @@ def insert(self, space, t, *, replace=False, timeout=-1) -> _MethodRet:
716757
replace=replace,
717758
timeout=timeout)
718759

719-
def replace(self, space, t, *, timeout=-1) -> _MethodRet:
760+
def replace(self, space, t, *, timeout=-1.0) -> _MethodRet:
720761
"""
721762
Replace request coroutine. Same as insert, but replace.
722763
@@ -821,7 +862,7 @@ def upsert(self, space, t, operations, **kwargs) -> _MethodRet:
821862
return self._db.upsert(space, t, operations, **kwargs)
822863

823864
def sql(self, query, args=None, *,
824-
parse_metadata=True, timeout=-1) -> _MethodRet:
865+
parse_metadata=True, timeout=-1.0) -> _MethodRet:
825866
"""
826867
Executes an SQL statement (only for Tarantool > 2)
827868

tests/test_common.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import asyncio
2+
13
import asynctnt
24
from asynctnt._testbase import ensure_version
35
from asynctnt.exceptions import TarantoolNotConnectedError, \
@@ -202,3 +204,37 @@ async def test__schema_refetch_next_byte(self):
202204
await self.conn.call('func_hello')
203205
except TarantoolDatabaseError as e:
204206
self.fail(e)
207+
208+
async def test__schema_refetch_unknown_space(self):
209+
await self.tnt_reconnect(auto_refetch_schema=True,
210+
username='t1', password='t1',
211+
ping_timeout=0.1)
212+
213+
async def func():
214+
# trying to select from an unknown space until it is created
215+
while True:
216+
try:
217+
await self.conn.select('spacex')
218+
return
219+
except Exception as e:
220+
pass
221+
222+
await asyncio.sleep(0.1, loop=self.loop)
223+
224+
f = asyncio.ensure_future(asyncio.wait_for(func(), loop=self.loop,
225+
timeout=1),
226+
loop=self.loop)
227+
228+
# Changing scheme
229+
try:
230+
await self.conn.eval(
231+
"s = box.schema.create_space('spacex');"
232+
"s:create_index('primary');"
233+
)
234+
except TarantoolDatabaseError as e:
235+
self.fail(e)
236+
237+
try:
238+
await f
239+
except (asyncio.TimeoutError, asyncio.CancelledError) as e:
240+
self.fail('Schema is not updated: %s %s' % (type(e), e))

tests/test_op_sql.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,6 @@ async def test__sql_update_multiple(self):
5454

5555
res = await self.conn.sql("update sql_space set name = 'uno'")
5656

57-
print(await self.conn.sql("select * from sql_space"))
5857
self.assertEqual(2, res.rowcount, 'rowcount ok')
5958

6059
@ensure_version(min=(2, 0))

0 commit comments

Comments
 (0)