Skip to content
Open
Empty file added tests/coordination/__init__.py
Empty file.
32 changes: 32 additions & 0 deletions tests/coordination/test_coordination_alter_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import ydb
from ydb import _apis


def test_coordination_alter_node(driver_sync: ydb.Driver):
client = driver_sync.coordination_client
node_path = "/local/test_alter_node"

try:
client.delete_node(node_path)
except ydb.SchemeError:
pass

client.create_node(node_path)

new_config = _apis.ydb_coordination.Config(
session_grace_period_millis=12345,
attach_consistency_mode=_apis.ydb_coordination.ConsistencyMode.CONSISTENCY_MODE_STRICT,
read_consistency_mode=_apis.ydb_coordination.ConsistencyMode.CONSISTENCY_MODE_RELAXED,
)


client.alter_node(node_path, new_config)

node_config = client.describe_node(node_path)
assert node_config.session_grace_period_millis == 12345, "Session grace period not updated"
assert node_config.attach_consistency_mode == _apis.ydb_coordination.ConsistencyMode.CONSISTENCY_MODE_STRICT, \
"Attach consistency mode not updated"
assert node_config.read_consistency_mode == _apis.ydb_coordination.ConsistencyMode.CONSISTENCY_MODE_RELAXED, \
"Read consistency mode not updated"

client.delete_node(node_path)
18 changes: 18 additions & 0 deletions tests/coordination/test_coordination_describe_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import ydb

def test_coordination_nodes(driver_sync: ydb.Driver):
client = driver_sync.coordination_client
node_path = "/local/test_node"

try:
client.delete_node(node_path)
except ydb.SchemeError:
pass

client.create_node(node_path)

node_config = client.describe_node(node_path)

assert node_config.path == "/local/test_node"

client.delete_node(node_path)
14 changes: 14 additions & 0 deletions ydb/_apis.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
ydb_operation_v1_pb2_grpc,
ydb_topic_v1_pb2_grpc,
ydb_query_v1_pb2_grpc,
ydb_coordination_v1_pb2_grpc,
)

from ._grpc.v4.protos import (
Expand All @@ -22,6 +23,7 @@
ydb_operation_pb2,
ydb_common_pb2,
ydb_query_pb2,
ydb_coordination_pb2,
)

else:
Expand All @@ -33,6 +35,7 @@
ydb_operation_v1_pb2_grpc,
ydb_topic_v1_pb2_grpc,
ydb_query_v1_pb2_grpc,
ydb_coordination_v1_pb2_grpc,
)

from ._grpc.common.protos import (
Expand All @@ -44,6 +47,7 @@
ydb_operation_pb2,
ydb_common_pb2,
ydb_query_pb2,
ydb_coordination_pb2,
)


Expand All @@ -56,6 +60,7 @@
ydb_discovery = ydb_discovery_pb2
ydb_operation = ydb_operation_pb2
ydb_query = ydb_query_pb2
ydb_coordination = ydb_coordination_pb2


class CmsService(object):
Expand Down Expand Up @@ -134,3 +139,12 @@ class QueryService(object):
ExecuteQuery = "ExecuteQuery"
ExecuteScript = "ExecuteScript"
FetchScriptResults = "FetchScriptResults"

class CoordinationService(object):
Stub = ydb_coordination_v1_pb2_grpc.CoordinationServiceStub

Session = "Session"
CreateNode = "CreateNode"
AlterNode = "AlterNode"
DropNode = "DropNode"
DescribeNode = "DescribeNode"
70 changes: 70 additions & 0 deletions ydb/_grpc/grpcwrapper/ydb_coordination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import typing
from dataclasses import dataclass

import ydb

if typing.TYPE_CHECKING:
from ..v4.protos import ydb_coordination_pb2
else:
from ..common.protos import ydb_coordination_pb2

from .common_utils import IToProto, IFromProto, ServerStatus
from . import ydb_coordination_public_types as public_types


# -------------------- Requests --------------------

@dataclass
class CreateNodeRequest(IToProto):
path: str
config: typing.Optional[public_types.NodeConfig] = None
operation_params: typing.Any = None

def to_proto(self) -> ydb_coordination_pb2.CreateNodeRequest:
cfg_proto = self.config.to_proto() if self.config else None
return ydb_coordination_pb2.CreateNodeRequest(
path=self.path,
config=cfg_proto,
operation_params=self.operation_params,
)

@dataclass
class AlterNodeRequest(IToProto):
path: str
config: typing.Optional[public_types.NodeConfig] = None
operation_params: typing.Any = None

def to_proto(self) -> ydb_coordination_pb2.AlterNodeRequest:
cfg_proto = self.config.to_proto() if self.config else None
return ydb_coordination_pb2.AlterNodeRequest(
path=self.path,
config=cfg_proto,
operation_params=self.operation_params,
)



@dataclass
class DescribeNodeRequest(IToProto):
path: str
operation_params: typing.Any = None

def to_proto(self) -> ydb_coordination_pb2.DescribeNodeRequest:
return ydb_coordination_pb2.DescribeNodeRequest(
path=self.path,
operation_params=self.operation_params,
)


