Skip to content

Commit e48f4be

Browse files
ivandaschisapego
authored andcommitted
IGNITE-14472 Multiple performance improvements
This closes #28
1 parent 7c1d0cc commit e48f4be

File tree

9 files changed

+159
-115
lines changed

9 files changed

+159
-115
lines changed

pyignite/binary.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def write_footer(obj, stream, header, header_class, schema_items, offsets, initi
201201
stream.write(schema)
202202

203203
if save_to_buf:
204-
obj._buffer = bytes(stream.mem_view(initial_pos, stream.tell() - initial_pos))
204+
obj._buffer = stream.slice(initial_pos, stream.tell() - initial_pos)
205205
obj._hashcode = header.hash_code
206206

207207
def _setattr(self, attr_name: str, attr_value: Any):

pyignite/connection/aio_connection.py

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ async def _connect_version(self) -> Union[dict, OrderedDict]:
158158

159159
with AioBinaryStream(self.client) as stream:
160160
await hs_request.from_python_async(stream)
161-
await self._send(stream.getbuffer(), reconnect=False)
161+
await self._send(stream.getvalue(), reconnect=False)
162162

163163
with AioBinaryStream(self.client, await self._recv(reconnect=False)) as stream:
164164
hs_response = await HandshakeResponse.parse_async(stream, self.protocol_context)
@@ -185,7 +185,7 @@ async def _reconnect(self):
185185
except connection_errors:
186186
pass
187187

188-
async def request(self, data: Union[bytes, bytearray, memoryview]) -> bytearray:
188+
async def request(self, data: Union[bytes, bytearray]) -> bytearray:
189189
"""
190190
Perform request.
191191
@@ -195,7 +195,7 @@ async def request(self, data: Union[bytes, bytearray, memoryview]) -> bytearray:
195195
await self._send(data)
196196
return await self._recv()
197197

198-
async def _send(self, data: Union[bytes, bytearray, memoryview], reconnect=True):
198+
async def _send(self, data: Union[bytes, bytearray], reconnect=True):
199199
if self.closed:
200200
raise SocketError('Attempt to use closed connection.')
201201

@@ -212,21 +212,43 @@ async def _recv(self, reconnect=True) -> bytearray:
212212
if self.closed:
213213
raise SocketError('Attempt to use closed connection.')
214214

215-
with BytesIO() as stream:
215+
data = bytearray(1024)
216+
buffer = memoryview(data)
217+
bytes_total_received, bytes_to_receive = 0, 0
218+
while True:
216219
try:
217-
buf = await self._reader.readexactly(4)
218-
response_len = int.from_bytes(buf, PROTOCOL_BYTE_ORDER)
220+
chunk = await self._reader.read(len(buffer))
221+
bytes_received = len(chunk)
222+
if bytes_received == 0:
223+
raise SocketError('Connection broken.')
219224

220-
stream.write(buf)
221-
222-
stream.write(await self._reader.readexactly(response_len))
225+
buffer[0:bytes_received] = chunk
226+
bytes_total_received += bytes_received
223227
except connection_errors:
224228
self.failed = True
225229
if reconnect:
226230
await self._reconnect()
227231
raise
228232

229-
return bytearray(stream.getbuffer())
233+
if bytes_total_received < 4:
234+
continue
235+
elif bytes_to_receive == 0:
236+
response_len = int.from_bytes(data[0:4], PROTOCOL_BYTE_ORDER)
237+
bytes_to_receive = response_len
238+
239+
if response_len + 4 > len(data):
240+
buffer.release()
241+
data.extend(bytearray(response_len + 4 - len(data)))
242+
buffer = memoryview(data)[bytes_total_received:]
243+
continue
244+
245+
if bytes_total_received >= bytes_to_receive:
246+
buffer.release()
247+
break
248+
249+
buffer = buffer[bytes_received:]
250+
251+
return data
230252

231253
async def close(self):
232254
async with self._mux:

pyignite/connection/connection.py

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ def _connect_version(self) -> Union[dict, OrderedDict]:
212212

213213
with BinaryStream(self.client) as stream:
214214
hs_request.from_python(stream)
215-
self.send(stream.getbuffer(), reconnect=False)
215+
self.send(stream.getvalue(), reconnect=False)
216216

217217
with BinaryStream(self.client, self.recv(reconnect=False)) as stream:
218218
hs_response = HandshakeResponse.parse(stream, self.protocol_context)
@@ -235,7 +235,7 @@ def reconnect(self):
235235
except connection_errors:
236236
pass
237237

238-
def request(self, data: Union[bytes, bytearray, memoryview], flags=None) -> bytearray:
238+
def request(self, data: Union[bytes, bytearray], flags=None) -> bytearray:
239239
"""
240240
Perform request.
241241
@@ -245,7 +245,7 @@ def request(self, data: Union[bytes, bytearray, memoryview], flags=None) -> byte
245245
self.send(data, flags=flags)
246246
return self.recv()
247247

248-
def send(self, data: Union[bytes, bytearray, memoryview], flags=None, reconnect=True):
248+
def send(self, data: Union[bytes, bytearray], flags=None, reconnect=True):
249249
"""
250250
Send data down the socket.
251251
@@ -275,35 +275,46 @@ def recv(self, flags=None, reconnect=True) -> bytearray:
275275
:param flags: (optional) OS-specific flags.
276276
:param reconnect: (optional) reconnect on failure, default True.
277277
"""
278-
def _recv(buffer, num_bytes):
279-
bytes_to_receive = num_bytes
280-
while bytes_to_receive > 0:
281-
try:
282-
bytes_rcvd = self._socket.recv_into(buffer, bytes_to_receive, **kwargs)
283-
if bytes_rcvd == 0:
284-
raise SocketError('Connection broken.')
285-
except connection_errors:
286-
self.failed = True
287-
if reconnect:
288-
self.reconnect()
289-
raise
290-
291-
buffer = buffer[bytes_rcvd:]
292-
bytes_to_receive -= bytes_rcvd
293-
294278
if self.closed:
295279
raise SocketError('Attempt to use closed connection.')
296280

297281
kwargs = {}
298282
if flags is not None:
299283
kwargs['flags'] = flags
300284

301-
data = bytearray(4)
302-
_recv(memoryview(data), 4)
303-
response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER)
285+
data = bytearray(1024)
286+
buffer = memoryview(data)
287+
bytes_total_received, bytes_to_receive = 0, 0
288+
while True:
289+
try:
290+
bytes_received = self._socket.recv_into(buffer, len(buffer), **kwargs)
291+
if bytes_received == 0:
292+
raise SocketError('Connection broken.')
293+
bytes_total_received += bytes_received
294+
except connection_errors:
295+
self.failed = True
296+
if reconnect:
297+
self.reconnect()
298+
raise
299+
300+
if bytes_total_received < 4:
301+
continue
302+
elif bytes_to_receive == 0:
303+
response_len = int.from_bytes(data[0:4], PROTOCOL_BYTE_ORDER)
304+
bytes_to_receive = response_len
305+
306+
if response_len + 4 > len(data):
307+
buffer.release()
308+
data.extend(bytearray(response_len + 4 - len(data)))
309+
buffer = memoryview(data)[bytes_total_received:]
310+
continue
311+
312+
if bytes_total_received >= bytes_to_receive:
313+
buffer.release()
314+
break
315+
316+
buffer = buffer[bytes_received:]
304317

