Skip to content

Commit 0131010

Browse files
isapegoivandasch
andauthored
GG-33820 [IGNITE-14705] Fix handling collections with binary objects (#46)
(cherry picked from commit 746dd13) Co-authored-by: Ivan Daschinsky <[email protected]>
1 parent eeef976 commit 0131010

File tree

17 files changed

+260
-174
lines changed

17 files changed

+260
-174
lines changed

pygridgain/aio_client.py

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
from .exceptions import BinaryTypeError, CacheError, ReconnectError, connection_errors
3333
from .queries.query import CacheInfo
3434
from .stream import AioBinaryStream, READ_BACKWARD
35-
from .utils import cache_id, entity_id, status_to_exception, is_wrapped
35+
from .utils import cache_id, entity_id, status_to_exception
3636

3737

3838
__all__ = ['AioClient']
@@ -270,11 +270,24 @@ async def unwrap_binary(self, value: Any) -> Any:
270270
:return: the result of the Binary Object unwrapping with all other data
271271
left intact.
272272
"""
273-
if is_wrapped(value):
274-
blob, offset = value
275-
with AioBinaryStream(self, blob) as stream:
276-
data_class = await BinaryObject.parse_async(stream)
277-
return await BinaryObject.to_python_async(stream.read_ctype(data_class, direction=READ_BACKWARD), self)
273+
if isinstance(value, tuple) and len(value) == 2:
274+
if type(value[0]) is bytes and type(value[1]) is int:
275+
blob, offset = value
276+
with AioBinaryStream(self, blob) as stream:
277+
data_class = await BinaryObject.parse_async(stream)
278+
return await BinaryObject.to_python_async(stream.read_ctype(data_class, direction=READ_BACKWARD),
279+
client=self)
280+
281+
if isinstance(value[0], int):
282+
col_type, collection = value
283+
if isinstance(collection, list):
284+
coros = [self.unwrap_binary(v) for v in collection]
285+
return col_type, await asyncio.gather(*coros)
286+
287+
if isinstance(collection, dict):
288+
coros = [asyncio.gather(self.unwrap_binary(k), self.unwrap_binary(v))
289+
for k, v in collection.items()]
290+
return col_type, dict(await asyncio.gather(*coros))
278291
return value
279292

280293
@status_to_exception(CacheError)
@@ -352,7 +365,7 @@ async def get_best_node(
352365

353366
key, key_hint = self._get_affinity_key(c_id, key, key_hint)
354367

355-
hashcode = await key_hint.hashcode_async(key, self)
368+
hashcode = await key_hint.hashcode_async(key, client=self)
356369

357370
best_node = self._get_node_by_hashcode(c_id, hashcode, parts)
358371
if best_node:

pygridgain/client.py

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161
from .queries.query import CacheInfo
6262
from .stream import BinaryStream, READ_BACKWARD
6363
from .utils import (
64-
cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable, is_wrapped,
64+
cache_id, capitalize, entity_id, schema_id, process_delimiter, status_to_exception, is_iterable,
6565
get_field_by_id, unsigned
6666
)
6767
from .binary import GenericObjectMeta
@@ -540,17 +540,26 @@ def query_binary_type(self, binary_type: Union[int, str], schema: Union[int, dic
540540

541541
def unwrap_binary(self, value: Any) -> Any:
542542
"""
543-
Detects and recursively unwraps Binary Object.
543+
Detects and recursively unwraps Binary Object or collections of BinaryObject.
544544
545-
:param value: anything that could be a Binary Object,
545+
:param value: anything that could be a Binary Object or collection of BinaryObject,
546546
:return: the result of the Binary Object unwrapping with all other data
547547
left intact.
548548
"""
549-
if is_wrapped(value):
550-
blob, offset = value
551-
with BinaryStream(self, blob) as stream:
552-
data_class = BinaryObject.parse(stream)
553-
return BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), self)
549+
if isinstance(value, tuple) and len(value) == 2:
550+
if type(value[0]) is bytes and type(value[1]) is int:
551+
blob, offset = value
552+
with BinaryStream(self, blob) as stream:
553+
data_class = BinaryObject.parse(stream)
554+
return BinaryObject.to_python(stream.read_ctype(data_class, direction=READ_BACKWARD), client=self)
555+
556+
if isinstance(value[0], int):
557+
col_type, collection = value
558+
if isinstance(collection, list):
559+
return col_type, [self.unwrap_binary(v) for v in collection]
560+
561+
if isinstance(collection, dict):
562+
return col_type, {self.unwrap_binary(k): self.unwrap_binary(v) for k, v in collection.items()}
554563
return value
555564

556565
@status_to_exception(CacheError)
@@ -620,7 +629,7 @@ def get_best_node(
620629
return conn
621630

622631
key, key_hint = self._get_affinity_key(c_id, key, key_hint)
623-
hashcode = key_hint.hashcode(key, self)
632+
hashcode = key_hint.hashcode(key, client=self)
624633

625634
best_node = self._get_node_by_hashcode(c_id, hashcode, parts)
626635
if best_node:

pygridgain/datatypes/base.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,11 +50,11 @@ class GridGainDataType(metaclass=GridGainDataTypeMeta):
5050
a.k.a. parser/constructor classes, both object and payload varieties.
5151
"""
5252
@classmethod
53-
async def hashcode_async(cls, value, *args, **kwargs):
54-
return cls.hashcode(value, *args, **kwargs)
53+
async def hashcode_async(cls, value, **kwargs):
54+
return cls.hashcode(value, **kwargs)
5555

5656
@classmethod
57-
def hashcode(cls, value, *args, **kwargs):
57+
def hashcode(cls, value, **kwargs):
5858
return 0
5959

6060
@classmethod
@@ -74,9 +74,9 @@ async def from_python_async(cls, stream, value, **kwargs):
7474
cls.from_python(stream, value, **kwargs)
7575

7676
@classmethod
77-
def to_python(cls, ctypes_object, *args, **kwargs):
77+
def to_python(cls, ctypes_object, **kwargs):
7878
raise NotImplementedError
7979

8080
@classmethod
81-
async def to_python_async(cls, ctypes_object, *args, **kwargs):
82-
return cls.to_python(ctypes_object, *args, **kwargs)
81+
async def to_python_async(cls, ctypes_object, **kwargs):
82+
return cls.to_python(ctypes_object, **kwargs)

pygridgain/datatypes/cache_properties.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,12 +117,12 @@ async def parse_async(cls, stream):
117117
return cls.parse(stream)
118118

119119
@classmethod
120-
def to_python(cls, ctypes_object, *args, **kwargs):
121-
return cls.prop_data_class.to_python(ctypes_object.data, *args, **kwargs)
120+
def to_python(cls, ctypes_object, **kwargs):
121+
return cls.prop_data_class.to_python(ctypes_object.data, **kwargs)
122122

123123
@classmethod
124-
async def to_python_async(cls, ctypes_object, *args, **kwargs):
125-
return cls.to_python(ctypes_object, *args, **kwargs)
124+
async def to_python_async(cls, ctypes_object, **kwargs):
125+
return cls.to_python(ctypes_object, **kwargs)
126126

127127
@classmethod
128128
def from_python(cls, stream, value):
@@ -302,6 +302,6 @@ def from_python(cls, stream, value):
302302
)
303303

