Skip to content

Commit 887773c

Browse files
authored
Coordination service: CRUD operations (#715)
1 parent 9cb71fe commit 887773c

File tree

10 files changed

+378
-0
lines changed

10 files changed

+378
-0
lines changed

tests/coordination/__init__.py

Whitespace-only changes.
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import pytest
2+
3+
import ydb
4+
from ydb import aio
5+
6+
from ydb.coordination import (
7+
NodeConfig,
8+
ConsistencyMode,
9+
RateLimiterCountersMode,
10+
CoordinationClient,
11+
)
12+
13+
14+
class TestCoordination:
15+
def test_coordination_node_lifecycle(self, driver_sync: ydb.Driver):
16+
client = CoordinationClient(driver_sync)
17+
node_path = "/local/test_node_lifecycle"
18+
19+
try:
20+
client.delete_node(node_path)
21+
except ydb.SchemeError:
22+
pass
23+
24+
with pytest.raises(ydb.SchemeError):
25+
client.describe_node(node_path)
26+
27+
initial_config = NodeConfig(
28+
session_grace_period_millis=1000,
29+
attach_consistency_mode=ConsistencyMode.STRICT,
30+
read_consistency_mode=ConsistencyMode.STRICT,
31+
rate_limiter_counters_mode=RateLimiterCountersMode.UNSET,
32+
self_check_period_millis=0,
33+
)
34+
client.create_node(node_path, initial_config)
35+
36+
node_conf = client.describe_node(node_path)
37+
assert node_conf == initial_config
38+
39+
updated_config = NodeConfig(
40+
session_grace_period_millis=12345,
41+
attach_consistency_mode=ConsistencyMode.STRICT,
42+
read_consistency_mode=ConsistencyMode.RELAXED,
43+
rate_limiter_counters_mode=RateLimiterCountersMode.DETAILED,
44+
self_check_period_millis=10,
45+
)
46+
client.alter_node(node_path, updated_config)
47+
48+
node_conf = client.describe_node(node_path)
49+
assert node_conf == updated_config
50+
51+
client.delete_node(node_path)
52+
53+
with pytest.raises(ydb.SchemeError):
54+
client.describe_node(node_path)
55+
56+
async def test_coordination_node_lifecycle_async(self, aio_connection):
57+
client = aio.CoordinationClient(aio_connection)
58+
node_path = "/local/test_node_lifecycle"
59+
60+
try:
61+
await client.delete_node(node_path)
62+
except ydb.SchemeError:
63+
pass
64+
65+
with pytest.raises(ydb.SchemeError):
66+
await client.describe_node(node_path)
67+
68+
initial_config = NodeConfig(
69+
session_grace_period_millis=1000,
70+
attach_consistency_mode=ConsistencyMode.STRICT,
71+
read_consistency_mode=ConsistencyMode.STRICT,
72+
rate_limiter_counters_mode=RateLimiterCountersMode.UNSET,
73+
self_check_period_millis=0,
74+
)
75+
await client.create_node(node_path, initial_config)
76+
77+
node_conf = await client.describe_node(node_path)
78+
assert node_conf == initial_config
79+
80+
updated_config = NodeConfig(
81+
session_grace_period_millis=12345,
82+
attach_consistency_mode=ConsistencyMode.STRICT,
83+
read_consistency_mode=ConsistencyMode.RELAXED,
84+
rate_limiter_counters_mode=RateLimiterCountersMode.DETAILED,
85+
self_check_period_millis=10,
86+
)
87+
await client.alter_node(node_path, updated_config)
88+
89+
node_conf = await client.describe_node(node_path)
90+
assert node_conf == updated_config
91+
92+
await client.delete_node(node_path)
93+
94+
with pytest.raises(ydb.SchemeError):
95+
await client.describe_node(node_path)

ydb/_apis.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
ydb_operation_v1_pb2_grpc,
1212
ydb_topic_v1_pb2_grpc,
1313
ydb_query_v1_pb2_grpc,
14+
ydb_coordination_v1_pb2_grpc,
1415
)
1516

1617
from ._grpc.v4.protos import (
@@ -22,6 +23,7 @@
2223
ydb_operation_pb2,
2324
ydb_common_pb2,
2425
ydb_query_pb2,
26+
ydb_coordination_pb2,
2527
)
2628

2729
else:
@@ -33,6 +35,7 @@
3335
ydb_operation_v1_pb2_grpc,
3436
ydb_topic_v1_pb2_grpc,
3537
ydb_query_v1_pb2_grpc,
38+
ydb_coordination_v1_pb2_grpc,
3639
)
3740

3841
from ._grpc.common.protos import (
@@ -44,6 +47,7 @@
4447
ydb_operation_pb2,
4548
ydb_common_pb2,
4649
ydb_query_pb2,
50+
ydb_coordination_pb2,
4751
)
4852

4953

