Skip to content

Commit eb78713

Browse files
committed
fix and rewrite logic
1 parent f238d0b commit eb78713

File tree

9 files changed

+261
-341
lines changed

9 files changed

+261
-341
lines changed
Lines changed: 73 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,17 @@
11
import asyncio
22
import threading
3-
import time
43

54
import pytest
65

76
import ydb
87
from ydb.aio.coordination import CoordinationClient as AioCoordinationClient
9-
from ydb import StatusCode, logger
8+
from ydb import StatusCode
109

1110
from ydb.coordination import (
1211
NodeConfig,
1312
ConsistencyMode,
1413
RateLimiterCountersMode,
1514
CoordinationClient,
16-
CreateSemaphoreResult,
17-
DescribeLockResult,
1815
)
1916

2017

@@ -120,165 +117,103 @@ async def test_coordination_node_lifecycle_async(self, async_coordination_node):
120117
await client.describe_node(node_path)
121118

122119
async def test_coordination_lock_describe_full_async(self, async_coordination_node):
123-
client, node_path, config = async_coordination_node
124-
125-
lock = client.lock("test_lock", node_path)
126-
127-
create = await lock.create(init_limit=1, init_data=b"hello")
128-
assert create.status == StatusCode.SUCCESS
129-
130-
desc = await lock.describe()
131-
assert desc.status == StatusCode.SUCCESS
132-
assert desc.name == "test_lock"
133-
assert desc.data == b"hello"
134-
assert desc.count == 0
135-
assert desc.ephemeral is False
136-
assert list(desc.owners) == []
137-
assert list(desc.waiters) == []
138-
139-
upd = await lock.update(new_data=b"world")
140-
assert upd.status == StatusCode.SUCCESS
141-
142-
desc2 = await lock.describe()
143-
assert desc2.status == StatusCode.SUCCESS
144-
assert desc2.name == "test_lock"
145-
assert desc2.data == b"world"
146-
assert desc2.count == 0
147-
assert desc2.ephemeral is False
148-
assert list(desc2.owners) == []
149-
assert list(desc2.waiters) == []
150-
151-
delete = await lock.delete()
152-
assert delete.status == StatusCode.SUCCESS
153-
154-
desc_after = await lock.describe()
155-
assert desc_after.status == StatusCode.NOT_FOUND
156-
157-
def test_coordination_lock_describe_full_sync(self, sync_coordination_node):
158-
client, node_path, config = sync_coordination_node
159-
160-
lock = client.lock("test_lock", node_path)
161-
162-
create = lock.create(init_limit=1, init_data=b"hello")
163-
assert create.status == StatusCode.SUCCESS
164-
165-
desc = lock.describe()
166-
assert desc.status == StatusCode.SUCCESS
167-
assert desc.name == "test_lock"
168-
assert desc.data == b"hello"
169-
assert desc.count == 0
170-
assert desc.ephemeral is False
171-
assert list(desc.owners) == []
172-
assert list(desc.waiters) == []
173-
upd = lock.update(new_data=b"world")
174-
assert upd.status == StatusCode.SUCCESS
175-
176-
desc2 = lock.describe()
177-
assert desc2.status == StatusCode.SUCCESS
178-
assert desc2.name == "test_lock"
179-
assert desc2.data == b"world"
180-
assert desc2.count == 0
181-
assert desc2.ephemeral is False
182-
assert list(desc2.owners) == []
183-
assert list(desc2.waiters) == []
184-
185-
delete = lock.delete()
186-
assert delete.status == StatusCode.SUCCESS
187-
188-
desc_after = lock.describe()
189-
assert desc_after.status == StatusCode.NOT_FOUND
120+
client, node_path, _ = async_coordination_node
190121

191-
async def test_coordination_lock_racing_async(self, async_coordination_node):
192-
client, node_path, initial_config = async_coordination_node
193-
timeout = 5
122+
async with client.node(node_path) as node:
123+
lock = node.lock("test_lock")
194124

195-
lock = client.lock("test_lock", node_path)
196-
await lock.create(init_limit=1, init_data=b"init-data")
125+
desc = await lock.describe()
126+
assert desc.status == StatusCode.NOT_FOUND
197127

198-
describe_resp: DescribeLockResult = await lock.describe()
199-
assert describe_resp.status == StatusCode.SUCCESS
128+
async with lock:
129+
pass
200130

201-
lock2_started = asyncio.Event()
202-
lock2_acquired = asyncio.Event()
203-
lock2_release = asyncio.Event()
131+
desc = await lock.describe()
132+
assert desc.data == b""
204133

205-
async def second_lock_task():
206-
lock2_started.set()
207-
async with client.lock("test_lock", node_path):
208-
lock2_acquired.set()
209-
await lock2_release.wait()
134+
await lock.update(new_data=b"world")
210135

211-
async with client.lock("test_lock", node_path) as lock1:
212-
resp: DescribeLockResult = await lock1.describe()
213-
assert resp.status == StatusCode.SUCCESS
136+
desc2 = await lock.describe()
137+
assert desc2.data == b"world"
214138

215-
t2 = asyncio.create_task(second_lock_task())
216-
await asyncio.wait_for(lock2_started.wait(), timeout=timeout)
139+
def test_coordination_lock_describe_full(self, sync_coordination_node):
140+
client, node_path, _ = sync_coordination_node
217141

218-
await asyncio.sleep(0)
142+
with client.node(node_path) as node:
143+
lock = node.lock("test_lock")
219144

