Skip to content

Commit a91cdf7

Browse files
committed
COH-32930 - Add functionality for is_ready() method on both gRPC v0 and v1 implementations
1 parent c566970 commit a91cdf7

File tree

3 files changed

+62
-2
lines changed

3 files changed

+62
-2
lines changed

src/coherence/client.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,14 @@ async def is_empty(self) -> bool:
517517
:return: `true` if this map contains no key-value mappings.
518518
"""
519519

520+
@abc.abstractmethod
521+
async def is_ready(self) -> bool:
522+
"""
523+
Returns `true` if this map is ready to be used.
524+
525+
:return: `true`if this map is ready to be used.
526+
"""
527+
520528
@abc.abstractmethod
521529
async def size(self) -> int:
522530
"""
@@ -846,6 +854,22 @@ async def is_empty(self) -> bool:
846854
v = await self._client_stub.isEmpty(r)
847855
return self._request_factory.serializer.deserialize(v.value)
848856

857+
@_pre_call_cache
858+
async def is_ready(self) -> bool:
859+
try:
860+
r = self._request_factory.is_ready_request()
861+
v = await self._client_stub.isReady(r)
862+
return self._request_factory.serializer.deserialize(v.value)
863+
except grpc.aio._call.AioRpcError as e:
864+
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
865+
raise OperationNotSupportedError(
866+
"This operation is not supported by the gRPC proxy for the connected Coherence Server."
867+
"Please upgrade to a version that supports this operation."
868+
) from e
869+
raise # Re-raise all other gRPC errors
870+
except Exception as e:
871+
raise RuntimeError("An unexpected error occurred in is_ready()") from e
872+
849873
@_pre_call_cache
850874
async def size(self) -> int:
851875
r = self._request_factory.size_request()
@@ -1364,6 +1388,11 @@ async def is_empty(self) -> bool:
13641388
await dispatcher.dispatch(self._stream_handler)
13651389
return dispatcher.result()
13661390

1391+
async def is_ready(self) -> bool:
1392+
dispatcher: UnaryDispatcher[bool] = self._request_factory.is_ready_request()
1393+
await dispatcher.dispatch(self._stream_handler)
1394+
return dispatcher.result()
1395+
13671396
@_pre_call_cache
13681397
async def size(self) -> int:
13691398
dispatcher: UnaryDispatcher[int] = self._request_factory.size_request()
@@ -2701,3 +2730,9 @@ async def handle_zero_id_response(self, response: ProxyResponse) -> None:
27012730
self._events_manager._emitter.emit(
27022731
MapLifecycleEvent.TRUNCATED.value, self._events_manager._named_map.name
27032732
)
2733+
2734+
2735+
class OperationNotSupportedError(Exception):
2736+
"""Exception raised when the requested operation is not supported by the server."""
2737+
2738+
pass

src/coherence/util.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
InvokeAllRequest,
4444
InvokeRequest,
4545
IsEmptyRequest,
46+
IsReadyRequest,
4647
KeySetRequest,
4748
MapListenerRequest,
4849
PageRequest,
@@ -557,6 +558,10 @@ def is_empty_request(self) -> IsEmptyRequest:
557558
r = IsEmptyRequest(scope=self._scope, cache=self._cache_name)
558559
return r
559560

561+
def is_ready_request(self) -> IsReadyRequest:
562+
r = IsReadyRequest(scope=self._scope, cache=self._cache_name)
563+
return r
564+
560565
def size_request(self) -> SizeRequest:
561566
r = SizeRequest(scope=self._scope, cache=self._cache_name)
562567
return r
@@ -1020,6 +1025,16 @@ def is_empty_request(self) -> UnaryDispatcher[bool]:
10201025
self.request_timeout, self.create_proxy_request(named_cache_request), BoolValueTransformer(self._serializer)
10211026
)
10221027

1028+
def is_ready_request(self) -> UnaryDispatcher[bool]:
1029+
named_cache_request = NamedCacheRequest(
1030+
type=NamedCacheRequestType.IsReady,
1031+
cacheId=self.cache_id,
1032+
)
1033+
1034+
return UnaryDispatcher(
1035+
self.request_timeout, self.create_proxy_request(named_cache_request), BoolValueTransformer(self._serializer)
1036+
)
1037+
10231038
def size_request(self) -> UnaryDispatcher[int]:
10241039
named_cache_request = NamedCacheRequest(
10251040
type=NamedCacheRequestType.Size,

tests/e2e/test_client.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@
1212
from grpc.aio import AioRpcError
1313

1414
import tests
15-
from coherence import Aggregators, Filters, MapEntry, NamedCache, Session, request_timeout
16-
from coherence.client import CacheOptions
15+
from coherence import COH_LOG, Aggregators, Filters, MapEntry, NamedCache, Session, request_timeout
16+
from coherence.client import CacheOptions, OperationNotSupportedError
1717
from coherence.event import MapLifecycleEvent
1818
from coherence.extractor import ChainedExtractor, Extractors, UniversalExtractor
1919
from coherence.processor import ExtractorProcessor
@@ -363,6 +363,16 @@ async def test_is_empty(cache: NamedCache[str, str]) -> None:
363363
assert r is True
364364

365365

366+
@pytest.mark.asyncio
367+
async def test_is_ready(cache: NamedCache[str, str]) -> None:
368+
try:
369+
r: bool = await cache.is_ready()
370+
assert r is True
371+
except OperationNotSupportedError as e:
372+
COH_LOG.info("Skipped " + str(e))
373+
pytest.skip("Server version does not support is_ready method.")
374+
375+
366376
# noinspection PyShadowingNames
367377
@pytest.mark.asyncio
368378
async def test_size(cache: NamedCache[str, str]) -> None:

0 commit comments

Comments
 (0)