99import gzip
1010import logging
1111import os
12+ import re
1213import shutil
1314import tempfile
1415import time
2829 DgraphClientStub ,
2930)
3031
31- from .helpers import SYNTHETIC_SCHEMA , TEST_SERVER_ADDR
32+ from .helpers import TEST_SERVER_ADDR
3233
3334# =============================================================================
3435# Data Fixture Configuration (fetched on demand for stress/benchmark tests)
@@ -127,11 +128,105 @@ def movies_rdf(movies_rdf_gz: Path) -> Generator[Path, None, None]:
127128 """
128129 with tempfile .TemporaryDirectory () as tempdir :
129130 output_path = Path (tempdir ) / "1million.rdf"
131+ logger .info ("Decompressing %s to %s" , movies_rdf_gz , output_path )
130132 with gzip .open (movies_rdf_gz , "rb" ) as f_in , open (output_path , "wb" ) as f_out :
131133 shutil .copyfileobj (f_in , f_out )
134+ logger .info (
135+ "Decompressed RDF file: %.1f MB" , output_path .stat ().st_size / 1024 / 1024
136+ )
132137 yield output_path
133138
134139
@pytest.fixture(scope="session")
def session_sync_client() -> Generator[DgraphClient, None, None]:
    """Yield a session-scoped sync client, logged in on a best-effort basis.

    Login as ``groot`` is attempted up to 30 times, 0.1 s apart. An error
    whose text contains "user not found" is re-raised immediately; any other
    error is retried. If every attempt fails, a warning is logged and the
    client is still yielded, so the fixture stays best-effort rather than
    failing the whole session.

    Yields:
        The client shared by the whole test session.

    Raises:
        Exception: The login error, when its message contains
            "user not found".
    """
    client_stub = DgraphClientStub(TEST_SERVER_ADDR)
    client = DgraphClient(client_stub)

    # try/finally so the client is closed both on normal teardown and when
    # login aborts with a fatal error before the yield is reached.
    try:
        last_error = None
        for _ in range(30):
            try:
                client.login("groot", "password")
                break
            except Exception as e:
                if "user not found" in str(e):
                    raise
                last_error = e
                time.sleep(0.1)
        else:
            # Every attempt failed: keep going, but leave a trace so that
            # later permission errors can be traced back to this point.
            logger.warning(
                "Login as groot did not succeed after 30 attempts; "
                "continuing without login: %s",
                last_error,
            )

        yield client
    finally:
        client.close()
157+
158+
@pytest.fixture(scope="session")
def movies_data_loaded(
    request: pytest.FixtureRequest,
    stress_config: dict[str, Any],
) -> bool:
    """Load the 1million movie dataset into Dgraph if load_movies is True.

    The ``session_sync_client``, ``movies_rdf`` and ``movies_schema_content``
    fixtures are requested lazily via ``request.getfixturevalue`` so that
    nothing is instantiated when ``stress_config["load_movies"]`` is falsy.

    When loading, all existing data is dropped, the movies schema is applied,
    and the RDF file is sent in batches of 10000 lines, one committed
    mutation per batch.

    Explicit node IDs in the file (``<12345>`` or ``<uuid>``) are rewritten
    to blank nodes so Dgraph assigns fresh UIDs. Blank nodes are only
    meaningful within a single mutation, so the UIDs Dgraph reports for each
    batch are remembered and substituted in later batches; otherwise the same
    source ID appearing in two batches would become two unrelated nodes.

    Args:
        request: Pytest request object used for lazy fixture lookup.
        stress_config: Stress-test configuration; only the ``"load_movies"``
            key is read here.

    Returns:
        True if data was loaded, False if loading was skipped.
    """
    if not stress_config["load_movies"]:
        logger.info("Skipping movie data loading (load_movies=False)")
        return False

    # Lazy evaluation: only instantiate session-scoped fixtures when needed.
    client: DgraphClient = request.getfixturevalue("session_sync_client")
    movies_rdf_path: Path = request.getfixturevalue("movies_rdf")
    schema_content: str = request.getfixturevalue("movies_schema_content")

    # Start from an empty database, then apply the schema before loading.
    client.alter(pydgraph.Operation(drop_all=True))
    client.alter(pydgraph.Operation(schema=schema_content))

    # Matches <12345> (numeric IDs) and
    # <24d9530f-553a-43fc-8eb6-14ac667b2387> (UUIDs); these formats can't be
    # used directly as Dgraph UIDs.
    # NOTE(review): the pattern is applied to the whole line, so an identical
    # token inside a string literal would be rewritten too - confirm the
    # dataset has none.
    uid_pattern = re.compile(
        r"<(\d+|[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})>",
        re.IGNORECASE,
    )

    # Source identifier -> UID assigned by Dgraph in an earlier batch.
    assigned_uids: dict[str, str] = {}

    def to_node_ref(match: "re.Match[str]") -> str:
        """Return <uid> for an already-created node, else a blank node."""
        identifier = match.group(1)
        uid = assigned_uids.get(identifier)
        return f"_:{identifier}" if uid is None else f"<{uid}>"

    def flush(lines: list[str]) -> None:
        """Commit one batch and record the UIDs assigned to its blank nodes."""
        txn = client.txn()
        response = txn.mutate(set_nquads="\n".join(lines), commit_now=True)
        for name, uid in response.uids.items():
            # Tolerate keys reported with or without the "_:" prefix.
            key = name[2:] if name.startswith("_:") else name
            assigned_uids[key] = uid

    batch_size = 10000
    batch: list[str] = []
    total_loaded = 0

    logger.info("Loading RDF data from %s", movies_rdf_path)
    with open(movies_rdf_path, encoding="utf-8") as f:
        for raw_line in f:
            line = raw_line.strip()
            # Skip blank lines and comments.
            if not line or line.startswith("#"):
                continue

            # Rewritten against the UIDs known so far; the map only changes
            # in flush(), so references within one batch stay consistent.
            batch.append(uid_pattern.sub(to_node_ref, line))

            if len(batch) >= batch_size:
                flush(batch)
                total_loaded += len(batch)
                if total_loaded % 100000 == 0:
                    logger.info("Loaded %d RDF triples", total_loaded)
                batch = []

    # Load the final, partially filled batch.
    if batch:
        flush(batch)
        total_loaded += len(batch)

    logger.info("Finished loading %d RDF triples", total_loaded)
    return True
228+
229+
135230# =============================================================================
136231# Executor Fixture (for sync stress tests)
137232# =============================================================================
@@ -178,10 +273,18 @@ def sync_client_clean(sync_client: DgraphClient) -> DgraphClient:
178273 return sync_client
179274
180275
@pytest.fixture(scope="session")
def movies_schema_content(movies_schema: Path) -> str:
    """Return the movies schema file content as a string.

    Args:
        movies_schema: Path to the schema file, supplied by the fixture of
            the same name.

    Returns:
        The full text of the schema file, decoded as UTF-8.
    """
    # Explicit encoding so the result does not depend on the platform's
    # default locale; the RDF loader in this module also reads as UTF-8.
    return movies_schema.read_text(encoding="utf-8")
280+
281+
@pytest.fixture
def sync_client_with_movies_schema(
    sync_client_clean: DgraphClient, movies_schema_content: str
) -> DgraphClient:
    """Return the clean sync client after applying the movies schema to it."""
    client = sync_client_clean
    schema_op = pydgraph.Operation(schema=movies_schema_content)
    client.alter(schema_op)
    return client
186289
187290
@@ -217,11 +320,12 @@ async def async_client_clean(async_client: AsyncDgraphClient) -> AsyncDgraphClie
217320
218321
@pytest.fixture
async def async_client_with_movies_schema(
    async_client_clean: AsyncDgraphClient,
    movies_schema_content: str,
) -> AsyncDgraphClient:
    """Return the clean async client after applying the movies schema to it."""
    client = async_client_clean
    schema_op = pydgraph.Operation(schema=movies_schema_content)
    await client.alter(schema_op)
    return client
226330
227331
@@ -234,9 +338,9 @@ async def async_client_with_schema(
234338
235339
236340@pytest .fixture
237- def async_client_with_schema_for_benchmark () -> Generator [
238- tuple [ AsyncDgraphClient , asyncio . AbstractEventLoop ], None , None
239- ]:
341+ def async_client_with_movies_schema_for_benchmark (
342+ movies_schema_content : str ,
343+ ) -> Generator [ tuple [ AsyncDgraphClient , asyncio . AbstractEventLoop ], None , None ]:
240344 """Async client with schema and its event loop for benchmarking.
241345
242346 Returns a tuple of (client, loop) so tests can run async operations
@@ -245,6 +349,8 @@ def async_client_with_schema_for_benchmark() -> Generator[
245349 loop = asyncio .new_event_loop ()
246350 asyncio .set_event_loop (loop )
247351
352+ schema_content = movies_schema_content # Capture for closure
353+
248354 async def setup () -> AsyncDgraphClient :
249355 client_stub = AsyncDgraphClientStub (TEST_SERVER_ADDR )
250356 client = AsyncDgraphClient (client_stub )
@@ -257,7 +363,7 @@ async def setup() -> AsyncDgraphClient:
257363 raise
258364 await asyncio .sleep (0.1 )
259365 await client .alter (pydgraph .Operation (drop_all = True ))
260- await client .alter (pydgraph .Operation (schema = SYNTHETIC_SCHEMA ))
366+ await client .alter (pydgraph .Operation (schema = schema_content ))
261367 return client
262368
263369 client = loop .run_until_complete (setup ())
0 commit comments