@@ -56,6 +60,7 @@
5660
ydb_discovery = ydb_discovery_pb2
5761
ydb_operation = ydb_operation_pb2
5862
ydb_query = ydb_query_pb2
63+
ydb_coordination = ydb_coordination_pb2
5964

6065

6166
class CmsService(object):
@@ -134,3 +139,13 @@ class QueryService(object):
134139
ExecuteQuery = "ExecuteQuery"
135140
ExecuteScript = "ExecuteScript"
136141
FetchScriptResults = "FetchScriptResults"
142+
143+
144+
class CoordinationService(object):
145+
Stub = ydb_coordination_v1_pb2_grpc.CoordinationServiceStub
146+
147+
Session = "Session"
148+
CreateNode = "CreateNode"
149+
AlterNode = "AlterNode"
150+
DropNode = "DropNode"
151+
DescribeNode = "DescribeNode"
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
import typing
2+
from dataclasses import dataclass
3+
4+
from .ydb_coordination_public_types import NodeConfig
5+
6+
if typing.TYPE_CHECKING:
7+
from ..v4.protos import ydb_coordination_pb2
8+
else:
9+
from ..common.protos import ydb_coordination_pb2
10+
11+
from .common_utils import IToProto
12+
13+
14+
@dataclass
15+
class CreateNodeRequest(IToProto):
16+
path: str
17+
config: typing.Optional[NodeConfig]
18+
19+
def to_proto(self) -> ydb_coordination_pb2.CreateNodeRequest:
20+
cfg_proto = self.config.to_proto() if self.config else None
21+
return ydb_coordination_pb2.CreateNodeRequest(
22+
path=self.path,
23+
config=cfg_proto,
24+
)
25+
26+
27+
@dataclass
28+
class AlterNodeRequest(IToProto):
29+
path: str
30+
config: NodeConfig
31+
32+
def to_proto(self) -> ydb_coordination_pb2.AlterNodeRequest:
33+
cfg_proto = self.config.to_proto() if self.config else None
34+
return ydb_coordination_pb2.AlterNodeRequest(
35+
path=self.path,
36+
config=cfg_proto,
37+
)
38+
39+
40+
@dataclass
41+
class DescribeNodeRequest(IToProto):
42+
path: str
43+
44+
def to_proto(self) -> ydb_coordination_pb2.DescribeNodeRequest:
45+
return ydb_coordination_pb2.DescribeNodeRequest(
46+
path=self.path,
47+
)
48+
49+
50+
@dataclass
51+
class DropNodeRequest(IToProto):
52+
path: str
53+
54+
def to_proto(self) -> ydb_coordination_pb2.DropNodeRequest:
55+
return ydb_coordination_pb2.DropNodeRequest(
56+
path=self.path,
57+
)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
from dataclasses import dataclass
2+
from enum import IntEnum
3+
import typing
4+
5+
6+
if typing.TYPE_CHECKING:
7+
from ..v4.protos import ydb_coordination_pb2
8+
else:
9+
from ..common.protos import ydb_coordination_pb2
10+
11+
12+
class ConsistencyMode(IntEnum):
13+
UNSET = ydb_coordination_pb2.ConsistencyMode.CONSISTENCY_MODE_UNSET
14+
STRICT = ydb_coordination_pb2.ConsistencyMode.CONSISTENCY_MODE_STRICT
15+
RELAXED = ydb_coordination_pb2.ConsistencyMode.CONSISTENCY_MODE_RELAXED
16+
17+
18+
class RateLimiterCountersMode(IntEnum):
19+
UNSET = ydb_coordination_pb2.RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_UNSET
20+
AGGREGATED = ydb_coordination_pb2.RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_AGGREGATED
21+
DETAILED = ydb_coordination_pb2.RateLimiterCountersMode.RATE_LIMITER_COUNTERS_MODE_DETAILED
22+
23+
24+
@dataclass
25+
class NodeConfig:
26+
attach_consistency_mode: ConsistencyMode
27+
rate_limiter_counters_mode: RateLimiterCountersMode
28+
read_consistency_mode: ConsistencyMode
29+
self_check_period_millis: int
30+
session_grace_period_millis: int
31+
32+
@staticmethod
33+
def from_proto(msg: ydb_coordination_pb2.Config) -> "NodeConfig":
34+
return NodeConfig(
35+
attach_consistency_mode=msg.attach_consistency_mode,
36+
rate_limiter_counters_mode=msg.rate_limiter_counters_mode,
37+
read_consistency_mode=msg.read_consistency_mode,
38+
self_check_period_millis=msg.self_check_period_millis,
39+
session_grace_period_millis=msg.session_grace_period_millis,
40+
)
41+
42+
def to_proto(self) -> ydb_coordination_pb2.Config:
43+
return ydb_coordination_pb2.Config(
44+
attach_consistency_mode=self.attach_consistency_mode,
45+
rate_limiter_counters_mode=self.rate_limiter_counters_mode,
46+
read_consistency_mode=self.read_consistency_mode,
47+
self_check_period_millis=self.self_check_period_millis,
48+
session_grace_period_millis=self.session_grace_period_millis,
49+
)
50+
51+
52+
class DescribeResult:
53+
@staticmethod
54+
def from_proto(msg: ydb_coordination_pb2.DescribeNodeResponse) -> "NodeConfig":
55+
result = ydb_coordination_pb2.DescribeNodeResult()
56+
msg.operation.result.Unpack(result)
57+
return NodeConfig.from_proto(result.config)

