Skip to content

Commit 2d5bfdf

Browse files
committed
first step
1 parent 40ac692 commit 2d5bfdf

File tree

9 files changed

+45
-0
lines changed

9 files changed

+45
-0
lines changed

.gitmodules

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
11
[submodule "ydb-api-protos"]
22
path = ydb-api-protos
33
url = https://github.com/ydb-platform/ydb-api-protos.git
4+
[submodule "ydb/coordination/ydb-protos"]
5+
path = ydb/coordination/ydb-protos
6+
url = https://github.com/ydb-platform/ydb-api-protos.git

ydb/coordination/__init__.py

Whitespace-only changes.

ydb/coordination/client.py

Whitespace-only changes.

ydb/coordination/coordination_lock.py

Whitespace-only changes.

ydb/coordination/exceptions.py

Whitespace-only changes.

ydb/coordination/tests/__init__.py

Whitespace-only changes.

ydb/coordination/tests/test_coordination_minimal.py

Whitespace-only changes.

ydb/coordination/ydb-protos

Submodule ydb-protos added at a0c108c
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
import time
2+
3+
from ydb._grpc.v5.protos.ydb_coordination_pb2 import SessionRequest
4+
from ydb._grpc.v5.ydb_coordination_v1_pb2_grpc import CoordinationServiceStub
5+
6+
7+
class CoordinationSession:
8+
def __init__(self, driver: "ydb.Driver"):
9+
self._driver = driver
10+
11+
def acquire_semaphore(self, name: str, count: int = 1, timeout: int = 5000):
12+
req = SessionRequest(
13+
acquire_semaphore=SessionRequest.AcquireSemaphore(
14+
name=name,
15+
count=count,
16+
timeout_millis=timeout,
17+
req_id=int(time.time() * 1000),
18+
data=b"",
19+
ephemeral=True,
20+
),
21+
session_start=SessionRequest.SessionStart()
22+
)
23+
24+
res_iter = self._driver(req, CoordinationServiceStub, "Session")
25+
res = next(res_iter)
26+
acquire_result = getattr(res, "acquire_semaphore_result", None)
27+
28+
if not acquire_result or not acquire_result.acquired:
29+
raise RuntimeError(f"Failed to acquire semaphore {name}")
30+
31+
return res.session_started.session_id
32+
33+
def release_semaphore(self, name: str, session_id: int):
34+
req = SessionRequest(
35+
release_semaphore=SessionRequest.ReleaseSemaphore(
36+
name=name,
37+
req_id=int(time.time() * 1000),
38+
),
39+
session_stop=SessionRequest.SessionStop()
40+
)
41+
self._driver(req, CoordinationServiceStub, "Session")

0 commit comments

Comments
 (0)