@dataclass
class DropNodeRequest(IToProto):
path: str
operation_params: typing.Any = None

def to_proto(self) -> ydb_coordination_pb2.DropNodeRequest:
return ydb_coordination_pb2.DropNodeRequest(
path=self.path,
operation_params=self.operation_params,
)


39 changes: 39 additions & 0 deletions ydb/_grpc/grpcwrapper/ydb_coordination_public_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from dataclasses import dataclass
from enum import Enum
import typing

if typing.TYPE_CHECKING:
from ..v4.protos import ydb_coordination_pb2
else:
from ..common.protos import ydb_coordination_pb2


@dataclass
class NodeConfig:
attach_consistency_mode: ydb_coordination_pb2.ConsistencyMode
path: str
rate_limiter_counters_mode: ydb_coordination_pb2.RateLimiterCountersMode
read_consistency_mode: ydb_coordination_pb2.ConsistencyMode
self_check_period_millis: int
session_grace_period_millis: int

@staticmethod
def from_proto(msg: ydb_coordination_pb2.Config) -> "NodeConfig":
return NodeConfig(
attach_consistency_mode=msg.attach_consistency_mode,
path=msg.path,
rate_limiter_counters_mode=msg.rate_limiter_counters_mode,
read_consistency_mode=msg.read_consistency_mode,
self_check_period_millis=msg.self_check_period_millis,
session_grace_period_millis=msg.session_grace_period_millis,
)

def to_proto(self) -> ydb_coordination_pb2.Config:
return ydb_coordination_pb2.Config(
attach_consistency_mode=self.attach_consistency_mode.value,
path=self.path,
rate_limiter_counters_mode=self.rate_limiter_counters_mode.value,
read_consistency_mode=self.read_consistency_mode.value,
self_check_period_millis=self.self_check_period_millis,
session_grace_period_millis=self.session_grace_period_millis,
)
5 changes: 5 additions & 0 deletions ydb/coordination/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .coordination_client import CoordinationClient

__all__ = [
"CoordinationClient",
]
75 changes: 75 additions & 0 deletions ydb/coordination/coordination_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import typing
from typing import Optional

from ydb import _apis, issues
from ydb._grpc.grpcwrapper.ydb_coordination_public_types import NodeConfig

if typing.TYPE_CHECKING:
import ydb


class CoordinationClient:
def __init__(self, driver: "ydb.Driver"):
self._driver = driver

def _call_node(
self,
request,
rpc_method,
settings: Optional["ydb.BaseRequestSettings"] = None,
):
response = self._driver(
request,
_apis.CoordinationService.Stub,
rpc_method,
settings=settings,
)
issues._process_response(response.operation)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

даже эту проверку стоит засунуть во враппер, тогда все что останется сделать во всех методах - return self._driver()

return response

def create_node(
self,
path: str,
config: Optional[_apis.ydb_coordination.Config] = None,
settings: Optional["ydb.BaseRequestSettings"] = None,
):
request = _apis.ydb_coordination.CreateNodeRequest(
path=path,
config=config,
)
self._call_node(request, _apis.CoordinationService.CreateNode, settings)

def describe_node(
self,
path: str,
settings: Optional["_apis.ydb_coordination.Config"] = None,
) -> Optional[NodeConfig]:
request = _apis.ydb_coordination.DescribeNodeRequest(path=path)
response = self._call_node(request, _apis.CoordinationService.DescribeNode, settings)
result = _apis.ydb_coordination.DescribeNodeResult()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

на все это нужен враппер

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

враппер так и не появился

response.operation.result.Unpack(result)
result.config.path = path
return NodeConfig.from_proto(result.config)

def delete_node(
self,
path: str,
settings: Optional["ydb.BaseRequestSettings"] = None,
):
request = _apis.ydb_coordination.DropNodeRequest(path=path)
self._call_node(request, _apis.CoordinationService.DropNode, settings)

def alter_node(
self,
path: str,
new_config: _apis.ydb_coordination.Config,
settings: Optional["ydb.BaseRequestSettings"] = None,
):
request = _apis.ydb_coordination.AlterNodeRequest(
path=path,
config=new_config,
)
self._call_node(request, _apis.CoordinationService.AlterNode, settings)

def close(self):
pass
6 changes: 5 additions & 1 deletion ydb/driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from . import tracing
from . import iam
from . import _utilities
from . import coordination



logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -248,7 +250,7 @@ def get_config(


class Driver(pool.ConnectionPool):
__slots__ = ("scheme_client", "table_client")
__slots__ = ("scheme_client", "table_client", "coordination_client")

def __init__(
self,
Expand Down Expand Up @@ -287,10 +289,12 @@ def __init__(
self._credentials = driver_config.credentials

self.scheme_client = scheme.SchemeClient(self)
self.coordination_client = coordination.CoordinationClient(self)
self.table_client = table.TableClient(self, driver_config.table_client_settings)
self.topic_client = topic.TopicClient(self, driver_config.topic_client_settings)

def stop(self, timeout=10):
self.table_client._stop_pool_if_needed(timeout=timeout)
self.topic_client.close()
self.coordination_client.close()
super().stop(timeout=timeout)
Loading