Skip to content

Commit 6d38aeb

Browse files
authored
COH-32930 - Add functionality for is_ready() method on both gRPC v0 and v1 implementations (#281)
* COH-32930 - Add functionality for is_ready() method on both gRPC v0 and v1 implementations
1 parent c566970 commit 6d38aeb

File tree

3 files changed

+64
-3
lines changed

3 files changed

+64
-3
lines changed

src/coherence/client.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -517,6 +517,17 @@ async def is_empty(self) -> bool:
517517
:return: `true` if this map contains no key-value mappings.
518518
"""
519519

520+
@abc.abstractmethod
521+
async def is_ready(self) -> bool:
522+
"""
523+
Returns `true` if this map is ready to be used.
524+
An example of when this method would return `false` would be where a
525+
partitioned cache service that owns this cache has no storage-enabled
526+
members.
527+
528+
:return: `true` if this map is ready to be used.
529+
"""
530+
520531
@abc.abstractmethod
521532
async def size(self) -> int:
522533
"""
@@ -846,6 +857,20 @@ async def is_empty(self) -> bool:
846857
v = await self._client_stub.isEmpty(r)
847858
return self._request_factory.serializer.deserialize(v.value)
848859

860+
@_pre_call_cache
861+
async def is_ready(self) -> bool:
862+
try:
863+
r = self._request_factory.is_ready_request()
864+
v = await self._client_stub.isReady(r)
865+
return self._request_factory.serializer.deserialize(v.value)
866+
except grpc.aio._call.AioRpcError as e:
867+
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
868+
raise OperationNotSupportedError(
869+
"This operation is not supported by the gRPC proxy for the connected Coherence Server."
870+
"Please upgrade to a version that supports this operation."
871+
) from e
872+
raise # Re-raise all other gRPC errors
873+
849874
@_pre_call_cache
850875
async def size(self) -> int:
851876
r = self._request_factory.size_request()
@@ -1364,6 +1389,11 @@ async def is_empty(self) -> bool:
13641389
await dispatcher.dispatch(self._stream_handler)
13651390
return dispatcher.result()
13661391

1392+
async def is_ready(self) -> bool:
1393+
dispatcher: UnaryDispatcher[bool] = self._request_factory.is_ready_request()
1394+
await dispatcher.dispatch(self._stream_handler)
1395+
return dispatcher.result()
1396+
13671397
@_pre_call_cache
13681398
async def size(self) -> int:
13691399
dispatcher: UnaryDispatcher[int] = self._request_factory.size_request()
@@ -2701,3 +2731,9 @@ async def handle_zero_id_response(self, response: ProxyResponse) -> None:
27012731
self._events_manager._emitter.emit(
27022732
MapLifecycleEvent.TRUNCATED.value, self._events_manager._named_map.name
27032733
)
2734+
2735+
2736+
class OperationNotSupportedError(Exception):
2737+
"""Exception raised when the requested operation is not supported by the server."""
2738+
2739+
pass

src/coherence/util.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
InvokeAllRequest,
4444
InvokeRequest,
4545
IsEmptyRequest,
46+
IsReadyRequest,
4647
KeySetRequest,
4748
MapListenerRequest,
4849
PageRequest,
@@ -557,6 +558,10 @@ def is_empty_request(self) -> IsEmptyRequest:
557558
r = IsEmptyRequest(scope=self._scope, cache=self._cache_name)
558559
return r
559560

561+
def is_ready_request(self) -> IsReadyRequest:
562+
r = IsReadyRequest(scope=self._scope, cache=self._cache_name)
563+
return r
564+
560565
def size_request(self) -> SizeRequest:
561566
r = SizeRequest(scope=self._scope, cache=self._cache_name)
562567
return r
@@ -1020,6 +1025,16 @@ def is_empty_request(self) -> UnaryDispatcher[bool]:
10201025
self.request_timeout, self.create_proxy_request(named_cache_request), BoolValueTransformer(self._serializer)
10211026
)
10221027

1028+
def is_ready_request(self) -> UnaryDispatcher[bool]:
1029+
named_cache_request = NamedCacheRequest(
1030+
type=NamedCacheRequestType.IsReady,
1031+
cacheId=self.cache_id,
1032+
)
1033+
1034+
return UnaryDispatcher(
1035+
self.request_timeout, self.create_proxy_request(named_cache_request), BoolValueTransformer(self._serializer)
1036+
)
1037+
10231038
def size_request(self) -> UnaryDispatcher[int]:
10241039
named_cache_request = NamedCacheRequest(
10251040
type=NamedCacheRequestType.Size,

tests/e2e/test_client.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
# Copyright (c) 2022, 2024, Oracle and/or its affiliates.
1+
# Copyright (c) 2022, 2025, Oracle and/or its affiliates.
22
# Licensed under the Universal Permissive License v 1.0 as shown at
33
# https://oss.oracle.com/licenses/upl.
44

@@ -12,8 +12,8 @@
1212
from grpc.aio import AioRpcError
1313

1414
import tests
15-
from coherence import Aggregators, Filters, MapEntry, NamedCache, Session, request_timeout
16-
from coherence.client import CacheOptions
15+
from coherence import COH_LOG, Aggregators, Filters, MapEntry, NamedCache, Session, request_timeout
16+
from coherence.client import CacheOptions, OperationNotSupportedError
1717
from coherence.event import MapLifecycleEvent
1818
from coherence.extractor import ChainedExtractor, Extractors, UniversalExtractor
1919
from coherence.processor import ExtractorProcessor
@@ -363,6 +363,16 @@ async def test_is_empty(cache: NamedCache[str, str]) -> None:
363363
assert r is True
364364

365365

366+
@pytest.mark.asyncio
367+
async def test_is_ready(cache: NamedCache[str, str]) -> None:
368+
try:
369+
r: bool = await cache.is_ready()
370+
assert r is True
371+
except OperationNotSupportedError as e:
372+
COH_LOG.info("Skipped " + str(e))
373+
pytest.skip("Server version does not support is_ready method.")
374+
375+
366376
# noinspection PyShadowingNames
367377
@pytest.mark.asyncio
368378
async def test_size(cache: NamedCache[str, str]) -> None:

0 commit comments

Comments
 (0)