305-
data.extend(bytearray(response_len))
306-
_recv(memoryview(data)[4:], response_len)
307318
return data
308319

309320
def close(self):

pyignite/datatypes/internal.py

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,10 @@
3636
from ..stream import READ_BACKWARD
3737

3838

39-
def tc_map(key: bytes, _memo_map: dict = {}):
39+
_tc_map = {}
40+
41+
42+
def tc_map(key: bytes):
4043
"""
4144
Returns a default parser/generator class for the given type code.
4245
@@ -49,7 +52,8 @@ def tc_map(key: bytes, _memo_map: dict = {}):
4952
of the “type code-type class” mapping,
5053
:return: parser/generator class for the type code.
5154
"""
52-
if not _memo_map:
55+
global _tc_map
56+
if not _tc_map:
5357
from pyignite.datatypes import (
5458
Null, ByteObject, ShortObject, IntObject, LongObject, FloatObject,
5559
DoubleObject, CharObject, BoolObject, UUIDObject, DateObject,
@@ -64,7 +68,7 @@ def tc_map(key: bytes, _memo_map: dict = {}):
6468
MapObject, BinaryObject, WrappedDataObject,
6569
)
6670

67-
_memo_map = {
71+
_tc_map = {
6872
TC_NULL: Null,
6973

7074
TC_BYTE: ByteObject,
@@ -110,7 +114,7 @@ def tc_map(key: bytes, _memo_map: dict = {}):
110114
TC_COMPLEX_OBJECT: BinaryObject,
111115
TC_ARRAY_WRAPPED_OBJECTS: WrappedDataObject,
112116
}
113-
return _memo_map[key]
117+
return _tc_map[key]
114118

115119

116120
class Conditional:
@@ -183,7 +187,7 @@ async def parse_async(self, stream):
183187
def __parse_length(self, stream):
184188
counter_type_len = ctypes.sizeof(self.counter_type)
185189
length = int.from_bytes(
186-
stream.mem_view(offset=counter_type_len),
190+
stream.slice(offset=counter_type_len),
187191
byteorder=PROTOCOL_BYTE_ORDER
188192
)
189193
stream.seek(counter_type_len, SEEK_CUR)
@@ -348,6 +352,9 @@ class AnyDataObject:
348352
"""
349353
_python_map = None
350354
_python_array_map = None
355+
_map_obj_type = None
356+
_collection_obj_type = None
357+
_binary_obj_type = None
351358

352359
@staticmethod
353360
def get_subtype(iterable, allow_none=False):
@@ -391,7 +398,7 @@ async def parse_async(cls, stream):
391398

392399
@classmethod
393400
def __data_class_parse(cls, stream):
394-
type_code = bytes(stream.mem_view(offset=ctypes.sizeof(ctypes.c_byte)))
401+
type_code = stream.slice(offset=ctypes.sizeof(ctypes.c_byte))
395402
try:
396403
return tc_map(type_code)
397404
except KeyError:
@@ -416,15 +423,17 @@ def __data_class_from_ctype(cls, ctype_object):
416423
return tc_map(type_code)
417424

418425
@classmethod
419-
def _init_python_map(cls):
426+
def _init_python_mapping(cls):
420427
"""
421428
Optimizes Python types→Ignite types map creation for speed.
422429
423430
Local imports seem inevitable here.
424431
"""
425432
from pyignite.datatypes import (
426-
LongObject, DoubleObject, String, BoolObject, Null, UUIDObject,
427-
DateObject, TimeObject, DecimalObject, ByteArrayObject,
433+
LongObject, DoubleObject, String, BoolObject, Null, UUIDObject, DateObject, TimeObject,
434+
DecimalObject, ByteArrayObject, LongArrayObject, DoubleArrayObject, StringArrayObject,
435+
BoolArrayObject, UUIDArrayObject, DateArrayObject, TimeArrayObject, DecimalArrayObject,
436+
MapObject, CollectionObject, BinaryObject
428437
)
429438

430439
cls._python_map = {
@@ -442,17 +451,6 @@ def _init_python_map(cls):
442451
decimal.Decimal: DecimalObject,
443452
}
444453

445-
@classmethod
446-
def _init_python_array_map(cls):
447-
"""
448-
Optimizes Python types→Ignite array types map creation for speed.
449-
"""
450-
from pyignite.datatypes import (
451-
LongArrayObject, DoubleArrayObject, StringArrayObject,
452-
BoolArrayObject, UUIDArrayObject, DateArrayObject, TimeArrayObject,
453-
DecimalArrayObject,
454-
)
455-
456454
cls._python_array_map = {
457455
int: LongArrayObject,
458456
float: DoubleArrayObject,
@@ -466,18 +464,20 @@ def _init_python_array_map(cls):
466464
decimal.Decimal: DecimalArrayObject,
467465
}
468466

467+
cls._map_obj_type = MapObject
468+
cls._collection_obj_type = CollectionObject
469+
cls._binary_obj_type = BinaryObject
470+
469471
@classmethod
470472
def map_python_type(cls, value):
471-
from pyignite.datatypes import (
472-
MapObject, CollectionObject, BinaryObject,
473-
)
474-
475-
if cls._python_map is None:
476-
cls._init_python_map()
477-
if cls._python_array_map is None:
478-
cls._init_python_array_map()
473+
if cls._python_map is None or cls._python_array_map is None:
474+
cls._init_python_mapping()
479475

480476
value_type = type(value)
477+
478+
if value_type in cls._python_map:
479+
return cls._python_map[value_type]
480+
481481
if is_iterable(value) and value_type not in (str, bytearray, bytes):
482482
value_subtype = cls.get_subtype(value)
483483
if value_subtype in cls._python_array_map:
@@ -490,15 +490,15 @@ def map_python_type(cls, value):
490490
isinstance(value[0], int),
491491
isinstance(value[1], dict),
492492
]):
493-
return MapObject
493+
return cls._map_obj_type
494494

495495
if all([
496496
value_subtype is None,
497497
len(value) == 2,
498498
isinstance(value[0], int),
499499
is_iterable(value[1]),
500500
]):
501-
return CollectionObject
501+
return cls._collection_obj_type
502502

503503
# no default for ObjectArrayObject, sorry
504504

@@ -507,10 +507,8 @@ def map_python_type(cls, value):
507507
)
508508

509509
if is_binary(value):
510-
return BinaryObject
510+
return cls._binary_obj_type
511511

512-
if value_type in cls._python_map:
513-
return cls._python_map[value_type]
514512
raise TypeError(
515513
'Type `{}` is invalid.'.format(value_type)
516514
)

pyignite/datatypes/null_object.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ async def to_python_async(cls, ctypes_object, *args, **kwargs):
140140
def __check_null_input(cls, stream):
141141
type_len = ctypes.sizeof(ctypes.c_byte)
142142

143-
if stream.mem_view(offset=type_len) == TC_NULL:
143+
if stream.slice(offset=type_len) == TC_NULL:
144144
stream.seek(type_len, SEEK_CUR)
145145
return True, Null.build_c_type()
146146

pyignite/datatypes/standard.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def build_c_type(cls, length: int):
9191
@classmethod
9292
def parse_not_null(cls, stream):
9393
length = int.from_bytes(
94-
stream.mem_view(stream.tell() + ctypes.sizeof(ctypes.c_byte), ctypes.sizeof(ctypes.c_int)),
94+
stream.slice(stream.tell() + ctypes.sizeof(ctypes.c_byte), ctypes.sizeof(ctypes.c_int)),
9595
byteorder=PROTOCOL_BYTE_ORDER
9696
)
9797

pyignite/queries/query.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ def perform(
122122
"""
123123
with BinaryStream(conn.client) as stream:
124124
self.from_python(stream, query_params)
125-
response_data = conn.request(stream.getbuffer())
125+
response_data = conn.request(stream.getvalue())
126126

127127
response_struct = self.response_type(protocol_context=conn.protocol_context,
128128
following=response_config, **kwargs)
@@ -154,7 +154,7 @@ async def perform_async(
154154
"""
155155
with AioBinaryStream(conn.client) as stream:
156156
await self.from_python_async(stream, query_params)
157-
data = await conn.request(stream.getbuffer())
157+
data = await conn.request(stream.getvalue())
158158

159159
response_struct = self.response_type(protocol_context=conn.protocol_context,
160160
following=response_config, **kwargs)

0 commit comments

Comments
 (0)