Skip to content

Commit 082f02c

Browse files
committed
More updates
* Removed unnecessary client components * Converted partition service to async * Ported compact compatibility test
1 parent a4697a4 commit 082f02c

File tree

8 files changed

+816
-38
lines changed

8 files changed

+816
-38
lines changed

hazelcast/asyncio/client.py

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,14 @@
88
from hazelcast.config import Config, IndexConfig
99
from hazelcast.internal.asyncio_connection import ConnectionManager, DefaultAddressProvider
1010
from hazelcast.core import DistributedObjectEvent, DistributedObjectInfo
11-
from hazelcast.cp import CPSubsystem, ProxySessionManager
1211
from hazelcast.discovery import HazelcastCloudAddressProvider
1312
from hazelcast.errors import IllegalStateError, InvalidConfigurationError
1413
from hazelcast.internal.asyncio_invocation import InvocationService, Invocation
1514
from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection
1615
from hazelcast.lifecycle import LifecycleService, LifecycleState, _InternalLifecycleService
1716
from hazelcast.internal.asyncio_listener import ClusterViewListenerService, ListenerService
1817
from hazelcast.near_cache import NearCacheManager
19-
from hazelcast.partition import PartitionService, _InternalPartitionService
18+
from hazelcast.internal.asyncio_partition import PartitionService, InternalPartitionService
2019
from hazelcast.protocol.codec import (
2120
client_add_distributed_object_listener_codec,
2221
client_get_distributed_objects_codec,
@@ -34,7 +33,7 @@
3433
from hazelcast.serialization import SerializationServiceV1
3534
from hazelcast.sql import SqlService, _InternalSqlService
3635
from hazelcast.internal.asyncio_statistics import Statistics
37-
from hazelcast.types import KeyType, ValueType, ItemType, MessageType
36+
from hazelcast.types import KeyType, ValueType
3837
from hazelcast.util import AtomicInteger, RoundRobinLB
3938

4039
__all__ = ("HazelcastClient",)
@@ -84,7 +83,7 @@ def __init__(self, config: Config | None = None, **kwargs):
8483
self._config,
8584
)
8685
self._address_provider = self._create_address_provider()
87-
self._internal_partition_service = _InternalPartitionService(self)
86+
self._internal_partition_service = InternalPartitionService(self)
8887
self._partition_service = PartitionService(
8988
self._internal_partition_service,
9089
self._serialization_service,
@@ -111,8 +110,6 @@ def __init__(self, config: Config | None = None, **kwargs):
111110
self._compact_schema_service,
112111
)
113112
self._proxy_manager = ProxyManager(self._context)
114-
self._cp_subsystem = CPSubsystem(self._context)
115-
self._proxy_session_manager = ProxySessionManager(self._context)
116113
self._lock_reference_id_generator = AtomicInteger(1)
117114
self._statistics = Statistics(
118115
self,
@@ -159,7 +156,6 @@ def _init_context(self):
159156
self._near_cache_manager,
160157
self._lock_reference_id_generator,
161158
self._name,
162-
self._proxy_session_manager,
163159
self._reactor,
164160
self._compact_schema_service,
165161
)
@@ -278,7 +274,6 @@ async def shutdown(self) -> None:
278274
if self._internal_lifecycle_service.running:
279275
self._internal_lifecycle_service.fire_lifecycle_event(LifecycleState.SHUTTING_DOWN)
280276
self._internal_lifecycle_service.shutdown()
281-
self._proxy_session_manager.shutdown().result()
282277
self._near_cache_manager.destroy_near_caches()
283278
await self._connection_manager.shutdown()
284279
self._invocation_service.shutdown()
@@ -301,10 +296,6 @@ def partition_service(self) -> PartitionService:
301296
def cluster_service(self) -> ClusterService:
302297
return self._cluster_service
303298

304-
@property
305-
def cp_subsystem(self) -> CPSubsystem:
306-
return self._cp_subsystem
307-
308299
def _create_address_provider(self):
309300
config = self._config
310301
cluster_members = config.cluster_members
@@ -360,7 +351,6 @@ def __init__(self):
360351
self.near_cache_manager = None
361352
self.lock_reference_id_generator = None
362353
self.name = None
363-
self.proxy_session_manager = None
364354
self.reactor = None
365355
self.compact_schema_service = None
366356