304304
@classmethod
305-
def to_python(cls, ctypes_object, *args, **kwargs):
305+
def to_python(cls, ctypes_object, **kwargs):
306306
prop_data_class = prop_map(ctypes_object.prop_code)
307-
return prop_data_class.to_python(ctypes_object.data, *args, **kwargs)
307+
return prop_data_class.to_python(ctypes_object.data, **kwargs)

pygridgain/datatypes/complex.py

Lines changed: 26 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -91,22 +91,21 @@ def __build_final_class(cls, fields):
9191
)
9292

9393
@classmethod
94-
def to_python_not_null(cls, ctypes_object, *args, **kwargs):
94+
def to_python_not_null(cls, ctypes_object, **kwargs):
9595
result = []
9696
for i in range(ctypes_object.length):
9797
result.append(
9898
AnyDataObject.to_python(
99-
getattr(ctypes_object, f'element_{i}'),
100-
*args, **kwargs
99+
getattr(ctypes_object, f'element_{i}'), **kwargs
101100
)
102101
)
103102
return ctypes_object.type_id, result
104103

105104
@classmethod
106-
async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
105+
async def to_python_not_null_async(cls, ctypes_object, **kwargs):
107106
result = [
108107
await AnyDataObject.to_python_async(
109-
getattr(ctypes_object, f'element_{i}'), *args, **kwargs
108+
getattr(ctypes_object, f'element_{i}'), **kwargs
110109
)
111110
for i in range(ctypes_object.length)]
112111
return ctypes_object.type_id, result
@@ -224,8 +223,6 @@ class CollectionObject(Nullable):
224223
_type_id = TYPE_COL
225224
_header_class = None
226225
type_code = TC_COLLECTION
227-
pythonic = list
228-
default = []
229226

