Skip to content

Commit 9d52740

Browse files
committed
Adding cluster API
1 parent a171df7 commit 9d52740

File tree

4 files changed

+377
-0
lines changed

4 files changed

+377
-0
lines changed

arangoasync/cluster.py

Lines changed: 261 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,261 @@
1+
__all__ = ["Cluster"]
2+
3+
from typing import List, Optional
4+
5+
from arangoasync.exceptions import (
6+
ClusterEndpointsError,
7+
ClusterHealthError,
8+
ClusterMaintenanceModeError,
9+
ClusterServerIDError,
10+
ClusterServerRoleError,
11+
ClusterStatisticsError,
12+
)
13+
from arangoasync.executor import ApiExecutor
14+
from arangoasync.request import Method, Request
15+
from arangoasync.response import Response
16+
from arangoasync.result import Result
17+
from arangoasync.serialization import Deserializer, Serializer
18+
from arangoasync.typings import Json, Jsons, Params
19+
20+
21+
class Cluster:
22+
"""Cluster-specific endpoints."""
23+
24+
def __init__(self, executor: ApiExecutor) -> None:
25+
self._executor = executor
26+
27+
@property
28+
def serializer(self) -> Serializer[Json]:
29+
"""Return the serializer."""
30+
return self._executor.serializer
31+
32+
@property
33+
def deserializer(self) -> Deserializer[Json, Jsons]:
34+
"""Return the deserializer."""
35+
return self._executor.deserializer
36+
37+
async def health(self) -> Result[Json]:
38+
"""Queries the health of the cluster.
39+
40+
Returns:
41+
dict: Health status of the cluster.
42+
43+
Raises:
44+
ClusterHealthError: If retrieval fails.
45+
46+
References:
47+
- `get-the-cluster-health <https://docs.arangodb.com/stable/develop/http-api/cluster/#get-the-cluster-health>`__
48+
""" # noqa: E501
49+
request = Request(
50+
method=Method.GET,
51+
endpoint="/_admin/cluster/health",
52+
prefix_needed=False,
53+
)
54+
55+
def response_handler(resp: Response) -> Json:
56+
if not resp.is_success:
57+
raise ClusterHealthError(resp, request)
58+
result: Json = self.deserializer.loads(resp.raw_body)
59+
return Response.format_body(result)
60+
61+
return await self._executor.execute(request, response_handler)
62+
63+
async def statistics(self, db_server: str) -> Result[Json]:
64+
"""Queries the statistics of the given DB-Server.
65+
66+
Args:
67+
db_server (str): The ID of the DB-Server.
68+
69+
Returns:
70+
dict: Statistics of the DB-Server.
71+
72+
Raises:
73+
ClusterStatisticsError: If retrieval fails.
74+
75+
References:
76+
- `get-the-statistics-of-a-db-server <https://docs.arangodb.com/stable/develop/http-api/cluster/#get-the-statistics-of-a-db-server>`__
77+
""" # noqa: E501
78+
params: Params = {"DBserver": db_server}
79+
80+
request = Request(
81+
method=Method.GET,
82+
endpoint="/_admin/cluster/statistics",
83+
prefix_needed=False,
84+
params=params,
85+
)
86+
87+
def response_handler(resp: Response) -> Json:
88+
if not resp.is_success:
89+
raise ClusterStatisticsError(resp, request)
90+
result: Json = self.deserializer.loads(resp.raw_body)
91+
return Response.format_body(result)
92+
93+
return await self._executor.execute(request, response_handler)
94+
95+
async def endpoints(self) -> Result[List[str]]:
96+
"""Fetch all coordinator endpoints.
97+
98+
Returns:
99+
list: List of coordinator endpoints.
100+
101+
Raises:
102+
ClusterEndpointsError: If retrieval fails.
103+
104+
References:
105+
- `list-all-coordinator-endpoints <https://docs.arangodb.com/stable/develop/http-api/cluster/#list-all-coordinator-endpoints>`__
106+
""" # noqa: E501
107+
request = Request(
108+
method=Method.GET,
109+
endpoint="/_api/cluster/endpoints",
110+
prefix_needed=False,
111+
)
112+
113+
def response_handler(resp: Response) -> List[str]:
114+
if not resp.is_success:
115+
raise ClusterEndpointsError(resp, request)
116+
body: Json = self.deserializer.loads(resp.raw_body)
117+
return [item["endpoint"] for item in body["endpoints"]]
118+
119+
return await self._executor.execute(request, response_handler)
120+
121+
async def server_id(self) -> Result[str]:
122+
"""Get the ID of the current server.
123+
124+
Returns:
125+
str: Server ID.
126+
127+
Raises:
128+
ClusterServerIDError: If retrieval fails.
129+
130+
References:
131+
- `get-the-server-id <https://docs.arangodb.com/stable/develop/http-api/cluster/#get-the-server-id>`__
132+
""" # noqa: E501
133+
request = Request(
134+
method=Method.GET,
135+
endpoint="/_admin/server/id",
136+
prefix_needed=False,
137+
)
138+
139+
def response_handler(resp: Response) -> str:
140+
if not resp.is_success:
141+
raise ClusterServerIDError(resp, request)
142+
return str(self.deserializer.loads(resp.raw_body)["id"])
143+
144+
return await self._executor.execute(request, response_handler)
145+
146+
async def server_role(self) -> Result[str]:
147+
"""Get the role of the current server
148+
149+
Returns:
150+
str: Server role. Possible values: "SINGLE", "COORDINATOR", "PRIMARY", "SECONDARY", "AGENT", "UNDEFINED".
151+
152+
Raises:
153+
ClusterServerRoleError: If retrieval fails.
154+
155+
References:
156+
- `get-the-server-role <https://docs.arangodb.com/stable/develop/http-api/cluster/#get-the-server-role>`__
157+
""" # noqa: E501
158+
request = Request(
159+
method=Method.GET,
160+
endpoint="/_admin/server/role",
161+
prefix_needed=False,
162+
)
163+
164+
def response_handler(resp: Response) -> str:
165+
if not resp.is_success:
166+
raise ClusterServerRoleError(resp, request)
167+
return str(self.deserializer.loads(resp.raw_body)["role"])
168+
169+
return await self._executor.execute(request, response_handler)
170+
171+
async def toggle_maintenance_mode(self, mode: str) -> Result[Json]:
172+
"""Enable or disable the cluster supervision (agency) maintenance mode.
173+
174+
Args:
175+
mode (str): Maintenance mode. Allowed values are "on" or "off".
176+
177+
Returns:
178+
dict: Result of the operation.
179+
180+
Raises:
181+
ClusterMaintenanceModeError: If the toggle operation fails.
182+
183+
References:
184+
- `toggle-cluster-maintenance-mode <https://docs.arangodb.com/stable/develop/http-api/cluster/#toggle-cluster-maintenance-mode>`__
185+
""" # noqa: E501
186+
request = Request(
187+
method=Method.PUT,
188+
endpoint="/_admin/cluster/maintenance",
189+
prefix_needed=False,
190+
data=f'"{mode}"',
191+
)
192+
193+
def response_handler(resp: Response) -> Json:
194+
if not resp.is_success:
195+
raise ClusterMaintenanceModeError(resp, request)
196+
result: Json = self.deserializer.loads(resp.raw_body)
197+
return Response.format_body(result)
198+
199+
return await self._executor.execute(request, response_handler)
200+
201+
async def server_maintenance_mode(self, server_id: str) -> Result[Json]:
202+
"""Check whether the specified DB-Server is in maintenance mode and until when.
203+
204+
Args:
205+
server_id (str): Server ID.
206+
207+
Returns:
208+
dict: Maintenance status for the given server.
209+
210+
Raises:
211+
ClusterMaintenanceModeError: If retrieval fails.
212+
213+
References:
214+
- `get-the-maintenance-status-of-a-db-server <https://docs.arangodb.com/stable/develop/http-api/cluster/#get-the-maintenance-status-of-a-db-server>`__
215+
""" # noqa: E501
216+
request = Request(
217+
method=Method.GET,
218+
endpoint=f"/_admin/cluster/maintenance/{server_id}",
219+
prefix_needed=False,
220+
)
221+
222+
def response_handler(resp: Response) -> Json:
223+
if not resp.is_success:
224+
raise ClusterMaintenanceModeError(resp, request)
225+
result: Json = self.deserializer.loads(resp.raw_body)
226+
return Response.format_body(result)
227+
228+
return await self._executor.execute(request, response_handler)
229+
230+
async def toggle_server_maintenance_mode(
231+
self, server_id: str, mode: str, timeout: Optional[int] = None
232+
) -> None:
233+
"""Enable or disable the maintenance mode for the given server.
234+
235+
Args:
236+
server_id (str): Server ID.
237+
mode (str): Maintenance mode. Allowed values are "normal" and "maintenance".
238+
timeout (int | None): After how many seconds the maintenance mode shall automatically end.
239+
240+
Raises:
241+
ClusterMaintenanceModeError: If the operation fails.
242+
243+
References:
244+
- `set-the-maintenance-status-of-a-db-server <https://docs.arangodb.com/stable/develop/http-api/cluster/#set-the-maintenance-status-of-a-db-server>`__
245+
""" # noqa: E501
246+
data: Json = {"mode": mode}
247+
if timeout is not None:
248+
data["timeout"] = timeout
249+
250+
request = Request(
251+
method=Method.PUT,
252+
endpoint=f"/_admin/cluster/maintenance/{server_id}",
253+
prefix_needed=False,
254+
data=self.serializer.dumps(data),
255+
)
256+
257+
def response_handler(resp: Response) -> None:
258+
if not resp.is_success:
259+
raise ClusterMaintenanceModeError(resp, request)
260+
261+
await self._executor.execute(request, response_handler)

