1616
1717from __future__ import annotations
1818
19+ import asyncio
20+ from collections .abc import Generator
1921from typing import TYPE_CHECKING
2022
23+ import pytest
24+
2125import pydgraph
22- from pydgraph import run_transaction_async
26+ from pydgraph import AsyncDgraphClient , AsyncDgraphClientStub , run_transaction_async
2327from pydgraph .proto import api_pb2 as api
2428
25- from .helpers import generate_movie
29+ from .helpers import TEST_SERVER_ADDR , generate_movie
2630
2731if TYPE_CHECKING :
2832 from pytest_benchmark .fixture import BenchmarkFixture
2933
3034
35+ # =============================================================================
36+ # Fixtures
37+ # =============================================================================
38+
39+
40+ @pytest .fixture
41+ def event_loop () -> Generator [asyncio .AbstractEventLoop , None , None ]:
42+ """Dedicated event loop for async benchmark tests."""
43+ loop = asyncio .new_event_loop ()
44+ asyncio .set_event_loop (loop )
45+ yield loop
46+ loop .close ()
47+
48+
49+ @pytest .fixture
50+ def benchmark_client (
51+ event_loop : asyncio .AbstractEventLoop ,
52+ movies_schema_content : str ,
53+ ) -> Generator [AsyncDgraphClient , None , None ]:
54+ """Async client with schema for benchmark tests."""
55+ loop = event_loop
56+
57+ async def setup () -> AsyncDgraphClient :
58+ client_stub = AsyncDgraphClientStub (TEST_SERVER_ADDR )
59+ client = AsyncDgraphClient (client_stub )
60+ for _ in range (30 ):
61+ try :
62+ await client .login ("groot" , "password" )
63+ break
64+ except Exception as e :
65+ if "user not found" in str (e ):
66+ raise
67+ await asyncio .sleep (0.1 )
68+ await client .alter (pydgraph .Operation (drop_all = True ))
69+ await client .alter (pydgraph .Operation (schema = movies_schema_content ))
70+ return client
71+
72+ client = loop .run_until_complete (setup ())
73+ yield client
74+ loop .run_until_complete (client .close ())
75+
76+
3177# =============================================================================
3278# Query Benchmarks
3379# =============================================================================
@@ -38,11 +84,13 @@ class TestAsyncQueryBenchmarks:
3884
3985 def test_benchmark_query_async (
4086 self ,
41- stress_test_async_client_for_benchmark ,
87+ event_loop : asyncio .AbstractEventLoop ,
88+ benchmark_client : AsyncDgraphClient ,
4289 benchmark : BenchmarkFixture ,
4390 ) -> None :
4491 """Benchmark a simple async read query."""
45- client , loop = stress_test_async_client_for_benchmark
92+ loop = event_loop
93+ client = benchmark_client
4694
4795 # Setup: seed data outside benchmark
4896 async def setup () -> None :
@@ -70,11 +118,13 @@ def benchmark_wrapper() -> api.Response:
70118
71119 def test_benchmark_query_with_vars_async (
72120 self ,
73- stress_test_async_client_for_benchmark ,
121+ event_loop : asyncio .AbstractEventLoop ,
122+ benchmark_client : AsyncDgraphClient ,
74123 benchmark : BenchmarkFixture ,
75124 ) -> None :
76125 """Benchmark an async query with variables."""
77- client , loop = stress_test_async_client_for_benchmark
126+ loop = event_loop
127+ client = benchmark_client
78128
79129 # Setup
80130 async def setup () -> None :
@@ -104,11 +154,13 @@ def benchmark_wrapper() -> api.Response:
104154
105155 def test_benchmark_query_best_effort_async (
106156 self ,
107- stress_test_async_client_for_benchmark ,
157+ event_loop : asyncio .AbstractEventLoop ,
158+ benchmark_client : AsyncDgraphClient ,
108159 benchmark : BenchmarkFixture ,
109160 ) -> None :
110161 """Benchmark a best-effort async read query."""
111- client , loop = stress_test_async_client_for_benchmark
162+ loop = event_loop
163+ client = benchmark_client
112164
113165 # Setup
114166 async def setup () -> None :
@@ -139,11 +191,13 @@ class TestAsyncMutationBenchmarks:
139191
140192 def test_benchmark_mutation_commit_now_async (
141193 self ,
142- stress_test_async_client_for_benchmark ,
194+ event_loop : asyncio .AbstractEventLoop ,
195+ benchmark_client : AsyncDgraphClient ,
143196 benchmark : BenchmarkFixture ,
144197 ) -> None :
145198 """Benchmark async mutation with commit_now."""
146- client , loop = stress_test_async_client_for_benchmark
199+ loop = event_loop
200+ client = benchmark_client
147201 counter = [0 ]
148202
149203 async def run_mutation () -> api .Response :
@@ -158,11 +212,13 @@ def benchmark_wrapper() -> api.Response:
158212
159213 def test_benchmark_mutation_explicit_commit_async (
160214 self ,
161- stress_test_async_client_for_benchmark ,
215+ event_loop : asyncio .AbstractEventLoop ,
216+ benchmark_client : AsyncDgraphClient ,
162217 benchmark : BenchmarkFixture ,
163218 ) -> None :
164219 """Benchmark async mutation with explicit commit."""
165- client , loop = stress_test_async_client_for_benchmark
220+ loop = event_loop
221+ client = benchmark_client
166222 counter = [0 ]
167223
168224 async def run_mutation () -> api .TxnContext | None :
@@ -178,11 +234,13 @@ def benchmark_wrapper() -> api.TxnContext | None:
178234
179235 def test_benchmark_discard_async (
180236 self ,
181- stress_test_async_client_for_benchmark ,
237+ event_loop : asyncio .AbstractEventLoop ,
238+ benchmark_client : AsyncDgraphClient ,
182239 benchmark : BenchmarkFixture ,
183240 ) -> None :
184241 """Benchmark async mutation followed by discard."""
185- client , loop = stress_test_async_client_for_benchmark
242+ loop = event_loop
243+ client = benchmark_client
186244 counter = [0 ]
187245
188246 async def run_mutation () -> None :
@@ -198,11 +256,13 @@ def benchmark_wrapper() -> None:
198256
199257 def test_benchmark_mutation_nquads_async (
200258 self ,
201- stress_test_async_client_for_benchmark ,
259+ event_loop : asyncio .AbstractEventLoop ,
260+ benchmark_client : AsyncDgraphClient ,
202261 benchmark : BenchmarkFixture ,
203262 ) -> None :
204263 """Benchmark async N-Quads mutation."""
205- client , loop = stress_test_async_client_for_benchmark
264+ loop = event_loop
265+ client = benchmark_client
206266 counter = [0 ]
207267
208268 async def run_mutation () -> api .Response :
@@ -222,11 +282,13 @@ def benchmark_wrapper() -> api.Response:
222282
223283 def test_benchmark_delete_async (
224284 self ,
225- stress_test_async_client_for_benchmark ,
285+ event_loop : asyncio .AbstractEventLoop ,
286+ benchmark_client : AsyncDgraphClient ,
226287 benchmark : BenchmarkFixture ,
227288 ) -> None :
228289 """Benchmark async delete mutation."""
229- client , loop = stress_test_async_client_for_benchmark
290+ loop = event_loop
291+ client = benchmark_client
230292
231293 # Pre-create nodes to delete
232294 async def setup () -> list [str ]:
@@ -262,11 +324,13 @@ class TestAsyncTransactionBenchmarks:
262324
263325 def test_benchmark_upsert_async (
264326 self ,
265- stress_test_async_client_for_benchmark ,
327+ event_loop : asyncio .AbstractEventLoop ,
328+ benchmark_client : AsyncDgraphClient ,
266329 benchmark : BenchmarkFixture ,
267330 ) -> None :
268331 """Benchmark async upsert operation."""
269- client , loop = stress_test_async_client_for_benchmark
332+ loop = event_loop
333+ client = benchmark_client
270334 counter = [0 ]
271335
272336 async def run_upsert () -> api .Response :
@@ -295,11 +359,13 @@ def benchmark_wrapper() -> api.Response:
295359
296360 def test_benchmark_batch_mutations_async (
297361 self ,
298- stress_test_async_client_for_benchmark ,
362+ event_loop : asyncio .AbstractEventLoop ,
363+ benchmark_client : AsyncDgraphClient ,
299364 benchmark : BenchmarkFixture ,
300365 ) -> None :
301366 """Benchmark multiple async mutations in one transaction."""
302- client , loop = stress_test_async_client_for_benchmark
367+ loop = event_loop
368+ client = benchmark_client
303369 counter = [0 ]
304370 batch_size = 10
305371
@@ -317,11 +383,13 @@ def benchmark_wrapper() -> api.TxnContext | None:
317383
318384 def test_benchmark_run_transaction_async (
319385 self ,
320- stress_test_async_client_for_benchmark ,
386+ event_loop : asyncio .AbstractEventLoop ,
387+ benchmark_client : AsyncDgraphClient ,
321388 benchmark : BenchmarkFixture ,
322389 ) -> None :
323390 """Benchmark run_transaction_async helper overhead."""
324- client , loop = stress_test_async_client_for_benchmark
391+ loop = event_loop
392+ client = benchmark_client
325393 counter = [0 ]
326394
327395 async def txn_func (txn : pydgraph .AsyncTxn ) -> str :
@@ -350,11 +418,13 @@ class TestAsyncClientBenchmarks:
350418
351419 def test_benchmark_check_version_async (
352420 self ,
353- stress_test_async_client_for_benchmark ,
421+ event_loop : asyncio .AbstractEventLoop ,
422+ benchmark_client : AsyncDgraphClient ,
354423 benchmark : BenchmarkFixture ,
355424 ) -> None :
356425 """Benchmark async check_version."""
357- client , loop = stress_test_async_client_for_benchmark
426+ loop = event_loop
427+ client = benchmark_client
358428
359429 async def run_check () -> str :
360430 return await client .check_version ()
@@ -366,11 +436,13 @@ def benchmark_wrapper() -> str:
366436
367437 def test_benchmark_alter_schema_async (
368438 self ,
369- stress_test_async_client_for_benchmark ,
439+ event_loop : asyncio .AbstractEventLoop ,
440+ benchmark_client : AsyncDgraphClient ,
370441 benchmark : BenchmarkFixture ,
371442 ) -> None :
372443 """Benchmark async schema alter operation."""
373- client , loop = stress_test_async_client_for_benchmark
444+ loop = event_loop
445+ client = benchmark_client
374446 counter = [0 ]
375447
376448 async def run_alter () -> api .Payload :
0 commit comments