230227
@classmethod
231228
def parse_not_null(cls, stream):
@@ -272,15 +269,15 @@ def __build_final_class(cls, fields):
272269
@classmethod
273270
def to_python_not_null(cls, ctypes_object, *args, **kwargs):
274271
result = [
275-
AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
272+
AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), **kwargs)
276273
for i in range(ctypes_object.length)
277274
]
278275
return ctypes_object.type, result
279276

280277
@classmethod
281278
async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
282279
result_coro = [
283-
AnyDataObject.to_python_async(getattr(ctypes_object, f'element_{i}'), *args, **kwargs)
280+
AnyDataObject.to_python_async(getattr(ctypes_object, f'element_{i}'), **kwargs)
284281
for i in range(ctypes_object.length)
285282
]
286283

@@ -362,35 +359,27 @@ def __build_final_class(cls, fields):
362359
)
363360

364361
@classmethod
365-
def _to_python(cls, ctypes_object, *args, **kwargs):
362+
def _to_python(cls, ctypes_object, **kwargs):
366363
map_cls = cls.__get_map_class(ctypes_object)
367364

368365
result = map_cls()
369366
for i in range(0, ctypes_object.length << 1, 2):
370-
k = AnyDataObject.to_python(
371-
getattr(ctypes_object, f'element_{i}'),
372-
*args, **kwargs
373-
)
374-
v = AnyDataObject.to_python(
375-
getattr(ctypes_object, f'element_{i + 1}'),
376-
*args, **kwargs
377-
)
367+
k = AnyDataObject.to_python(getattr(ctypes_object, f'element_{i}'), **kwargs)
368+
v = AnyDataObject.to_python(getattr(ctypes_object, f'element_{i + 1}'), **kwargs)
378369
result[k] = v
379370
return result
380371

381372
@classmethod
382-
async def _to_python_async(cls, ctypes_object, *args, **kwargs):
373+
async def _to_python_async(cls, ctypes_object, **kwargs):
383374
map_cls = cls.__get_map_class(ctypes_object)
384375

385376
kv_pairs_coro = [
386377
asyncio.gather(
387378
AnyDataObject.to_python_async(
388-
getattr(ctypes_object, f'element_{i}'),
389-
*args, **kwargs
379+
getattr(ctypes_object, f'element_{i}'), **kwargs
390380
),
391381
AnyDataObject.to_python_async(
392-
getattr(ctypes_object, f'element_{i + 1}'),
393-
*args, **kwargs
382+
getattr(ctypes_object, f'element_{i + 1}'), **kwargs
394383
)
395384
) for i in range(0, ctypes_object.length << 1, 2)
396385
]
@@ -450,12 +439,12 @@ def _parse_header(cls, stream):
450439
return [('length', ctypes.c_int)], length
451440

452441
@classmethod
453-
def to_python(cls, ctypes_object, *args, **kwargs):
454-
return cls._to_python(ctypes_object, *args, **kwargs)
442+
def to_python(cls, ctypes_object, **kwargs):
443+
return cls._to_python(ctypes_object, **kwargs)
455444

456445
@classmethod
457-
async def to_python_async(cls, ctypes_object, *args, **kwargs):
458-
return await cls._to_python_async(ctypes_object, *args, **kwargs)
446+
async def to_python_async(cls, ctypes_object, **kwargs):
447+
return await cls._to_python_async(ctypes_object, **kwargs)
459448

460449
@classmethod
461450
def from_python(cls, stream, value, type_id=None):
@@ -485,8 +474,6 @@ class MapObject(Nullable, _MapBase):
485474
_type_name = NAME_MAP
486475
_type_id = TYPE_MAP
487476
type_code = TC_MAP
488-
pythonic = dict
489-
default = {}
490477

491478
@classmethod
492479
def parse_not_null(cls, stream):
@@ -508,12 +495,12 @@ def _parse_header(cls, stream):
508495
return fields, length
509496