220-
await asyncio.wait_for(lock2_acquired.wait(), timeout=timeout)
145+
desc = lock.describe()
146+
assert desc.status == StatusCode.NOT_FOUND
221147

222-
lock2_release.set()
223-
await asyncio.wait_for(t2, timeout=timeout)
148+
with lock:
149+
pass
224150

225-
delete_resp = await lock.delete()
226-
assert delete_resp.status == StatusCode.SUCCESS
151+
desc = lock.describe()
152+
assert desc.data == b""
227153

228-
describe_after_delete: DescribeLockResult = await lock.describe()
229-
assert describe_after_delete.status == StatusCode.NOT_FOUND
154+
lock.update(new_data=b"world")
230155

231-
def test_coordination_lock_racing_sync(self, sync_coordination_node):
232-
client, node_path, initial_config = sync_coordination_node
233-
small_timeout = 1
156+
desc2 = lock.describe()
157+
assert desc2.data == b"world"
158+
159+
async def test_coordination_lock_racing_async(self, async_coordination_node):
160+
client, node_path, _ = async_coordination_node
161+
timeout = 5
234162

235-
lock = client.lock("test_lock", node_path)
163+
async with client.node(node_path) as node:
164+
lock2_started = asyncio.Event()
165+
lock2_acquired = asyncio.Event()
166+
lock2_release = asyncio.Event()
236167

237-
create_resp: CreateSemaphoreResult = lock.create(init_limit=1, init_data=b"init-data")
238-
assert create_resp.status == StatusCode.SUCCESS
168+
async def second_lock_task():
169+
lock2_started.set()
170+
async with node.lock("test_lock"):
171+
lock2_acquired.set()
172+
await lock2_release.wait()
239173

240-
describe_resp: DescribeLockResult = lock.describe()
241-
assert describe_resp.status == StatusCode.SUCCESS
174+
async with node.lock("test_lock"):
175+
t2 = asyncio.create_task(second_lock_task())
176+
await asyncio.wait_for(lock2_started.wait(), timeout=timeout)
242177

243-
lock2_ready = threading.Event()
244-
lock2_acquired = threading.Event()
178+
await asyncio.wait_for(lock2_acquired.wait(), timeout=timeout)
179+
lock2_release.set()
180+
await asyncio.wait_for(t2, timeout=timeout)
245181

246-
def second_lock_task():
247-
try:
248-
lock2_ready.set()
249-
with client.lock("test_lock", node_path):
250-
lock2_acquired.set()
251-
logger.info("Second thread acquired lock")
252-
except Exception as e:
253-
logger.exception(f"{e} | second_lock_task failed")
182+
def test_coordination_lock_racing(self, sync_coordination_node):
183+
client, node_path, _ = sync_coordination_node
184+
timeout = 5
185+
186+
with client.node(node_path) as node:
187+
lock2_started = threading.Event()
188+
lock2_acquired = threading.Event()
189+
lock2_release = threading.Event()
254190

255-
t2 = threading.Thread(target=second_lock_task)
191+
def second_lock_task():
192+
lock2_started.set()
193+
with node.lock("test_lock"):
194+
lock2_acquired.set()
195+
lock2_release.wait(timeout)
256196

257-
with client.lock("test_lock", node_path) as lock1:
258-
resp = lock1.describe()
259-
assert resp.status == StatusCode.SUCCESS
260-
t2.start()
261-
lock2_ready.wait(timeout=small_timeout)
197+
with node.lock("test_lock"):
198+
t2 = threading.Thread(target=second_lock_task)
199+
t2.start()
262200

263-
lock2_acquired.wait(timeout=small_timeout)
264-
t2.join(timeout=small_timeout)
201+
assert lock2_started.wait(timeout)
265202

266-
delete_resp = lock.delete()
267-
assert delete_resp.status == StatusCode.SUCCESS
268-
time.sleep(small_timeout)
269-
describe_after_delete: DescribeLockResult = lock.describe()
270-
assert describe_after_delete.status == StatusCode.NOT_FOUND
203+
assert lock2_acquired.wait(timeout)
204+
lock2_release.set()
205+
t2.join(timeout)
271206

272207
async def test_coordination_reconnect_async(self, async_coordination_node):
273-
client, node_path, config = async_coordination_node
208+
client, node_path, _ = async_coordination_node
274209

275-
lock = client.lock("test_lock", node_path)
210+
async with client.node(node_path) as node:
211+
lock = node.lock("test_lock")
276212

277-
res = await lock.create(init_limit=1, init_data=b"")
278-
assert res.status == StatusCode.SUCCESS
213+
async with lock:
214+
pass
279215

280-
await lock._reconnector._stream.close()
216+
await node._reconnector.stop()
281217

282-
desc = await lock.describe()
283-
assert desc.status == StatusCode.SUCCESS
284-
assert desc.name == "test_lock"
218+
async with lock:
219+
pass

ydb/aio/coordination/client.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,7 @@
88
)
99
from ..._grpc.grpcwrapper.ydb_coordination_public_types import NodeConfig
1010
from ...coordination.base import BaseCoordinationClient
11-
12-
from .lock import CoordinationLock
11+
from .node import CoordinationNode
1312

1413

1514
class CoordinationClient(BaseCoordinationClient):
@@ -41,5 +40,5 @@ async def delete_node(self, path: str, settings=None):
4140
settings=settings,
4241
)
4342

44-
def lock(self, lock_name: str, node_path: str):
45-
return CoordinationLock(self, lock_name, node_path=node_path)
43+
def node(self, path: str) -> CoordinationNode:
44+
return CoordinationNode(self._driver, path)

0 commit comments

Comments
 (0)