Skip to content

Commit 7c1d0cc

Browse files
committed
IGNITE-14465 Add the ability to set and get cluster state
This closes #27
1 parent 7cbfe32 commit 7c1d0cc

20 files changed

+716
-83
lines changed

pyignite/aio_client.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from itertools import chain
1818
from typing import Iterable, Type, Union, Any, Dict
1919

20+
from .aio_cluster import AioCluster
2021
from .api import cache_get_node_partitions_async
2122
from .api.binary import get_binary_type_async, put_binary_type_async
2223
from .api.cache_config import cache_get_names_async
@@ -92,7 +93,7 @@ async def _connect(self, nodes):
9293

9394
if not self.partition_aware:
9495
try:
95-
if self.protocol_version is None:
96+
if self.protocol_context is None:
9697
# open connection before adding to the pool
9798
await conn.connect()
9899

@@ -120,7 +121,7 @@ async def _connect(self, nodes):
120121

121122
await asyncio.gather(*reconnect_coro, return_exceptions=True)
122123

123-
if self.protocol_version is None:
124+
if self.protocol_context is None:
124125
raise ReconnectError('Can not connect.')
125126

126127
async def close(self):
@@ -460,3 +461,11 @@ def sql(
460461
return AioSqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type,
461462
distributed_joins, local, replicated_only, enforce_join_order, collocated,
462463
lazy, include_field_names, max_rows, timeout)
464+
465+
def get_cluster(self) -> 'AioCluster':
466+
"""
467+
Gets client cluster facade.
468+
469+
:return: AioClient cluster facade.
470+
"""
471+
return AioCluster(self)