510497
@classmethod
511-
def to_python_not_null(cls, ctypes_object, *args, **kwargs):
512-
return ctypes_object.type, cls._to_python(ctypes_object, *args, **kwargs)
498+
def to_python_not_null(cls, ctypes_object, **kwargs):
499+
return ctypes_object.type, cls._to_python(ctypes_object, **kwargs)
513500

514501
@classmethod
515-
async def to_python_not_null_async(cls, ctypes_object, *args, **kwargs):
516-
return ctypes_object.type, await cls._to_python_async(ctypes_object, *args, **kwargs)
502+
async def to_python_not_null_async(cls, ctypes_object, **kwargs):
503+
return ctypes_object.type, await cls._to_python_async(ctypes_object, **kwargs)
517504

518505
@classmethod
519506
def from_python_not_null(cls, stream, value, **kwargs):
@@ -558,7 +545,7 @@ class BinaryObject(Nullable):
558545
COMPACT_FOOTER = 0x0020
559546

560547
@classmethod
561-
def hashcode(cls, value: object, client: Optional['Client']) -> int:
548+
def hashcode(cls, value: object, client: Optional['Client'] = None) -> int:
562549
# binary objects's hashcode implementation is special in the sense
563550
# that you need to fully serialize the object to calculate
564551
# its hashcode
@@ -569,7 +556,7 @@ def hashcode(cls, value: object, client: Optional['Client']) -> int:
569556
return value._hashcode
570557

571558
@classmethod
572-
async def hashcode_async(cls, value: object, client: Optional['AioClient']) -> int:
559+
async def hashcode_async(cls, value: object, client: Optional['AioClient'] = None) -> int:
573560
if not value._hashcode and client:
574561
with AioBinaryStream(client) as stream:
575562
await value._from_python_async(stream, save_to_buf=True)
@@ -681,7 +668,7 @@ def __build_final_class(cls, stream, header, header_class, object_fields, fields
681668
return final_class
682669

683670
@classmethod
684-
def to_python_not_null(cls, ctypes_object, client: 'Client' = None, *args, **kwargs):
671+
def to_python_not_null(cls, ctypes_object, client: 'Client' = None, **kwargs):
685672
type_id = ctypes_object.type_id
686673
if not client:
687674
raise ParseError(f'Can not query binary type {type_id}')
@@ -693,14 +680,13 @@ def to_python_not_null(cls, ctypes_object, client: 'Client' = None, *args, **kwa
693680
for field_name, field_type in data_class.schema.items():
694681
setattr(
695682
result, field_name, field_type.to_python(
696-
getattr(ctypes_object.object_fields, field_name),
697-
client, *args, **kwargs
683+
getattr(ctypes_object.object_fields, field_name), client=client, **kwargs
698684
)
699685
)
700686
return result
701687

702688
@classmethod
703-
async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = None, *args, **kwargs):
689+
async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = None, **kwargs):
704690
type_id = ctypes_object.type_id
705691
if not client:
706692
raise ParseError(f'Can not query binary type {type_id}')
@@ -712,7 +698,7 @@ async def to_python_not_null_async(cls, ctypes_object, client: 'AioClient' = Non
712698
field_values = await asyncio.gather(
713699
*[
714700
field_type.to_python_async(
715-
getattr(ctypes_object.object_fields, field_name), client, *args, **kwargs
701+
getattr(ctypes_object.object_fields, field_name), client=client, **kwargs
716702
)
717703
for field_name, field_type in data_class.schema.items()
718704
]

pygridgain/datatypes/expiry_policy.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,14 +81,14 @@ async def parse_async(cls, stream):
8181
return cls.parse(stream)
8282

8383
@classmethod
84-
def to_python(cls, ctypes_object):
84+
def to_python(cls, ctypes_object, **kwargs):
8585
if ctypes_object == 0:
8686
return None
8787

8888
return ExpiryPolicy(create=ctypes_object.create, update=ctypes_object.update, access=ctypes_object.access)
8989

9090
@classmethod
91-
async def to_python_async(cls, ctypes_object):
91+
async def to_python_async(cls, ctypes_object, **kwargs):
9292
return cls.to_python(ctypes_object)
9393

9494
@classmethod

0 commit comments

Comments
 (0)