Skip to content

COH-32930 - Add functionality for is_ready() method on both gRPC v0 and v1 implementations #281

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Aug 7, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 38 additions & 0 deletions src/coherence/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,17 @@ async def is_empty(self) -> bool:
:return: `true` if this map contains no key-value mappings.
"""

@abc.abstractmethod
async def is_ready(self) -> bool:
"""
Returns `true` if this map is ready to be used.
An example of when this method would return `false` would be where a
partitioned cache service that owns this cache has no storage-enabled
members.

:return: `true`if this map is ready to be used.
"""

@abc.abstractmethod
async def size(self) -> int:
"""
Expand Down Expand Up @@ -846,6 +857,22 @@ async def is_empty(self) -> bool:
v = await self._client_stub.isEmpty(r)
return self._request_factory.serializer.deserialize(v.value)

@_pre_call_cache
async def is_ready(self) -> bool:
try:
r = self._request_factory.is_ready_request()
v = await self._client_stub.isReady(r)
return self._request_factory.serializer.deserialize(v.value)
except grpc.aio._call.AioRpcError as e:
if e.code() == grpc.StatusCode.UNIMPLEMENTED:
raise OperationNotSupportedError(
"This operation is not supported by the gRPC proxy for the connected Coherence Server."
"Please upgrade to a version that supports this operation."
) from e
raise # Re-raise all other gRPC errors
except Exception as e:
raise RuntimeError("An unexpected error occurred in is_ready()") from e

@_pre_call_cache
async def size(self) -> int:
r = self._request_factory.size_request()
Expand Down Expand Up @@ -1364,6 +1391,11 @@ async def is_empty(self) -> bool:
await dispatcher.dispatch(self._stream_handler)
return dispatcher.result()

async def is_ready(self) -> bool:
dispatcher: UnaryDispatcher[bool] = self._request_factory.is_ready_request()
await dispatcher.dispatch(self._stream_handler)
return dispatcher.result()

@_pre_call_cache
async def size(self) -> int:
dispatcher: UnaryDispatcher[int] = self._request_factory.size_request()
Expand Down Expand Up @@ -2701,3 +2733,9 @@ async def handle_zero_id_response(self, response: ProxyResponse) -> None:
self._events_manager._emitter.emit(
MapLifecycleEvent.TRUNCATED.value, self._events_manager._named_map.name
)


class OperationNotSupportedError(Exception):
"""Exception raised when the requested operation is not supported by the server."""

pass
15 changes: 15 additions & 0 deletions src/coherence/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
InvokeAllRequest,
InvokeRequest,
IsEmptyRequest,
IsReadyRequest,
KeySetRequest,
MapListenerRequest,
PageRequest,
Expand Down Expand Up @@ -557,6 +558,10 @@ def is_empty_request(self) -> IsEmptyRequest:
r = IsEmptyRequest(scope=self._scope, cache=self._cache_name)
return r

def is_ready_request(self) -> IsReadyRequest:
r = IsReadyRequest(scope=self._scope, cache=self._cache_name)
return r

def size_request(self) -> SizeRequest:
r = SizeRequest(scope=self._scope, cache=self._cache_name)
return r
Expand Down Expand Up @@ -1020,6 +1025,16 @@ def is_empty_request(self) -> UnaryDispatcher[bool]:
self.request_timeout, self.create_proxy_request(named_cache_request), BoolValueTransformer(self._serializer)
)

def is_ready_request(self) -> UnaryDispatcher[bool]:
named_cache_request = NamedCacheRequest(
type=NamedCacheRequestType.IsReady,
cacheId=self.cache_id,
)

return UnaryDispatcher(
self.request_timeout, self.create_proxy_request(named_cache_request), BoolValueTransformer(self._serializer)
)

def size_request(self) -> UnaryDispatcher[int]:
named_cache_request = NamedCacheRequest(
type=NamedCacheRequestType.Size,
Expand Down
16 changes: 13 additions & 3 deletions tests/e2e/test_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022, 2024, Oracle and/or its affiliates.
# Copyright (c) 2022, 2025, Oracle and/or its affiliates.
# Licensed under the Universal Permissive License v 1.0 as shown at
# https://oss.oracle.com/licenses/upl.

Expand All @@ -12,8 +12,8 @@
from grpc.aio import AioRpcError

import tests
from coherence import Aggregators, Filters, MapEntry, NamedCache, Session, request_timeout
from coherence.client import CacheOptions
from coherence import COH_LOG, Aggregators, Filters, MapEntry, NamedCache, Session, request_timeout
from coherence.client import CacheOptions, OperationNotSupportedError
from coherence.event import MapLifecycleEvent
from coherence.extractor import ChainedExtractor, Extractors, UniversalExtractor
from coherence.processor import ExtractorProcessor
Expand Down Expand Up @@ -363,6 +363,16 @@ async def test_is_empty(cache: NamedCache[str, str]) -> None:
assert r is True


@pytest.mark.asyncio
async def test_is_ready(cache: NamedCache[str, str]) -> None:
try:
r: bool = await cache.is_ready()
assert r is True
except OperationNotSupportedError as e:
COH_LOG.info("Skipped " + str(e))
pytest.skip("Server version does not support is_ready method.")


# noinspection PyShadowingNames
@pytest.mark.asyncio
async def test_size(cache: NamedCache[str, str]) -> None:
Expand Down