Skip to content

Commit bf531db

Browse files
committed
response optimizations + ConnectionState.IDLE
1 parent cc9cff4 commit bf531db

File tree

7 files changed

+102
-40
lines changed

7 files changed

+102
-40
lines changed

README.md

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,10 +68,10 @@ RPS on running 200k requests in 300 parallel coroutines (no `uvloop`):
6868
| ------------- |:-------------:| ---------:|
6969
| ping | 24961.12 | 28155.32 |
7070
| call | 21748.06 | 22103.14 |
71-
| eval | 20497.69 | 21456.38 |
72-
| select | 19968.26 | 23558.00 |
71+
| eval | 20497.69 | 24417.38 |
72+
| select | 19968.26 | 26151.00 |
7373
| insert | 20604.61 | 22256.69 |
74-
| update | 18852.46 | 21988.80 |
74+
| update | 18852.46 | 23175.80 |
7575

7676

7777
Let's enable uvloop. This is where asynctnt shines.
@@ -130,6 +130,9 @@ async def run():
130130
await conn.insert('tester', [i, 'hello{}'.format(i)])
131131

132132
values = await conn.select('tester', [])
133+
print('Code: {}'.format(values.code))
134+
print('Data: {}'.format(values.body))
135+
print(values.body2yaml()) # prints as yaml
133136

134137
await conn.disconnect()
135138

asynctnt/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
from .connection import Connection, connect
22
from .iproto.protocol import Iterator, Response
33

4-
__version__ = '0.0.7'
4+
__version__ = '0.0.8'

asynctnt/connection.py

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@
1414

1515

1616
class ConnectionState(enum.IntEnum):
17-
CONNECTING = 0
18-
CONNECTED = 1
19-
RECONNECTING = 2
20-
DISCONNECTING = 3
21-
DISCONNECTED = 4
17+
IDLE = 0
18+
CONNECTING = 1
19+
CONNECTED = 2
20+
RECONNECTING = 3
21+
DISCONNECTING = 4
22+
DISCONNECTED = 5
2223

2324

2425
class Connection:
@@ -109,8 +110,8 @@ def __init__(self, *,
109110
self._protocol = None
110111
self._db = DbMock()
111112

112-
self._state = ConnectionState.DISCONNECTED
113-
self._state_prev = ConnectionState.DISCONNECTED
113+
self._state = ConnectionState.IDLE
114+
self._state_prev = ConnectionState.IDLE
114115
self._disconnect_waiter = None
115116
self._reconnect_coro = None
116117

@@ -167,6 +168,7 @@ async def _connect(self, return_exceptions=True):
167168
ConnectionState.CONNECTING,
168169
ConnectionState.CONNECTED,
169170
ConnectionState.DISCONNECTING,
171+
ConnectionState.DISCONNECTED,
170172
}
171173
if self._state in ignore_states:
172174
return
@@ -312,6 +314,7 @@ async def reconnect(self):
312314
Just calls disconnect() and connect()
313315
"""
314316
await self.disconnect()
317+
self._set_state(ConnectionState.IDLE)
315318
await self.connect()
316319

317320
@property
@@ -463,6 +466,7 @@ def ping(self, *, timeout=-1):
463466
"""
464467
Ping request coroutine
465468
:param timeout: Request timeout
469+
:rtype: asynctnt.Response
466470
"""
467471
return self._db.ping(timeout=timeout)
468472

@@ -474,6 +478,7 @@ def call16(self, func_name, args=None, *, timeout=-1):
474478
:param func_name: function name to call
475479
:param args: arguments to pass to the function (list object)
476480
:param timeout: Request timeout
481+
:rtype: asynctnt.Response
477482
"""
478483
return self._db.call16(func_name, args,
479484
timeout=timeout)
@@ -488,6 +493,7 @@ def call(self, func_name, args=None, *, timeout=-1):
488493
:param func_name: function name to call
489494
:param args: arguments to pass to the function (list object)
490495
:param timeout: Request timeout
496+
:rtype: asynctnt.Response
491497
"""
492498
return self._db.call(func_name, args,
493499
timeout=timeout)
@@ -500,6 +506,7 @@ def eval(self, expression, args=None, *, timeout=-1):
500506
:param args: arguments to pass to the function, that will
501507
execute your expression (list object)
502508
:param timeout: Request timeout
509+
:rtype: asynctnt.Response
503510
"""
504511
return self._db.eval(expression, args,
505512
timeout=timeout)
@@ -519,6 +526,7 @@ def select(self, space, key=None, **kwargs):
519526
* asynctnt.Iterator object
520527
* string with an iterator name
521528
:param timeout: Request timeout
529+
:rtype: asynctnt.Response
522530
"""
523531
return self._db.select(space, key, **kwargs)
524532

@@ -530,6 +538,7 @@ def insert(self, space, t, *, replace=False, timeout=-1):
530538
:param t: tuple to insert (list object)
531539
:param replace: performs replace request instead of insert
532540
:param timeout: Request timeout
541+
:rtype: asynctnt.Response
533542
"""
534543
return self._db.insert(space, t,
535544
replace=replace, timeout=timeout)
@@ -541,6 +550,7 @@ def replace(self, space, t, *, timeout=-1):
541550
:param space: space id or space name.
542551
:param t: tuple to insert (list object)
543552
:param timeout: Request timeout
553+
:rtype: asynctnt.Response
544554
"""
545555
return self._db.replace(space, t,
546556
timeout=timeout)
@@ -553,6 +563,7 @@ def delete(self, space, key, **kwargs):
553563
:param key: key to delete
554564
:param index: index id or name
555565
:param timeout: Request timeout
566+
:rtype: asynctnt.Response
556567
"""
557568
return self._db.delete(space, key, **kwargs)
558569

