Skip to content

Commit e09f446

Browse files
authored
COH-29986 - Implement missing apis - add_index and remove_index in Python client (#139)
* COH-29986 - Implement missing apis - add_index and remove_index in Python client
1 parent ad12ecb commit e09f446

File tree

3 files changed

+118
-2
lines changed

3 files changed

+118
-2
lines changed

src/coherence/client.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,15 @@
3636
from .aggregator import AverageAggregator, EntryAggregator, PriorityAggregator, SumAggregator
3737
from .comparator import Comparator
3838
from .event import MapLifecycleEvent, MapListener, SessionLifecycleEvent
39+
from .extractor import ValueExtractor
3940
from .filter import Filter
4041
from .messages_pb2 import PageRequest # type: ignore
4142
from .processor import EntryProcessor
4243
from .serialization import Serializer, SerializerRegistry
4344
from .services_pb2_grpc import NamedCacheServiceStub
4445
from .util import RequestFactory
4546

47+
E = TypeVar("E")
4648
K = TypeVar("K")
4749
V = TypeVar("V")
4850
R = TypeVar("R")
@@ -456,6 +458,34 @@ def entries(
456458
:return: an AsyncIterator of MapEntry instances that satisfy the specified criteria
457459
"""
458460

461+
@abc.abstractmethod
462+
def add_index(
463+
self, extractor: ValueExtractor[T, E], ordered: bool = False, comparator: Optional[Comparator] = None
464+
) -> None:
465+
"""
466+
Add an index to this map.
467+
468+
:param extractor: The :class: `coherence.extractor.ValueExtractor` object that is used to extract
469+
an indexable Object from a value stored in the
470+
indexed Map. Must not be 'None'.
471+
:param ordered: true if the contents of the indexed information
472+
should be ordered false otherwise.
473+
:param comparator: The :class: `coherence.comparator.Comparator` object which imposes an ordering
474+
on entries in the indexed map or None if the
475+
entries' values natural ordering should be used.
476+
"""
477+
478+
@abc.abstractmethod
479+
def remove_index(self, extractor: ValueExtractor[T, E]) -> None:
480+
"""
481+
Removes an index on this `NamedMap`.
482+
483+
:param extractor: The :class: `coherence.extractor.ValueExtractor` object that is used to extract
484+
an indexable Object from a value stored in the
485+
indexed Map. Must not be 'None'.
486+
487+
"""
488+
459489

460490
class NamedCache(NamedMap[K, V]):
461491
"""
@@ -743,6 +773,22 @@ async def remove_map_listener(self, listener: MapListener[K, V], listener_for: O
743773
else:
744774
await self._events_manager._remove_key_listener(listener, listener_for)
745775

776+
@_pre_call_cache
777+
async def add_index(
778+
self, extractor: ValueExtractor[T, E], ordered: bool = False, comparator: Optional[Comparator] = None
779+
) -> None:
780+
if extractor is None:
781+
raise ValueError("A ValueExtractor must be specified")
782+
r = self._request_factory.add_index_request(extractor, ordered, comparator)
783+
await self._client_stub.addIndex(r)
784+
785+
@_pre_call_cache
786+
async def remove_index(self, extractor: ValueExtractor[T, E]) -> None:
787+
if extractor is None:
788+
raise ValueError("A ValueExtractor must be specified")
789+
r = self._request_factory.remove_index_request(extractor)
790+
await self._client_stub.removeIndex(r)
791+
746792
def _setup_event_handlers(self) -> None:
747793
"""
748794
Setup handlers to notify cache-level handlers of events.

src/coherence/util.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,10 @@
99

1010
from .aggregator import EntryAggregator
1111
from .comparator import Comparator
12+
from .extractor import ValueExtractor
1213
from .filter import Filter, Filters, MapEventFilter
1314
from .messages_pb2 import ( # type: ignore
15+
AddIndexRequest,
1416
AggregateRequest,
1517
ClearRequest,
1618
ContainsKeyRequest,
@@ -29,6 +31,7 @@
2931
PutAllRequest,
3032
PutIfAbsentRequest,
3133
PutRequest,
34+
RemoveIndexRequest,
3235
RemoveMappingRequest,
3336
RemoveRequest,
3437
ReplaceMappingRequest,
@@ -352,3 +355,29 @@ def __generate_next_request_id(self, prefix: str) -> str:
352355
"""Generates a prefix map-specific prefix when starting a MapEvent gRPC stream."""
353356
self.__next_request_id += 1
354357
return prefix + self.__uidPrefix + str(self.__next_request_id)
358+
359+
def add_index_request(
360+
self, extractor: ValueExtractor[T, E], ordered: bool = False, comparator: Optional[Comparator] = None
361+
) -> AddIndexRequest:
362+
r = AddIndexRequest(
363+
scope=self._scope,
364+
cache=self._cache_name,
365+
format=self._serializer.format,
366+
extractor=self._serializer.serialize(extractor),
367+
)
368+
r.sorted = ordered
369+
370+
if comparator is not None:
371+
r.comparator = self._serializer.serialize(comparator)
372+
373+
return r
374+
375+
def remove_index_request(self, extractor: ValueExtractor[T, E]) -> RemoveIndexRequest:
376+
r = RemoveIndexRequest(
377+
scope=self._scope,
378+
cache=self._cache_name,
379+
format=self._serializer.format,
380+
extractor=self._serializer.serialize(extractor),
381+
)
382+
383+
return r

tests/test_client.py

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
import pytest_asyncio
1111

1212
import tests
13-
from coherence import Filters, MapEntry, NamedCache, Session
13+
from coherence import Aggregators, Filters, MapEntry, NamedCache, Session
1414
from coherence.event import MapLifecycleEvent
15-
from coherence.extractor import ChainedExtractor, UniversalExtractor
15+
from coherence.extractor import ChainedExtractor, Extractors, UniversalExtractor
1616
from coherence.processor import ExtractorProcessor
1717
from tests.address import Address
1818
from tests.person import Person
@@ -562,3 +562,44 @@ def callback(n: str) -> None:
562562
assert not cache.active
563563
finally:
564564
await session.close()
565+
566+
567+
# noinspection PyShadowingNames,DuplicatedCode,PyUnresolvedReferences
568+
@pytest.mark.asyncio
569+
async def test_add_remove_index(setup_and_teardown_person_cache: NamedCache[str, Person]) -> None:
570+
cache: NamedCache[str, Person] = setup_and_teardown_person_cache
571+
572+
await cache.add_index(Extractors.extract("age"))
573+
result = await cache.aggregate(Aggregators.record(), None, Filters.greater("age", 25))
574+
# print(result)
575+
# {'@class': 'util.SimpleQueryRecord', 'results': [{'@class': 'util.SimpleQueryRecord.PartialResult',
576+
# 'partitionSet': {'@class': 'net.partition.PartitionSet', 'bits': [2147483647], 'markedCount': -1,
577+
# 'partitionCount': 31, 'tailMask': 2147483647}, 'steps': [{'@class': 'util.SimpleQueryRecord.PartialResult.Step',
578+
# 'efficiency': 5, 'filter': 'GreaterFilter(.age, 25)',
579+
# 'indexLookupRecords': [{'@class': 'util.SimpleQueryRecord.PartialResult.IndexLookupRecord',
580+
# 'bytes': 6839, 'distinctValues': 5, 'extractor': '.age', 'index': 'Partitioned: Footprint=6.67KB, Size=5',
581+
# 'indexDesc': 'Partitioned: ', 'ordered': False}], 'keySetSizePost': 0, 'keySetSizePre': 7, 'millis': 0,
582+
# 'subSteps': []}]}], 'type': {'@class': 'aggregator.QueryRecorder.RecordType', 'enum': 'EXPLAIN'}}
583+
584+
idx_rec = result["results"][0].get("steps")[0].get("indexLookupRecords")[0]
585+
# print(idx_rec)
586+
# {'@class': 'util.SimpleQueryRecord.PartialResult.IndexLookupRecord', 'bytes': 6839, 'distinctValues': 5,
587+
# 'extractor': '.age', 'index': 'Partitioned: Footprint=6.67KB, Size=5', 'indexDesc': 'Partitioned: ',
588+
# 'ordered': False}
589+
assert "index" in idx_rec
590+
591+
await cache.remove_index(Extractors.extract("age"))
592+
result2 = await cache.aggregate(Aggregators.record(), None, Filters.greater("age", 25))
593+
print(result2)
594+
# {'@class': 'util.SimpleQueryRecord', 'results': [{'@class': 'util.SimpleQueryRecord.PartialResult',
595+
# 'partitionSet': {'@class': 'net.partition.PartitionSet', 'bits': [2147483647], 'markedCount': -1,
596+
# 'partitionCount': 31, 'tailMask': 2147483647}, 'steps': [{'@class': 'util.SimpleQueryRecord.PartialResult.Step',
597+
# 'efficiency': 7000, 'filter': 'GreaterFilter(.age, 25)',
598+
# 'indexLookupRecords': [{'@class': 'util.SimpleQueryRecord.PartialResult.IndexLookupRecord', 'bytes': -1,
599+
# 'distinctValues': -1, 'extractor': '.age', 'ordered': False}], 'keySetSizePost': 0, 'keySetSizePre': 7,
600+
# 'millis': 0, 'subSteps': []}]}], 'type': {'@class': 'aggregator.QueryRecorder.RecordType', 'enum': 'EXPLAIN'}}
601+
idx_rec = result2["results"][0].get("steps")[0].get("indexLookupRecords")[0]
602+
# print(idx_rec)
603+
# {'@class': 'util.SimpleQueryRecord.PartialResult.IndexLookupRecord', 'bytes': -1, 'distinctValues': -1,
604+
# 'extractor': '.age', 'ordered': False}
605+
assert "index" not in idx_rec

0 commit comments

Comments
 (0)