@@ -378,7 +368,6 @@ def init_context(
378368
near_cache_manager,
379369
lock_reference_id_generator,
380370
name,
381-
proxy_session_manager,
382371
reactor,
383372
compact_schema_service,
384373
):
@@ -394,6 +383,5 @@ def init_context(
394383
self.near_cache_manager = near_cache_manager
395384
self.lock_reference_id_generator = lock_reference_id_generator
396385
self.name = name
397-
self.proxy_session_manager = proxy_session_manager
398386
self.reactor = reactor
399387
self.compact_schema_service = compact_schema_service

hazelcast/internal/asyncio_compact.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,10 @@ async def send_schema_and_retry(
7171
async def callback():
7272
self._has_replicated_schemas = True
7373
self._compact_serializer.register_schema_to_type(schema, clazz)
74-
return await func(*args, **kwargs)
74+
maybe_coro = func(*args, **kwargs)
75+
# maybe_coro maybe a coroutine or None
76+
if maybe_coro:
77+
return await maybe_coro
7578

7679
return await self._replicate_schema(
7780
schema, request, CompactSchemaService._SEND_SCHEMA_RETRY_COUNT, callback()
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
import logging
2+
import uuid
3+
4+
import typing
5+
6+
from hazelcast.errors import ClientOfflineError
7+
from hazelcast.hash import hash_to_index
8+
from hazelcast.serialization.compact import SchemaNotReplicatedError
9+
10+
_logger = logging.getLogger(__name__)
11+
12+
13+
class _PartitionTable:
14+
__slots__ = ("connection", "version", "partitions")
15+
16+
def __init__(self, connection, version, partitions):
17+
self.connection = connection
18+
self.version = version
19+
self.partitions = partitions
20+
21+
def __repr__(self):
22+
return "PartitionTable(connection=%s, version=%s)" % (self.connection, self.version)
23+
24+
25+
class PartitionService:
26+
"""
27+
Allows retrieving information about the partition count, the partition
28+
owner or the partition id of a key.
29+
"""
30+
31+
__slots__ = ("_service", "_serialization_service", "_send_schema_and_retry_fn")
32+
33+
def __init__(self, internal_partition_service, serialization_service, send_schema_and_retry_fn):
34+
self._service = internal_partition_service
35+
self._serialization_service = serialization_service
36+
self._send_schema_and_retry_fn = send_schema_and_retry_fn
37+
38+
def get_partition_owner(self, partition_id: int) -> typing.Optional[uuid.UUID]:
39+
"""
40+
Returns the owner of the partition if it's set, ``None`` otherwise.
41+
42+
Args:
43+
partition_id: The partition id.
44+
45+
Returns:
46+
Owner of the partition
47+
"""
48+
return self._service.get_partition_owner(partition_id)
49+
50+
async def get_partition_id(self, key: typing.Any) -> int:
51+
"""
52+
Returns the partition id for a key data.
53+
54+
Args:
55+
key: The given key.
56+
57+
Returns:
58+
The partition id.
59+
"""
60+
try:
61+
key_data = self._serialization_service.to_data(key)
62+
except SchemaNotReplicatedError as e:
63+
await self._send_schema_and_retry_fn(e, lambda: None)
64+
return await self.get_partition_id(key)
65+
66+
return self._service.get_partition_id(key_data)
67+
68+
def get_partition_count(self) -> int:
69+
"""
70+
Returns partition count of the connected cluster.
71+
72+
If partition table is not fetched yet, this method returns ``0``.
73+
74+
Returns:
75+
The partition count
76+
"""
77+
return self._service.partition_count
78+
79+
80+
class InternalPartitionService:
81+
__slots__ = ("partition_count", "_client", "_partition_table")
82+
83+
def __init__(self, client):
84+
self.partition_count = 0
85+
self._client = client
86+
self._partition_table = _PartitionTable(None, -1, {})
87+
88+
def handle_partitions_view_event(self, connection, partitions, version):
89+
_logger.debug("Handling new partition table with version: %s", version)
90+
91+
table = self._partition_table
92+
if not self._should_be_applied(connection, partitions, version, table):
93+
return
94+
95+
new_partitions = self._prepare_partitions(partitions)
96+
new_table = _PartitionTable(connection, version, new_partitions)
97+
self._partition_table = new_table
98+
99+
def get_partition_owner(self, partition_id):
100+
return self._partition_table.partitions.get(partition_id, None)
101+
102+
def get_partition_id(self, key):
103+
if self.partition_count == 0:
104+
# Partition count can not be zero for the SYNC mode.
105+
# On the SYNC mode, we are waiting for the first connection to be established.
106+
# We are initializing the partition count with the value coming from the server with authentication.
107+
# This error is used only for ASYNC mode client.
108+
raise ClientOfflineError()
109+
return hash_to_index(key.get_partition_hash(), self.partition_count)
110+
111+
def check_and_set_partition_count(self, partition_count):
112+
if self.partition_count == 0:
113+
self.partition_count = partition_count
114+
return True
115+
return self.partition_count == partition_count
116+
117+
@classmethod
118+
def _should_be_applied(cls, connection, partitions, version, current):
119+
if not partitions:
120+
_logger.debug(
121+
"Partition view will not be applied since response is empty. "
122+
"Sending connection: %s, version: %s, current table: %s",
123+
connection,
124+
version,
125+
current,
126+
)
127+
return False
128+
129+
if connection != current.connection:
130+
_logger.debug(
131+
"Partition view event coming from a new connection. Old: %s, new: %s",
132+
current.connection,
133+
connection,
134+
)
135+
return True
136+
137+
if version <= current.version:
138+
_logger.debug(
139+
"Partition view will not be applied since response state version is older. "
140+
"Sending connection: %s, version: %s, current table: %s",
141+
connection,
142+
version,
143+
current,
144+
)
145+
return False
146+
147+
return True
148+
149+
@classmethod
150+
def _prepare_partitions(cls, partitions):
151+
new_partitions = {}
152+
for uuid, partition_list in partitions:
153+
for partition in partition_list:
154+
new_partitions[partition] = uuid
155+
return new_partitions
156+
157+
158+
def string_partition_strategy(key):
159+
if key is None:
160+
return None
161+
try:
162+
index_of = key.index("@")
163+
return key[index_of + 1 :]
164+
except ValueError:
165+
return key

hazelcast/internal/asyncio_proxy/base.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
from hazelcast.core import MemberInfo
77
from hazelcast.types import KeyType, ValueType, ItemType, MessageType, BlockingProxyType
88
from hazelcast.internal.asyncio_invocation import Invocation
9-
from hazelcast.partition import string_partition_strategy
109
from hazelcast.util import get_attr_name
1110

1211
MAX_SIZE = float("inf")
@@ -92,22 +91,6 @@ async def _ainvoke_on_partition(
9291
return await fut
9392

9493

95-
class PartitionSpecificProxy(Proxy[BlockingProxyType], abc.ABC):
96-
"""Provides basic functionality for Partition Specific Proxies."""
97-
98-
def __init__(self, service_name, name, context):
99-
super(PartitionSpecificProxy, self).__init__(service_name, name, context)
100-
partition_key = context.serialization_service.to_data(string_partition_strategy(self.name))
101-
self._partition_id = context.partition_service.get_partition_id(partition_key)
102-
103-
def _invoke(self, request, response_handler=_no_op_response_handler):
104-
invocation = Invocation(
105-
request, partition_id=self._partition_id, response_handler=response_handler
106-
)
107-
self._invocation_service.invoke(invocation)
108-
return invocation.future
109-
110-
11194
class ItemEventType:
11295
"""Type of item events."""
11396

tests/integration/asyncio/listener_test.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ async def _remove_member_test(self, is_smart):
5252
self.client_config["smart_routing"] = is_smart
5353
client = await self.create_client(self.client_config)
5454
await wait_for_partition_table(client)
55-
key_m1 = generate_key_owned_by_instance(client, self.m1.uuid)
55+
key_m1 = await generate_key_owned_by_instance(client, self.m1.uuid)
5656
random_map = await client.get_map(random_string())
5757
await random_map.add_entry_listener(added_func=self.collector)
5858
await asyncio.to_thread(self.m1.shutdown)
@@ -92,7 +92,7 @@ async def _add_member_test(self, is_smart):
9292
await random_map.add_entry_listener(added_func=self.collector, updated_func=self.collector)
9393
m2 = await asyncio.to_thread(self.cluster.start_member)
9494
await wait_for_partition_table(client)
95-
key_m2 = generate_key_owned_by_instance(client, m2.uuid)
95+
key_m2 = await generate_key_owned_by_instance(client, m2.uuid)
9696
assertion_succeeded = False
9797

9898
async def run():

tests/integration/asyncio/serialization/compact_compatibility/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)