@@ -581,6 +592,7 @@ def upsert(self, space, t, operations, **kwargs):
581592
Please refer to
582593
https://tarantool.org/doc/book/box/box_space.html?highlight=update#lua-function.space_object.update
583594
:param timeout: Request timeout
595+
:rtype: asynctnt.Response
584596
"""
585597
return self._db.upsert(space, t, operations, **kwargs)
586598

asynctnt/iproto/coreproto.pyx

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,11 @@ cdef class CoreProtocol:
119119
resp = response_parse(p, packet_len, self.encoding)
120120
p = &p[packet_len] # skip entire packet
121121

122-
sync_obj = <object>resp.sync
122+
sync_obj = <object>resp._sync
123123

124124
req_p = cpython.dict.PyDict_GetItem(self.reqs, sync_obj)
125125
if req_p is NULL:
126-
logger.warning('sync %d not found', resp.sync)
126+
logger.warning('sync %d not found', resp._sync)
127127
continue
128128

129129
req = <Request>req_p
@@ -133,9 +133,9 @@ cdef class CoreProtocol:
133133
waiter = req.waiter
134134
if waiter is not None \
135135
and not waiter.done():
136-
if resp.code != 0:
136+
if resp._code != 0:
137137
waiter.set_exception(
138-
TarantoolDatabaseError(resp.code, resp.errmsg))
138+
TarantoolDatabaseError(resp._code, resp._errmsg))
139139
else:
140140
waiter.set_result(resp)
141141

asynctnt/iproto/response.pxd

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,16 @@ from libc.stdint cimport uint64_t, uint32_t, int64_t
33

44
cdef class Response:
55
cdef:
6-
public uint32_t code
7-
public uint64_t sync
8-
public int64_t schema_id
9-
public str errmsg
10-
public list body
11-
public bytes encoding
6+
uint32_t _code
7+
uint64_t _sync
8+
int64_t _schema_id
9+
str _errmsg
10+
list _body
11+
bytes _encoding
1212

1313
@staticmethod
1414
cdef inline Response new(bytes encoding)
1515

16-
cdef inline has_schema_id(self)
1716
cdef inline is_error(self)
1817

1918

asynctnt/iproto/response.pyx

Lines changed: 46 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,29 +9,57 @@ import yaml
99

1010

1111
cdef class Response:
12+
def __cinit__(self):
13+
self._sync = 0
14+
self._code = 0
15+
self._schema_id = -1
16+
self._errmsg = None
17+
self._body = None
18+
self._encoding = None
19+
1220
@staticmethod
1321
cdef inline Response new(bytes encoding):
1422
cdef Response resp
1523
resp = Response.__new__(Response)
16-
resp.sync = 0
17-
resp.code = 0
18-
resp.schema_id = -1
19-
resp.errmsg = None
20-
resp.body = None
21-
resp.encoding = encoding
24+
resp._encoding = encoding
2225
return resp
2326

24-
cdef inline has_schema_id(self):
25-
return self.schema_id != -1
26-
2727
cdef inline is_error(self):
28-
return self.code != 0
28+
return self._code != 0
2929

3030
def __repr__(self):
31-
return '<Response: code={}, sync={}>'.format(self.code, self.sync)
31+
body_len = None
32+
if self._body is not None:
33+
body_len = len(self._body)
34+
return '<Response: code={}, sync={}, body_len={}>'.format(
35+
self._code, self._sync, body_len)
3236

3337
def body2yaml(self):
34-
return yaml.dump(self.body, allow_unicode=True)
38+
return yaml.dump(self._body, allow_unicode=True)
39+
40+
@property
41+
def sync(self):
42+
return self._sync
43+
44+
@property
45+
def code(self):
46+
return self._code
47+
48+
@property
49+
def schema_id(self):
50+
return self._schema_id
51+
52+
@property
53+
def errmsg(self):
54+
return self._errmsg
55+
56+
@property
57+
def body(self):
58+
return self._body
59+
60+
@property
61+
def encoding(self):
62+
return self._encoding
3563

3664

3765
cdef object _decode_obj(const char** p, bytes encoding):
@@ -152,18 +180,18 @@ cdef Response response_parse(const char *buf, uint32_t buf_len,
152180
if mp_typeof(b[0]) != MP_UINT:
153181
raise TypeError('code type must be a MP_UINT')
154182

155-
resp.code = <uint32_t>mp_decode_uint(&b)
156-
resp.code &= 0x7FFF
183+
resp._code = <uint32_t>mp_decode_uint(&b)
184+
resp._code &= 0x7FFF
157185
elif key == tnt.TP_SYNC:
158186
if mp_typeof(b[0]) != MP_UINT:
159187
raise TypeError('sync type must be a MP_UINT')
160188

161-
resp.sync = mp_decode_uint(&b)
189+
resp._sync = mp_decode_uint(&b)
162190
elif key == tnt.TP_SCHEMA_ID:
163191
if mp_typeof(b[0]) != MP_UINT:
164192
raise TypeError('schema_id type must be a MP_UINT')
165193

166-
resp.schema_id = mp_decode_uint(&b)
194+
resp._schema_id = mp_decode_uint(&b)
167195
else:
168196
logger.warning('Unknown argument in header. Skipping.')
169197
mp_next(&b)
@@ -189,10 +217,10 @@ cdef Response response_parse(const char *buf, uint32_t buf_len,
189217
s = NULL
190218
s_len = 0
191219
s = mp_decode_str(&b, &s_len)
192-
resp.errmsg = s[:s_len].decode(encoding)
220+
resp._errmsg = s[:s_len].decode(encoding)
193221
elif key == tnt.TP_DATA:
194222
if mp_typeof(b[0]) != MP_ARRAY:
195223
raise TypeError('body data type must be a MP_ARRAY')
196-
resp.body = _response_parse_body_data(b, encoding)
224+
resp._body = _response_parse_body_data(b, encoding)
197225

198226
return resp

tests/test_connect.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ async def test__connect(self):
1515
conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port,
1616
reconnect_timeout=0,
1717
loop=self.loop)
18+
self.assertEqual(conn.state, ConnectionState.IDLE)
19+
1820
await conn.connect()
1921
self.assertIsNotNone(conn._transport)
2022
self.assertIsNotNone(conn._protocol)
@@ -149,3 +151,21 @@ async def test__connect_tnt_restarted(self):
149151
finally:
150152
await conn.disconnect()
151153

154+
async def test__connect_force_disconnect(self):
155+
conn = asynctnt.Connection(host=self.tnt.host, port=44444,
156+
reconnect_timeout=0.3,
157+
loop=self.loop)
158+
self.ensure_future(conn.connect())
159+
await self.sleep(1)
160+
await conn.disconnect()
161+
self.assertEqual(conn.state, ConnectionState.DISCONNECTED)
162+
163+
async def test__close(self):
164+
conn = asynctnt.Connection(host=self.tnt.host, port=self.tnt.port,
165+
reconnect_timeout=0.3,
166+
loop=self.loop)
167+
await conn.connect()
168+
await self.sleep(0.1)
169+
conn.close()
170+
await self.sleep(0.1)
171+
self.assertEqual(conn.state, ConnectionState.DISCONNECTED)

0 commit comments

Comments
 (0)