arangoasync/database.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
from arangoasync.aql import AQL
1313
from arangoasync.backup import Backup
14+
from arangoasync.cluster import Cluster
1415
from arangoasync.collection import Collection, StandardCollection
1516
from arangoasync.connection import Connection
1617
from arangoasync.errno import HTTP_FORBIDDEN, HTTP_NOT_FOUND
@@ -189,6 +190,15 @@ def backup(self) -> Backup:
189190
"""
190191
return Backup(self._executor)
191192

193+
@property
194+
def cluster(self) -> Cluster:
195+
"""Return Cluster API wrapper.
196+
197+
Returns:
198+
arangoasync.cluster.Cluster: Cluster API wrapper.
199+
"""
200+
return Cluster(self._executor)
201+
192202
async def properties(self) -> Result[DatabaseProperties]:
193203
"""Return database properties.
194204

arangoasync/exceptions.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,30 @@ class ClientConnectionError(ArangoClientError):
271271
"""The request was unable to reach the server."""
272272

273273

274+
class ClusterEndpointsError(ArangoServerError):
275+
"""Failed to retrieve coordinator endpoints."""
276+
277+
278+
class ClusterHealthError(ArangoServerError):
279+
"""Failed to retrieve cluster health."""
280+
281+
282+
class ClusterMaintenanceModeError(ArangoServerError):
283+
"""Failed to enable/disable cluster supervision maintenance mode."""
284+
285+
286+
class ClusterServerRoleError(ArangoServerError):
287+
"""Failed to retrieve server role in a cluster."""
288+
289+
290+
class ClusterServerIDError(ArangoServerError):
291+
"""Failed to retrieve server ID."""
292+
293+
294+
class ClusterStatisticsError(ArangoServerError):
295+
"""Failed to retrieve DB-Server statistics."""
296+
297+
274298
class CursorCloseError(ArangoServerError):
275299
"""Failed to delete the cursor result from server."""
276300

tests/test_cluster.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import pytest
2+
from packaging import version
3+
4+
from arangoasync.client import ArangoClient
5+
from arangoasync.exceptions import (
6+
ClusterEndpointsError,
7+
ClusterHealthError,
8+
ClusterMaintenanceModeError,
9+
ClusterServerIDError,
10+
ClusterServerRoleError,
11+
ClusterStatisticsError,
12+
)
13+
14+
15+
@pytest.mark.asyncio
16+
async def test_cluster(
17+
url, sys_db_name, bad_db, token, enterprise, cluster, db_version
18+
):
19+
if not cluster:
20+
pytest.skip("Cluster API is only tested in cluster setups")
21+
if not enterprise or db_version < version.parse("3.12.0"):
22+
pytest.skip(
23+
"For simplicity, the cluster API is only tested in the latest versions"
24+
)
25+
26+
# Test errors
27+
with pytest.raises(ClusterHealthError):
28+
await bad_db.cluster.health()
29+
with pytest.raises(ClusterStatisticsError):
30+
await bad_db.cluster.statistics("foo")
31+
with pytest.raises(ClusterEndpointsError):
32+
await bad_db.cluster.endpoints()
33+
with pytest.raises(ClusterServerIDError):
34+
await bad_db.cluster.server_id()
35+
with pytest.raises(ClusterServerRoleError):
36+
await bad_db.cluster.server_role()
37+
with pytest.raises(ClusterMaintenanceModeError):
38+
await bad_db.cluster.toggle_maintenance_mode("on")
39+
with pytest.raises(ClusterMaintenanceModeError):
40+
await bad_db.cluster.toggle_server_maintenance_mode("PRMR0001", "normal")
41+
with pytest.raises(ClusterMaintenanceModeError):
42+
await bad_db.cluster.server_maintenance_mode("PRMR0001")
43+
44+
async with ArangoClient(hosts=url) as client:
45+
db = await client.db(
46+
sys_db_name, auth_method="superuser", token=token, verify=True
47+
)
48+
cluster = db.cluster
49+
50+
# Cluster health
51+
health = await cluster.health()
52+
assert "Health" in health
53+
54+
# DB-Server statistics
55+
db_server = None
56+
for server in health["Health"]:
57+
if server.startswith("PRMR"):
58+
db_server = server
59+
break
60+
assert db_server is not None, f"No DB server found in {health}"
61+
stats = await cluster.statistics(db_server)
62+
assert "enabled" in stats
63+
64+
# Cluster endpoints
65+
endpoints = await cluster.endpoints()
66+
assert len(endpoints) > 0
67+
68+
# Cluster server ID and role
69+
server_id = await cluster.server_id()
70+
assert isinstance(server_id, str)
71+
server_role = await cluster.server_role()
72+
assert isinstance(server_role, str)
73+
74+
# Maintenance mode
75+
await cluster.toggle_maintenance_mode("on")
76+
await cluster.toggle_maintenance_mode("off")
77+
await cluster.toggle_server_maintenance_mode(
78+
db_server, "maintenance", timeout=30
79+
)
80+
status = await cluster.server_maintenance_mode(db_server)
81+
assert isinstance(status, dict)
82+
await cluster.toggle_server_maintenance_mode(db_server, "normal")

0 commit comments

Comments
 (0)