ydb/aio/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
from .driver import Driver # noqa
22
from .table import SessionPool, retry_operation # noqa
33
from .query import QuerySessionPool, QuerySession, QueryTxContext # noqa
4+
from .coordination_client import CoordinationClient # noqa

ydb/aio/coordination_client.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
from typing import Optional
2+
3+
from ydb._grpc.grpcwrapper.ydb_coordination import (
4+
CreateNodeRequest,
5+
DescribeNodeRequest,
6+
AlterNodeRequest,
7+
DropNodeRequest,
8+
)
9+
from ydb._grpc.grpcwrapper.ydb_coordination_public_types import NodeConfig
10+
from ydb.coordination.base_coordination_client import BaseCoordinationClient
11+
12+
13+
class CoordinationClient(BaseCoordinationClient):
14+
async def create_node(self, path: str, config: Optional[NodeConfig] = None, settings=None):
15+
return await self._call_create(
16+
CreateNodeRequest(path=path, config=config).to_proto(),
17+
settings=settings,
18+
)
19+
20+
async def describe_node(self, path: str, settings=None) -> NodeConfig:
21+
return await self._call_describe(
22+
DescribeNodeRequest(path=path).to_proto(),
23+
settings=settings,
24+
)
25+
26+
async def alter_node(self, path: str, new_config: NodeConfig, settings=None):
27+
return await self._call_alter(
28+
AlterNodeRequest(path=path, config=new_config).to_proto(),
29+
settings=settings,
30+
)
31+
32+
async def delete_node(self, path: str, settings=None):
33+
return await self._call_delete(
34+
DropNodeRequest(path=path).to_proto(),
35+
settings=settings,
36+
)
37+
38+
async def lock(self):
39+
raise NotImplementedError("Will be implemented in future release")

ydb/coordination/__init__.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from .coordination_client import CoordinationClient
2+
3+
from ydb._grpc.grpcwrapper.ydb_coordination_public_types import (
4+
NodeConfig,
5+
ConsistencyMode,
6+
RateLimiterCountersMode,
7+
DescribeResult,
8+
)
9+
10+
__all__ = ["CoordinationClient", "NodeConfig", "ConsistencyMode", "RateLimiterCountersMode", "DescribeResult"]
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
from ydb import _apis, issues
2+
from ydb._grpc.grpcwrapper.ydb_coordination_public_types import NodeConfig, DescribeResult
3+
import logging
4+
5+
6+
logger = logging.getLogger(__name__)
7+
8+
9+
def wrapper_create_node(rpc_state, response_pb):
10+
issues._process_response(response_pb.operation)
11+
12+
13+
def wrapper_describe_node(rpc_state, response_pb) -> NodeConfig:
14+
issues._process_response(response_pb.operation)
15+
return DescribeResult.from_proto(response_pb)
16+
17+
18+
def wrapper_delete_node(rpc_state, response_pb):
19+
issues._process_response(response_pb.operation)
20+
21+
22+
def wrapper_alter_node(rpc_state, response_pb):
23+
issues._process_response(response_pb.operation)
24+
25+
26+
class BaseCoordinationClient:
27+
def __init__(self, driver):
28+
logger.warning("Experimental API: interface may change in future releases.")
29+
self._driver = driver
30+
31+
def _call_create(self, request, settings=None):
32+
return self._driver(
33+
request,
34+
_apis.CoordinationService.Stub,
35+
_apis.CoordinationService.CreateNode,
36+
wrap_result=wrapper_create_node,
37+
settings=settings,
38+
)
39+
40+
def _call_describe(self, request, settings=None):
41+
return self._driver(
42+
request,
43+
_apis.CoordinationService.Stub,
44+
_apis.CoordinationService.DescribeNode,
45+
wrap_result=wrapper_describe_node,
46+
settings=settings,
47+
)
48+
49+
def _call_alter(self, request, settings=None):
50+
return self._driver(
51+
request,
52+
_apis.CoordinationService.Stub,
53+
_apis.CoordinationService.AlterNode,
54+
wrap_result=wrapper_alter_node,
55+
settings=settings,
56+
)
57+
58+
def _call_delete(self, request, settings=None):
59+
return self._driver(
60+
request,
61+
_apis.CoordinationService.Stub,
62+
_apis.CoordinationService.DropNode,
63+
wrap_result=wrapper_delete_node,
64+
settings=settings,
65+
)

0 commit comments

Comments
 (0)