diff --git a/src/coherence/client.py b/src/coherence/client.py index e193d9d..916c9ab 100644 --- a/src/coherence/client.py +++ b/src/coherence/client.py @@ -517,6 +517,17 @@ async def is_empty(self) -> bool: :return: `true` if this map contains no key-value mappings. """ + @abc.abstractmethod + async def is_ready(self) -> bool: + """ + Returns `true` if this map is ready to be used. + An example of when this method would return `false` would be where a + partitioned cache service that owns this cache has no storage-enabled + members. + + :return: `true` if this map is ready to be used. + """ + @abc.abstractmethod async def size(self) -> int: """ @@ -846,6 +857,20 @@ async def is_empty(self) -> bool: v = await self._client_stub.isEmpty(r) return self._request_factory.serializer.deserialize(v.value) + @_pre_call_cache + async def is_ready(self) -> bool: + try: + r = self._request_factory.is_ready_request() + v = await self._client_stub.isReady(r) + return self._request_factory.serializer.deserialize(v.value) + except grpc.aio._call.AioRpcError as e: + if e.code() == grpc.StatusCode.UNIMPLEMENTED: + raise OperationNotSupportedError( + "This operation is not supported by the gRPC proxy for the connected Coherence Server." + "Please upgrade to a version that supports this operation." + ) from e + raise # Re-raise all other gRPC errors + @_pre_call_cache async def size(self) -> int: r = self._request_factory.size_request() @@ -1364,6 +1389,11 @@ async def is_empty(self) -> bool: await dispatcher.dispatch(self._stream_handler) return dispatcher.result() + async def is_ready(self) -> bool: + dispatcher: UnaryDispatcher[bool] = self._request_factory.is_ready_request() + await dispatcher.dispatch(self._stream_handler) + return dispatcher.result() + @_pre_call_cache async def size(self) -> int: dispatcher: UnaryDispatcher[int] = self._request_factory.size_request() @@ -2701,3 +2731,9 @@ async def handle_zero_id_response(self, response: ProxyResponse) -> None: self._events_manager._emitter.emit( MapLifecycleEvent.TRUNCATED.value, self._events_manager._named_map.name ) + + +class OperationNotSupportedError(Exception): + """Exception raised when the requested operation is not supported by the server.""" + + pass diff --git a/src/coherence/util.py b/src/coherence/util.py index 45872a0..423f7ba 100644 --- a/src/coherence/util.py +++ b/src/coherence/util.py @@ -43,6 +43,7 @@ InvokeAllRequest, InvokeRequest, IsEmptyRequest, + IsReadyRequest, KeySetRequest, MapListenerRequest, PageRequest, @@ -557,6 +558,10 @@ def is_empty_request(self) -> IsEmptyRequest: r = IsEmptyRequest(scope=self._scope, cache=self._cache_name) return r + def is_ready_request(self) -> IsReadyRequest: + r = IsReadyRequest(scope=self._scope, cache=self._cache_name) + return r + def size_request(self) -> SizeRequest: r = SizeRequest(scope=self._scope, cache=self._cache_name) return r @@ -1020,6 +1025,16 @@ def is_empty_request(self) -> UnaryDispatcher[bool]: self.request_timeout, self.create_proxy_request(named_cache_request), BoolValueTransformer(self._serializer) ) + def is_ready_request(self) -> UnaryDispatcher[bool]: + named_cache_request = NamedCacheRequest( + type=NamedCacheRequestType.IsReady, + cacheId=self.cache_id, + ) + + return UnaryDispatcher( + self.request_timeout, self.create_proxy_request(named_cache_request), BoolValueTransformer(self._serializer) + ) + def size_request(self) -> UnaryDispatcher[int]: named_cache_request = NamedCacheRequest( type=NamedCacheRequestType.Size, diff --git a/tests/e2e/test_client.py b/tests/e2e/test_client.py index c017b8d..4404674 100644 --- a/tests/e2e/test_client.py +++ b/tests/e2e/test_client.py @@ -1,4 +1,4 @@ -# Copyright (c) 2022, 2024, Oracle and/or its affiliates. +# Copyright (c) 2022, 2025, Oracle and/or its affiliates. # Licensed under the Universal Permissive License v 1.0 as shown at # https://oss.oracle.com/licenses/upl. @@ -12,8 +12,8 @@ from grpc.aio import AioRpcError import tests -from coherence import Aggregators, Filters, MapEntry, NamedCache, Session, request_timeout -from coherence.client import CacheOptions +from coherence import COH_LOG, Aggregators, Filters, MapEntry, NamedCache, Session, request_timeout +from coherence.client import CacheOptions, OperationNotSupportedError from coherence.event import MapLifecycleEvent from coherence.extractor import ChainedExtractor, Extractors, UniversalExtractor from coherence.processor import ExtractorProcessor @@ -363,6 +363,16 @@ async def test_is_empty(cache: NamedCache[str, str]) -> None: assert r is True +@pytest.mark.asyncio +async def test_is_ready(cache: NamedCache[str, str]) -> None: + try: + r: bool = await cache.is_ready() + assert r is True + except OperationNotSupportedError as e: + COH_LOG.info("Skipped " + str(e)) + pytest.skip("Server version does not support is_ready method.") + + # noinspection PyShadowingNames @pytest.mark.asyncio async def test_size(cache: NamedCache[str, str]) -> None: