Skip to content

Commit effde65

Browse files
committed
Adding rebalance operations
1 parent 9d52740 commit effde65

File tree

3 files changed

+214
-1
lines changed

3 files changed

+214
-1
lines changed

arangoasync/cluster.py

Lines changed: 191 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
__all__ = ["Cluster"]
22

3-
from typing import List, Optional
3+
from typing import List, Optional, cast
44

55
from arangoasync.exceptions import (
66
ClusterEndpointsError,
77
ClusterHealthError,
88
ClusterMaintenanceModeError,
9+
ClusterRebalanceError,
910
ClusterServerIDError,
1011
ClusterServerRoleError,
1112
ClusterStatisticsError,
@@ -259,3 +260,192 @@ def response_handler(resp: Response) -> None:
259260
raise ClusterMaintenanceModeError(resp, request)
260261

261262
await self._executor.execute(request, response_handler)
263+
264+
async def calculate_imbalance(self) -> Result[Json]:
265+
"""Computes the current cluster imbalance and returns the result.
266+
267+
Returns:
268+
dict: Cluster imbalance information.
269+
270+
Raises:
271+
ClusterRebalanceError: If retrieval fails.
272+
273+
References:
274+
- `get-the-current-cluster-imbalance <https://docs.arangodb.com/stable/develop/http-api/cluster/#get-the-current-cluster-imbalance>`__
275+
""" # noqa: E501
276+
request = Request(method=Method.GET, endpoint="/_admin/cluster/rebalance")
277+
278+
def response_handler(resp: Response) -> Json:
279+
if not resp.is_success:
280+
raise ClusterRebalanceError(resp, request)
281+
result: Json = self.deserializer.loads(resp.raw_body)
282+
return Response.format_body(result)
283+
284+
return await self._executor.execute(request, response_handler)
285+
286+
async def calculate_rebalance_plan(
287+
self,
288+
databases_excluded: Optional[List[str]] = None,
289+
exclude_system_collections: Optional[bool] = None,
290+
leader_changes: Optional[bool] = None,
291+
maximum_number_of_moves: Optional[int] = None,
292+
move_followers: Optional[bool] = None,
293+
move_leaders: Optional[bool] = None,
294+
pi_factor: Optional[float] = None,
295+
version: int = 1,
296+
) -> Result[Json]:
297+
"""Compute a set of move shard operations to improve balance.
298+
299+
Args:
300+
databases_excluded (list | None): List of database names to be excluded from
301+
the analysis.
302+
exclude_system_collections (bool | None): Ignore system collections in the
303+
rebalance plan.
304+
leader_changes (bool | None): Allow leader changes without moving data.
305+
maximum_number_of_moves (int | None): Maximum number of moves to be computed.
306+
move_followers (bool | None): Allow moving shard followers.
307+
move_leaders (bool | None): Allow moving shard leaders.
308+
pi_factor (float | None): A weighting factor that should remain untouched.
309+
version (int): Must be set to 1.
310+
311+
Returns:
312+
dict: Cluster rebalance plan.
313+
314+
Raises:
315+
ClusterRebalanceError: If retrieval fails.
316+
317+
References:
318+
- `compute-a-set-of-move-shard-operations-to-improve-balance <https://docs.arangodb.com/stable/develop/http-api/cluster/#compute-a-set-of-move-shard-operations-to-improve-balance>`__
319+
""" # noqa: E501
320+
data: Json = dict(version=version)
321+
if databases_excluded is not None:
322+
data["databasesExcluded"] = databases_excluded
323+
if exclude_system_collections is not None:
324+
data["excludeSystemCollections"] = exclude_system_collections
325+
if leader_changes is not None:
326+
data["leaderChanges"] = leader_changes
327+
if maximum_number_of_moves is not None:
328+
data["maximumNumberOfMoves"] = maximum_number_of_moves
329+
if move_followers is not None:
330+
data["moveFollowers"] = move_followers
331+
if move_leaders is not None:
332+
data["moveLeaders"] = move_leaders
333+
if pi_factor is not None:
334+
data["piFactor"] = pi_factor
335+
336+
request = Request(
337+
method=Method.POST,
338+
endpoint="/_admin/cluster/rebalance",
339+
prefix_needed=False,
340+
data=self.serializer.dumps(data),
341+
)
342+
343+
def response_handler(resp: Response) -> Json:
344+
if not resp.is_success:
345+
raise ClusterRebalanceError(resp, request)
346+
result: Json = self.deserializer.loads(resp.raw_body)
347+
return cast(Json, result["result"])
348+
349+
return await self._executor.execute(request, response_handler)
350+
351+
async def rebalance(
352+
self,
353+
databases_excluded: Optional[List[str]] = None,
354+
exclude_system_collections: Optional[bool] = None,
355+
leader_changes: Optional[bool] = None,
356+
maximum_number_of_moves: Optional[int] = None,
357+
move_followers: Optional[bool] = None,
358+
move_leaders: Optional[bool] = None,
359+
pi_factor: Optional[float] = None,
360+
version: int = 1,
361+
) -> Result[Json]:
362+
"""Compute and execute a set of move shard operations to improve balance.
363+
364+
Args:
365+
databases_excluded (list | None): List of database names to be excluded from
366+
the analysis.
367+
exclude_system_collections (bool | None): Ignore system collections in the
368+
rebalance plan.
369+
leader_changes (bool | None): Allow leader changes without moving data.
370+
maximum_number_of_moves (int | None): Maximum number of moves to be computed.
371+
move_followers (bool | None): Allow moving shard followers.
372+
move_leaders (bool | None): Allow moving shard leaders.
373+
pi_factor (float | None): A weighting factor that should remain untouched.
374+
version (int): Must be set to 1.
375+
376+
Returns:
377+
dict: Cluster rebalance plan.
378+
379+
Raises:
380+
ClusterRebalanceError: If retrieval fails.
381+
382+
References:
383+
- `compute-and-execute-a-set-of-move-shard-operations-to-improve-balance <https://docs.arangodb.com/stable/develop/http-api/cluster/#compute-and-execute-a-set-of-move-shard-operations-to-improve-balance>`__
384+
""" # noqa: E501
385+
data: Json = dict(version=version)
386+
if databases_excluded is not None:
387+
data["databasesExcluded"] = databases_excluded
388+
if exclude_system_collections is not None:
389+
data["excludeSystemCollections"] = exclude_system_collections
390+
if leader_changes is not None:
391+
data["leaderChanges"] = leader_changes
392+
if maximum_number_of_moves is not None:
393+
data["maximumNumberOfMoves"] = maximum_number_of_moves
394+
if move_followers is not None:
395+
data["moveFollowers"] = move_followers
396+
if move_leaders is not None:
397+
data["moveLeaders"] = move_leaders
398+
if pi_factor is not None:
399+
data["piFactor"] = pi_factor
400+
401+
request = Request(
402+
method=Method.PUT,
403+
endpoint="/_admin/cluster/rebalance",
404+
prefix_needed=False,
405+
data=self.serializer.dumps(data),
406+
)
407+
408+
def response_handler(resp: Response) -> Json:
409+
if not resp.is_success:
410+
raise ClusterRebalanceError(resp, request)
411+
result: Json = self.deserializer.loads(resp.raw_body)
412+
return cast(Json, result["result"])
413+
414+
return await self._executor.execute(request, response_handler)
415+
416+
async def execute_rebalance_plan(
417+
self,
418+
moves: List[Json],
419+
version: int = 1,
420+
) -> Result[int]:
421+
"""Execute a set of move shard operations.
422+
423+
Args:
424+
moves (list): List of move shard operations to be executed.
425+
version (int): Must be set to 1.
426+
427+
Returns:
428+
int: Indicates whether the methods have been accepted and scheduled for execution.
429+
430+
Raises:
431+
ClusterRebalanceError: If the execution fails.
432+
433+
References:
434+
- `execute-a-set-of-move-shard-operations <https://docs.arangodb.com/stable/develop/http-api/cluster/#execute-a-set-of-move-shard-operations>`__
435+
""" # noqa: E501
436+
data: Json = dict(version=version, moves=moves)
437+
438+
request = Request(
439+
method=Method.POST,
440+
endpoint="/_admin/cluster/rebalance/execute",
441+
data=self.serializer.dumps(data),
442+
prefix_needed=False,
443+
)
444+
445+
def response_handler(resp: Response) -> int:
446+
if not resp.is_success:
447+
raise ClusterRebalanceError(resp, request)
448+
result: int = self.deserializer.loads(resp.raw_body)["code"]
449+
return result
450+
451+
return await self._executor.execute(request, response_handler)

arangoasync/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -283,6 +283,10 @@ class ClusterMaintenanceModeError(ArangoServerError):
283283
"""Failed to enable/disable cluster supervision maintenance mode."""
284284

285285

286+
class ClusterRebalanceError(ArangoServerError):
287+
"""Failed to execute cluster rebalancing operation."""
288+
289+
286290
class ClusterServerRoleError(ArangoServerError):
287291
"""Failed to retrieve server role in a cluster."""
288292

tests/test_cluster.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
ClusterEndpointsError,
77
ClusterHealthError,
88
ClusterMaintenanceModeError,
9+
ClusterRebalanceError,
910
ClusterServerIDError,
1011
ClusterServerRoleError,
1112
ClusterStatisticsError,
@@ -40,6 +41,14 @@ async def test_cluster(
4041
await bad_db.cluster.toggle_server_maintenance_mode("PRMR0001", "normal")
4142
with pytest.raises(ClusterMaintenanceModeError):
4243
await bad_db.cluster.server_maintenance_mode("PRMR0001")
44+
with pytest.raises(ClusterRebalanceError):
45+
await bad_db.cluster.calculate_imbalance()
46+
with pytest.raises(ClusterRebalanceError):
47+
await bad_db.cluster.rebalance()
48+
with pytest.raises(ClusterRebalanceError):
49+
await bad_db.cluster.calculate_rebalance_plan()
50+
with pytest.raises(ClusterRebalanceError):
51+
await bad_db.cluster.execute_rebalance_plan(moves=[])
4352

4453
async with ArangoClient(hosts=url) as client:
4554
db = await client.db(
@@ -80,3 +89,13 @@ async def test_cluster(
8089
status = await cluster.server_maintenance_mode(db_server)
8190
assert isinstance(status, dict)
8291
await cluster.toggle_server_maintenance_mode(db_server, "normal")
92+
93+
# Rebalance
94+
result = await cluster.calculate_imbalance()
95+
assert isinstance(result, dict)
96+
result = await cluster.calculate_rebalance_plan()
97+
assert isinstance(result, dict)
98+
result = await cluster.execute_rebalance_plan(moves=[])
99+
assert result == 200
100+
result = await cluster.rebalance()
101+
assert isinstance(result, dict)

0 commit comments

Comments
 (0)