pyignite/aio_cluster.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""
17+
This module contains `AioCluster` that lets you get info and change state of the
18+
whole cluster asynchronously.
19+
"""
20+
from pyignite.api.cluster import cluster_get_state_async, cluster_set_state_async
21+
from pyignite.exceptions import ClusterError
22+
from pyignite.utils import status_to_exception
23+
24+
25+
class AioCluster:
26+
"""
27+
Ignite cluster abstraction. Users should never use this class directly,
28+
but construct its instances with
29+
:py:meth:`~pyignite.aio_client.AioClient.get_cluster` method instead.
30+
"""
31+
32+
def __init__(self, client: 'AioClient'):
33+
self._client = client
34+
35+
@status_to_exception(ClusterError)
36+
async def get_state(self):
37+
"""
38+
Gets current cluster state.
39+
40+
:return: Current cluster state. This is one of ClusterState.INACTIVE,
41+
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
42+
"""
43+
return await cluster_get_state_async(await self._client.random_node())
44+
45+
@status_to_exception(ClusterError)
46+
async def set_state(self, state):
47+
"""
48+
Changes current cluster state to the given.
49+
50+
Note: Deactivation clears in-memory caches (without persistence)
51+
including the system caches.
52+
53+
:param state: New cluster state. This is one of ClusterState.INACTIVE,
54+
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
55+
"""
56+
return await cluster_set_state_async(await self._client.random_node(), state)

pyignite/api/cluster.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
from pyignite.api import APIResult
16+
from pyignite.connection import AioConnection, Connection
17+
from pyignite.datatypes import Byte
18+
from pyignite.exceptions import NotSupportedByClusterError
19+
from pyignite.queries import Query, query_perform
20+
from pyignite.queries.op_codes import OP_CLUSTER_GET_STATE, OP_CLUSTER_CHANGE_STATE
21+
22+
23+
def cluster_get_state(connection: 'Connection', query_id=None) -> 'APIResult':
24+
"""
25+
Get cluster state.
26+
27+
:param connection: Connection to use,
28+
:param query_id: (optional) a value generated by client and returned as-is
29+
in response.query_id. When the parameter is omitted, a random value
30+
is generated,
31+
:return: API result data object. Contains zero status and a state
32+
retrieved on success, non-zero status and an error description on failure.
33+
"""
34+
return __cluster_get_state(connection, query_id)
35+
36+
37+
async def cluster_get_state_async(connection: 'AioConnection', query_id=None) -> 'APIResult':
38+
"""
39+
Async version of cluster_get_state
40+
"""
41+
return await __cluster_get_state(connection, query_id)
42+
43+
44+
def __post_process_get_state(result):
45+
if result.status == 0:
46+
result.value = result.value['state']
47+
return result
48+
49+
50+
def __cluster_get_state(connection, query_id):
51+
if not connection.protocol_context.is_cluster_api_supported():
52+
raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
53+
54+
query_struct = Query(OP_CLUSTER_GET_STATE, query_id=query_id)
55+
return query_perform(
56+
query_struct, connection,
57+
response_config=[('state', Byte)],
58+
post_process_fun=__post_process_get_state
59+
)
60+
61+
62+
def cluster_set_state(connection: 'Connection', state: int, query_id=None) -> 'APIResult':
63+
"""
64+
Set cluster state.
65+
66+
:param connection: Connection to use,
67+
:param state: State to set,
68+
:param query_id: (optional) a value generated by client and returned as-is
69+
in response.query_id. When the parameter is omitted, a random value
70+
is generated,
71+
:return: API result data object. Contains zero status if a value
72+
is written, non-zero status and an error description otherwise.
73+
"""
74+
return __cluster_set_state(connection, state, query_id)
75+
76+
77+
async def cluster_set_state_async(connection: 'AioConnection', state: int, query_id=None) -> 'APIResult':
78+
"""
79+
Async version of cluster_get_state
80+
"""
81+
return await __cluster_set_state(connection, state, query_id)
82+
83+
84+
def __post_process_set_state(result):
85+
if result.status == 0:
86+
result.value = result.value['state']
87+
return result
88+
89+
90+
def __cluster_set_state(connection, state, query_id):
91+
if not connection.protocol_context.is_cluster_api_supported():
92+
raise NotSupportedByClusterError('Cluster API is not supported by the cluster')
93+
94+
query_struct = Query(
95+
OP_CLUSTER_CHANGE_STATE,
96+
[
97+
('state', Byte)
98+
],
99+
query_id=query_id
100+
)
101+
return query_perform(
102+
query_struct, connection,
103+
query_params={
104+
'state': state,
105+
}
106+
)

pyignite/client.py

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@
4949
from .api import cache_get_node_partitions
5050
from .api.binary import get_binary_type, put_binary_type
5151
from .api.cache_config import cache_get_names
52+
from .cluster import Cluster
5253
from .cursors import SqlFieldsCursor
5354
from .cache import Cache, create_cache, get_cache, get_or_create_cache, BaseCache
5455
from .connection import Connection
@@ -83,32 +84,32 @@ def __init__(self, compact_footer: bool = None, partition_aware: bool = False, *
8384
self._partition_aware = partition_aware
8485
self.affinity_version = (0, 0)
8586
self._affinity = {'version': self.affinity_version, 'partition_mapping': defaultdict(dict)}
86-
self._protocol_version = None
87+
self._protocol_context = None
8788

8889
@property
89-
def protocol_version(self):
90+
def protocol_context(self):
9091
"""
91-
Returns the tuple of major, minor, and revision numbers of the used
92-
thin protocol version, or None, if no connection to the Ignite cluster
93-
was not yet established.
92+
Returns protocol context, or None, if no connection to the Ignite
93+
cluster was not yet established.
9494
9595
This method is not a part of the public API. Unless you wish to
9696
extend the `pyignite` capabilities (with additional testing, logging,
9797
examining connections, et c.) you probably should not use it.
9898
"""
99-
return self._protocol_version
99+
return self._protocol_context
100100

101-
@protocol_version.setter
102-
def protocol_version(self, value):
103-
self._protocol_version = value
101+
@protocol_context.setter
102+
def protocol_context(self, value):
103+
self._protocol_context = value
104104

105105
@property
106106
def partition_aware(self):
107107
return self._partition_aware and self.partition_awareness_supported_by_protocol
108108

109109
@property
110110
def partition_awareness_supported_by_protocol(self):
111-
return self.protocol_version is not None and self.protocol_version >= (1, 4, 0)
111+
return self.protocol_context is not None \
112+
and self.protocol_context.is_partition_awareness_supported()
112113

113114
@property
114115
def compact_footer(self) -> bool:
@@ -379,7 +380,7 @@ def _connect(self, nodes):
379380
conn = Connection(self, host, port, **self._connection_args)
380381

381382
try:
382-
if self.protocol_version is None or self.partition_aware:
383+
if self.protocol_context is None or self.partition_aware:
383384
# open connection before adding to the pool
384385
conn.connect()
385386

@@ -396,7 +397,7 @@ def _connect(self, nodes):
396397

397398
self._nodes.append(conn)
398399

399-
if self.protocol_version is None:
400+
if self.protocol_context is None:
400401
raise ReconnectError('Can not connect.')
401402

402403
def close(self):
@@ -727,3 +728,11 @@ def sql(
727728
return SqlFieldsCursor(self, c_id, query_str, page_size, query_args, schema, statement_type, distributed_joins,
728729
local, replicated_only, enforce_join_order, collocated, lazy, include_field_names,
729730
max_rows, timeout)
731+
732+
def get_cluster(self) -> 'Cluster':
733+
"""
734+
Gets client cluster facade.
735+
736+
:return: Client cluster facade.
737+
"""
738+
return Cluster(self)

pyignite/cluster.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
"""
17+
This module contains `Cluster` that lets you get info and change state of the
18+
whole cluster.
19+
"""
20+
from pyignite.api.cluster import cluster_get_state, cluster_set_state
21+
from pyignite.exceptions import ClusterError
22+
from pyignite.utils import status_to_exception
23+
24+
25+
class Cluster:
26+
"""
27+
Ignite cluster abstraction. Users should never use this class directly,
28+
but construct its instances with
29+
:py:meth:`~pyignite.client.Client.get_cluster` method instead.
30+
"""
31+
32+
def __init__(self, client: 'Client'):
33+
self._client = client
34+
35+
@status_to_exception(ClusterError)
36+
def get_state(self):
37+
"""
38+
Gets current cluster state.
39+
40+
:return: Current cluster state. This is one of ClusterState.INACTIVE,
41+
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
42+
"""
43+
return cluster_get_state(self._client.random_node)
44+
45+
@status_to_exception(ClusterError)
46+
def set_state(self, state):
47+
"""
48+
Changes current cluster state to the given.
49+
50+
Note: Deactivation clears in-memory caches (without persistence)
51+
including the system caches.
52+
53+
:param state: New cluster state. This is one of ClusterState.INACTIVE,
54+
ClusterState.ACTIVE or ClusterState.ACTIVE_READ_ONLY.
55+
"""
56+
return cluster_set_state(self._client.random_node, state)

0 commit comments

Comments
 (0)