diff --git a/.github/workflows/ci-pydgraph-benchmarks.yml b/.github/workflows/ci-pydgraph-benchmarks.yml new file mode 100644 index 0000000..2e0f603 --- /dev/null +++ b/.github/workflows/ci-pydgraph-benchmarks.yml @@ -0,0 +1,34 @@ +name: ci-pydgraph-benchmarks +on: + push: + branches: + - main + tags: + - v[0-9]+.[0-9]+.[0-9]+* + +permissions: + contents: read + +jobs: + benchmarks: + name: Release Benchmarks + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v5 + - name: Setup python runtime and tooling + uses: ./.github/actions/setup-python-and-tooling + with: + python-version: "3.13" + - name: Setup project dependencies + run: INSTALL_MISSING_TOOLS=true make setup + - name: Sync python virtualenv + run: make sync + - name: Run benchmarks + run: make benchmark + - name: Upload benchmark results + uses: actions/upload-artifact@v4 + with: + name: benchmark-results-${{ github.ref_name }} + path: | + benchmark-results.json + benchmark-histogram.svg diff --git a/.gitignore b/.gitignore index 0a96db7..aaa47b7 100755 --- a/.gitignore +++ b/.gitignore @@ -41,3 +41,16 @@ venv pyvenv.cfg .DS_Store examples/notebook/RAG/.env +.osgrep + +# Git worktrees +.worktrees/ + +# Benchmark outputs +benchmark-results.json +benchmark-histogram.svg +stress-benchmark-results.json + +# Downloaded test data (fetched on demand) +tests/resources/1million.rdf.gz +tests/resources/1million.schema diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b8a0ad9..19a7f83 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -65,6 +65,7 @@ repos: pass_filenames: false additional_dependencies: - pytest>=8.3.3 + - pytest-benchmark>=4.0.0 - grpcio>=1.65.1 - protobuf>=4.23.0 - repo: https://github.com/pre-commit/mirrors-mypy diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 02a5c11..052f89e 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -220,15 +220,48 @@ make test Run specific tests: ```sh -bash scripts/local-test.sh -v tests/test_connect.py::TestOpen 
+make test PYTEST_ARGS="-v tests/test_connect.py::TestOpen" ``` Run a single test: ```sh -bash scripts/local-test.sh -v tests/test_connect.py::TestOpen::test_connection_with_auth +make test PYTEST_ARGS="-v tests/test_connect.py::TestOpen::test_connection_with_auth" ``` +### Stress Tests + +The project includes comprehensive stress tests that verify concurrent operations, transaction +conflicts, deadlock prevention, and retry mechanisms for both sync and async clients. + +**Quick mode** (default, ~12 seconds) - 20 workers, 50 ops, 10 iterations: + +```sh +make test PYTEST_ARGS="tests/test_stress_sync.py tests/test_stress_async.py -v" +``` + +**Moderate mode** (10x quick, includes movie dataset, ~60+ seconds) - 200 workers, 500 ops, 100 +iterations: + +```sh +make test STRESS_TEST_MODE=moderate PYTEST_ARGS="tests/test_stress_sync.py tests/test_stress_async.py -v" +``` + +**Full mode** (10x moderate, maximum stress, ~10+ minutes) - 2000 workers, 5000 ops, 1000 +iterations: + +```sh +make test STRESS_TEST_MODE=full PYTEST_ARGS="tests/test_stress_sync.py tests/test_stress_async.py -v" +``` + +The stress tests cover: + +- **Sync tests**: Run with `ThreadPoolExecutor` to test concurrent operations +- **Async tests**: Use pure `asyncio.gather()` concurrency (no `concurrent.futures` mixing) +- **Retry utilities**: Tests for `retry_async()`, `with_retry_async()`, and + `run_transaction_async()` +- **Deadlock regression**: Validates the asyncio.Lock deadlock fix from PR #293 + ### Test Infrastructure The test script requires Docker and Docker Compose to be installed on your machine. 
diff --git a/Makefile b/Makefile index dbef802..17ff5f5 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,25 @@ SHELL := /bin/bash export PATH := $(HOME)/.local/bin:$(HOME)/.cargo/bin:$(PATH) +# Export test configuration variables so they're available to child processes +# Usage: make test STRESS_TEST_MODE=moderate PYTEST_ARGS="-v" +# make test LOG=info (adds --log-cli-level=INFO to default PYTEST_ARGS) +export STRESS_TEST_MODE +export DGRAPH_IMAGE_TAG + +# When LOG is set (e.g., LOG=info), inject --log-cli-level into pytest flags. +# Works with both the default PYTEST_ARGS and explicit overrides: +# make test LOG=info → -v --benchmark-disable --log-cli-level=INFO +# make benchmark LOG=warning → --benchmark-only ... --log-cli-level=WARNING +# make test PYTEST_ARGS="-x" LOG=debug → -x --log-cli-level=DEBUG +PYTEST_ARGS ?= -v --benchmark-disable +ifdef LOG + LOG_FLAG := --log-cli-level=$(shell echo '$(LOG)' | tr '[:lower:]' '[:upper:]') + PYTEST_ARGS += $(LOG_FLAG) +endif +export LOG +export PYTEST_ARGS + # Source venv if it exists and isn't already active PROJECT_VENV := $(CURDIR)/.venv ACTIVATE := $(wildcard .venv/bin/activate) @@ -15,7 +34,7 @@ else RUN := endif -.PHONY: help setup sync deps deps-uv deps-trunk deps-docker test check protogen clean build publish +.PHONY: help setup sync deps deps-uv deps-trunk deps-docker test benchmark check protogen clean build publish .DEFAULT_GOAL := help @@ -23,6 +42,13 @@ help: ## Show this help message @echo "" @echo "Environment Variables:" @echo " INSTALL_MISSING_TOOLS=true Enable automatic installation of missing tools (default: disabled)" + @echo " LOG= Add --log-cli-level to pytest (e.g., LOG=info, LOG=debug)" + @echo " Works with both 'test' and 'benchmark' targets" + @echo " STRESS_TEST_MODE= Stress test preset: quick (default), moderate, full" + @echo " PYTEST_ARGS=\"...\" Override default pytest flags (default: -v --benchmark-disable)" + @echo " Note: overrides LOG when set explicitly. 
'benchmark' sets its own" + @echo " PYTEST_ARGS internally but still honours LOG" + @echo " DGRAPH_IMAGE_TAG= Override the Dgraph Docker image tag (default: latest)" @echo "" @echo "Available targets:" @grep -E '^[a-zA-Z_-]+:.*?## .*$$' $(MAKEFILE_LIST) | awk 'BEGIN {FS = ":.*?## "}; {printf "\033[36m%-15s\033[0m %s\n", $$1, $$2}' @@ -51,8 +77,31 @@ clean: ## Cleans build artifacts build: deps-uv sync protogen ## Builds release package $(RUN) uv build -test: deps-uv sync ## Run tests - bash scripts/local-test.sh +test: deps-uv sync ## Run tests (use PYTEST_ARGS to pass options, e.g., make test PYTEST_ARGS="-v tests/test_connect.py") + bash scripts/local-test.sh $(PYTEST_ARGS) + +benchmark: ## Run benchmarks (measures per-operation latency with pytest-benchmark) + @# Outputs (all .gitignored): + @# benchmark-results.json Phase 1 results (pytest-benchmark JSON) + @# benchmark-histogram.svg Phase 1 latency histogram + @# stress-benchmark-results.json Phase 2 results (pytest-benchmark JSON) + @# + @# Phase 1: Per-operation latency benchmarks against a clean database. + @# Runs targeted benchmark tests (test_benchmark_*.py) which measure individual + @# operations (query, mutation, upsert, etc.) in isolation. Each test creates a + @# fresh schema via drop_all, so these MUST run on their own Dgraph cluster — + @# the rapid schema churn destabilises the alpha for any tests that follow. + @echo "═══ Phase 1: Per-operation latency benchmarks ═══" + $(MAKE) test PYTEST_ARGS="--benchmark-only --benchmark-json=benchmark-results.json --benchmark-histogram=benchmark-histogram -v $(LOG_FLAG) tests/test_benchmark_async.py tests/test_benchmark_sync.py" + @# Phase 2: Stress-test benchmarks under sustained concurrent load. + @# Runs stress tests (test_stress_*.py) with the 1-million-movie dataset loaded. + @# Uses a separate Dgraph cluster (via a second 'make test' invocation) so the + @# alpha starts fresh after Phase 1's drop_all churn. 
+ @# benchmark.pedantic(rounds=1) in each stress test prevents pytest-benchmark + @# from compounding iterations — the stress_config["rounds"] inner loop + @# (controlled by STRESS_TEST_MODE) handles repetition instead. + @echo "═══ Phase 2: Stress-test benchmarks (moderate load, 1M movies) ═══" + $(MAKE) test STRESS_TEST_MODE=moderate PYTEST_ARGS="--benchmark-only --benchmark-json=stress-benchmark-results.json -v $(LOG_FLAG) tests/test_stress_async.py tests/test_stress_sync.py" publish: clean build ## Publish a new release to PyPi (requires UV_PUBLISH_USERNAME and UV_PUBLISH_PASSWORD to be set) $(RUN) uv publish diff --git a/PUBLISHING.md b/PUBLISHING.md index 1f453b1..f79122f 100644 --- a/PUBLISHING.md +++ b/PUBLISHING.md @@ -9,12 +9,12 @@ This document contains instructions to create a new pydgraph release and publish 1. Create a new branch (prepare-for-release-vXX.X.X, for instance) 1. Update the VERSION in pydgraph/meta.py 1. Build pydgraph locally, see the [README](README.md#build-from-source) -1. Run the tests (`bash scripts/local-test.sh`) to ensure everything works +1. Run the tests (`make test`) to ensure everything works 1. If you're concerned about incompatibilities with earlier Dgraph versions, invoke the test suite with earlier Dgraph versions ```sh - DGRAPH_IMAGE_TAG=vX.X.X bash scripts/local-test.sh + make test DGRAPH_IMAGE_TAG=vX.X.X ``` 1. If you happen to have the testpypi access token, try a test upload to testpypi: diff --git a/docs/plans/2026-02-10-targeted-benchmarks-design.md b/docs/plans/2026-02-10-targeted-benchmarks-design.md new file mode 100644 index 0000000..f2e8ed3 --- /dev/null +++ b/docs/plans/2026-02-10-targeted-benchmarks-design.md @@ -0,0 +1,982 @@ +# Targeted Benchmark Tests Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan +> task-by-task. 
+ +**Goal:** Create isolated benchmarks for individual pydgraph operations to pinpoint performance +regression sources when stress tests regress. + +**Architecture:** Two test files (`test_benchmark_sync.py`, `test_benchmark_async.py`) using +pytest-benchmark with real Dgraph compose backend via existing fixtures. Each operation gets its own +benchmark test. + +**Tech Stack:** pytest, pytest-benchmark, existing pydgraph fixtures, real Dgraph backend + +--- + +## Task 1: Create Sync Benchmark Test File Structure + +**Files:** + +- Create: `tests/test_benchmark_sync.py` + +**Step 1: Create the test file with imports and class structure** + +```python +# SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. +# SPDX-License-Identifier: Apache-2.0 + +"""Targeted benchmark tests for sync client operations. + +These benchmarks measure individual operations in isolation to help +identify the root cause of performance regressions in stress tests. + +Usage: + # Run all sync benchmarks + pytest tests/test_benchmark_sync.py -v + + # Compare against previous run + pytest tests/test_benchmark_sync.py --benchmark-compare +""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any + +import pydgraph +from pydgraph import DgraphClient, run_transaction +from pydgraph.proto import api_pb2 as api + +from .helpers import generate_person + +if TYPE_CHECKING: + from pytest_benchmark.fixture import BenchmarkFixture +``` + +**Step 2: Verify the file is created correctly** + +Run: `python -c "import tests.test_benchmark_sync"` Expected: No import errors + +**Step 3: Commit** + +```bash +git add tests/test_benchmark_sync.py +git commit -m "feat(tests): add sync benchmark test file structure" +``` + +--- + +## Task 2: Implement Query Benchmarks (Sync) + +**Files:** + +- Modify: `tests/test_benchmark_sync.py` + +**Step 1: Add query benchmark tests** + +```python +class TestSyncQueryBenchmarks: + """Benchmarks for sync query operations.""" + + def 
test_benchmark_query_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark a simple read query.""" + client = sync_client_with_schema + + # Setup: seed data outside benchmark + txn = client.txn() + txn.mutate(set_obj=generate_person(0), commit_now=True) + + query = """query { + people(func: has(name), first: 1) { + name + email + age + } + }""" + + def run_query() -> api.Response: + txn = client.txn(read_only=True) + return txn.query(query) + + benchmark(run_query) + + def test_benchmark_query_with_vars_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark a query with variables.""" + client = sync_client_with_schema + + # Setup: seed data + txn = client.txn() + txn.mutate(set_obj={"name": "BenchmarkUser", "email": "bench@test.com"}, commit_now=True) + + query = """query people($name: string) { + people(func: eq(name, $name)) { + name + email + } + }""" + + def run_query() -> api.Response: + txn = client.txn(read_only=True) + return txn.query(query, variables={"$name": "BenchmarkUser"}) + + benchmark(run_query) + + def test_benchmark_query_best_effort_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark a best-effort read query.""" + client = sync_client_with_schema + + # Setup: seed data + txn = client.txn() + txn.mutate(set_obj=generate_person(0), commit_now=True) + + query = "{ people(func: has(name), first: 1) { name } }" + + def run_query() -> api.Response: + txn = client.txn(read_only=True, best_effort=True) + return txn.query(query) + + benchmark(run_query) +``` + +**Step 2: Run the query benchmarks to verify** + +Run: `pytest tests/test_benchmark_sync.py::TestSyncQueryBenchmarks -v --benchmark-disable` Expected: +PASS (3 tests) + +**Step 3: Commit** + +```bash +git add tests/test_benchmark_sync.py +git commit -m "feat(tests): add sync query benchmarks" +``` + +--- + +## Task 3: 
Implement Mutation Benchmarks (Sync) + +**Files:** + +- Modify: `tests/test_benchmark_sync.py` + +**Step 1: Add mutation benchmark tests** + +```python +class TestSyncMutationBenchmarks: + """Benchmarks for sync mutation operations.""" + + def test_benchmark_mutation_commit_now_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark mutation with commit_now (single round-trip).""" + client = sync_client_with_schema + counter = [0] + + def run_mutation() -> api.Response: + counter[0] += 1 + txn = client.txn() + return txn.mutate(set_obj=generate_person(counter[0]), commit_now=True) + + benchmark(run_mutation) + + def test_benchmark_mutation_explicit_commit_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark mutation with explicit commit (two round-trips).""" + client = sync_client_with_schema + counter = [0] + + def run_mutation() -> api.TxnContext: + counter[0] += 1 + txn = client.txn() + txn.mutate(set_obj=generate_person(counter[0])) + return txn.commit() + + benchmark(run_mutation) + + def test_benchmark_discard_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark mutation followed by discard (rollback cost).""" + client = sync_client_with_schema + counter = [0] + + def run_mutation() -> None: + counter[0] += 1 + txn = client.txn() + txn.mutate(set_obj=generate_person(counter[0])) + txn.discard() + + benchmark(run_mutation) + + def test_benchmark_mutation_nquads_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark N-Quads mutation format.""" + client = sync_client_with_schema + counter = [0] + + def run_mutation() -> api.Response: + counter[0] += 1 + txn = client.txn() + nquads = f''' + _:person "Person_{counter[0]}" . + _:person "person{counter[0]}@test.com" . + _:person "{counter[0] % 80}" . 
+ ''' + return txn.mutate(set_nquads=nquads, commit_now=True) + + benchmark(run_mutation) + + def test_benchmark_delete_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark delete mutation.""" + client = sync_client_with_schema + + # Pre-create nodes to delete + uids: list[str] = [] + for i in range(100): + txn = client.txn() + resp = txn.mutate(set_obj=generate_person(i), commit_now=True) + uids.append(next(iter(resp.uids.values()))) + + uid_index = [0] + + def run_delete() -> api.Response: + idx = uid_index[0] % len(uids) + uid_index[0] += 1 + txn = client.txn() + return txn.mutate(del_obj={"uid": uids[idx]}, commit_now=True) + + benchmark(run_delete) +``` + +**Step 2: Run the mutation benchmarks to verify** + +Run: `pytest tests/test_benchmark_sync.py::TestSyncMutationBenchmarks -v --benchmark-disable` +Expected: PASS (5 tests) + +**Step 3: Commit** + +```bash +git add tests/test_benchmark_sync.py +git commit -m "feat(tests): add sync mutation benchmarks" +``` + +--- + +## Task 4: Implement Advanced Transaction Benchmarks (Sync) + +**Files:** + +- Modify: `tests/test_benchmark_sync.py` + +**Step 1: Add upsert and batch benchmarks** + +```python +class TestSyncTransactionBenchmarks: + """Benchmarks for advanced sync transaction operations.""" + + def test_benchmark_upsert_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark upsert operation (query + conditional mutation).""" + client = sync_client_with_schema + counter = [0] + + def run_upsert() -> api.Response: + counter[0] += 1 + email = f"upsert{counter[0]}@test.com" + txn = client.txn() + query = f'{{ u as var(func: eq(email, "{email}")) }}' + mutation = pydgraph.Mutation( + set_nquads=f''' + uid(u) "{email}" . + uid(u) "Upsert_{counter[0]}" . 
+ '''.encode(), + cond="@if(eq(len(u), 0))", + ) + request = api.Request( + query=query, + mutations=[mutation], + commit_now=True, + ) + return txn.do_request(request) + + benchmark(run_upsert) + + def test_benchmark_batch_mutations_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark multiple mutations in one transaction.""" + client = sync_client_with_schema + counter = [0] + batch_size = 10 + + def run_batch() -> api.TxnContext: + txn = client.txn() + for i in range(batch_size): + counter[0] += 1 + txn.mutate(set_obj=generate_person(counter[0])) + return txn.commit() + + benchmark(run_batch) + + def test_benchmark_run_transaction_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark run_transaction helper overhead.""" + client = sync_client_with_schema + counter = [0] + + def txn_func(txn: pydgraph.Txn) -> str: + counter[0] += 1 + response = txn.mutate(set_obj=generate_person(counter[0]), commit_now=True) + return next(iter(response.uids.values()), "") + + def run_with_helper() -> str: + return run_transaction(client, txn_func) + + benchmark(run_with_helper) +``` + +**Step 2: Run the transaction benchmarks to verify** + +Run: `pytest tests/test_benchmark_sync.py::TestSyncTransactionBenchmarks -v --benchmark-disable` +Expected: PASS (3 tests) + +**Step 3: Commit** + +```bash +git add tests/test_benchmark_sync.py +git commit -m "feat(tests): add sync transaction benchmarks" +``` + +--- + +## Task 5: Implement Client Operation Benchmarks (Sync) + +**Files:** + +- Modify: `tests/test_benchmark_sync.py` + +**Step 1: Add client-level operation benchmarks** + +```python +class TestSyncClientBenchmarks: + """Benchmarks for sync client-level operations.""" + + def test_benchmark_check_version_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark check_version (health check).""" + client = 
sync_client_with_schema + + def run_check() -> str: + return client.check_version() + + benchmark(run_check) + + def test_benchmark_alter_schema_sync( + self, + sync_client_with_schema: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark schema alter operation.""" + client = sync_client_with_schema + counter = [0] + + def run_alter() -> api.Payload: + counter[0] += 1 + # Add a new predicate each time to avoid conflicts + schema = f"benchmark_pred_{counter[0]}: string @index(exact) ." + return client.alter(pydgraph.Operation(schema=schema)) + + benchmark(run_alter) +``` + +**Step 2: Run the client benchmarks to verify** + +Run: `pytest tests/test_benchmark_sync.py::TestSyncClientBenchmarks -v --benchmark-disable` +Expected: PASS (2 tests) + +**Step 3: Commit** + +```bash +git add tests/test_benchmark_sync.py +git commit -m "feat(tests): add sync client operation benchmarks" +``` + +--- + +## Task 6: Create Async Benchmark Test File + +**Files:** + +- Create: `tests/test_benchmark_async.py` + +**Step 1: Create the async test file with imports** + +```python +# SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. +# SPDX-License-Identifier: Apache-2.0 + +"""Targeted benchmark tests for async client operations. + +These benchmarks measure individual async operations in isolation to help +identify the root cause of performance regressions in stress tests. 
+ +Usage: + # Run all async benchmarks + pytest tests/test_benchmark_async.py -v + + # Compare against previous run + pytest tests/test_benchmark_async.py --benchmark-compare +""" + +from __future__ import annotations + +import asyncio +import json +from typing import TYPE_CHECKING, Any + +import pydgraph +from pydgraph import AsyncDgraphClient, run_transaction_async +from pydgraph.proto import api_pb2 as api + +from .helpers import generate_person + +if TYPE_CHECKING: + from pytest_benchmark.fixture import BenchmarkFixture +``` + +**Step 2: Verify the file is created correctly** + +Run: `python -c "import tests.test_benchmark_async"` Expected: No import errors + +**Step 3: Commit** + +```bash +git add tests/test_benchmark_async.py +git commit -m "feat(tests): add async benchmark test file structure" +``` + +--- + +## Task 7: Implement Query Benchmarks (Async) + +**Files:** + +- Modify: `tests/test_benchmark_async.py` + +**Step 1: Add async query benchmark tests** + +Note: Async benchmarks use the `async_client_with_schema_for_benchmark` fixture which returns +`(client, loop)` to avoid pytest-asyncio/benchmark conflicts. 
+ +```python +class TestAsyncQueryBenchmarks: + """Benchmarks for async query operations.""" + + def test_benchmark_query_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark a simple async read query.""" + client, loop = async_client_with_schema_for_benchmark + + # Setup: seed data outside benchmark + async def setup() -> None: + txn = client.txn() + await txn.mutate(set_obj=generate_person(0), commit_now=True) + + loop.run_until_complete(setup()) + + query = """query { + people(func: has(name), first: 1) { + name + email + age + } + }""" + + async def run_query() -> api.Response: + txn = client.txn(read_only=True) + return await txn.query(query) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_query()) + + benchmark(benchmark_wrapper) + + def test_benchmark_query_with_vars_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark an async query with variables.""" + client, loop = async_client_with_schema_for_benchmark + + # Setup + async def setup() -> None: + txn = client.txn() + await txn.mutate(set_obj={"name": "BenchmarkUser", "email": "bench@test.com"}, commit_now=True) + + loop.run_until_complete(setup()) + + query = """query people($name: string) { + people(func: eq(name, $name)) { + name + email + } + }""" + + async def run_query() -> api.Response: + txn = client.txn(read_only=True) + return await txn.query(query, variables={"$name": "BenchmarkUser"}) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_query()) + + benchmark(benchmark_wrapper) + + def test_benchmark_query_best_effort_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark a best-effort async 
read query.""" + client, loop = async_client_with_schema_for_benchmark + + # Setup + async def setup() -> None: + txn = client.txn() + await txn.mutate(set_obj=generate_person(0), commit_now=True) + + loop.run_until_complete(setup()) + + query = "{ people(func: has(name), first: 1) { name } }" + + async def run_query() -> api.Response: + txn = client.txn(read_only=True, best_effort=True) + return await txn.query(query) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_query()) + + benchmark(benchmark_wrapper) +``` + +**Step 2: Run the async query benchmarks to verify** + +Run: `pytest tests/test_benchmark_async.py::TestAsyncQueryBenchmarks -v --benchmark-disable` +Expected: PASS (3 tests) + +**Step 3: Commit** + +```bash +git add tests/test_benchmark_async.py +git commit -m "feat(tests): add async query benchmarks" +``` + +--- + +## Task 8: Implement Mutation Benchmarks (Async) + +**Files:** + +- Modify: `tests/test_benchmark_async.py` + +**Step 1: Add async mutation benchmark tests** + +```python +class TestAsyncMutationBenchmarks: + """Benchmarks for async mutation operations.""" + + def test_benchmark_mutation_commit_now_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async mutation with commit_now.""" + client, loop = async_client_with_schema_for_benchmark + counter = [0] + + async def run_mutation() -> api.Response: + counter[0] += 1 + txn = client.txn() + return await txn.mutate(set_obj=generate_person(counter[0]), commit_now=True) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_mutation()) + + benchmark(benchmark_wrapper) + + def test_benchmark_mutation_explicit_commit_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async mutation with explicit commit.""" 
+ client, loop = async_client_with_schema_for_benchmark + counter = [0] + + async def run_mutation() -> api.TxnContext: + counter[0] += 1 + txn = client.txn() + await txn.mutate(set_obj=generate_person(counter[0])) + return await txn.commit() + + def benchmark_wrapper() -> api.TxnContext: + return loop.run_until_complete(run_mutation()) + + benchmark(benchmark_wrapper) + + def test_benchmark_discard_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async mutation followed by discard.""" + client, loop = async_client_with_schema_for_benchmark + counter = [0] + + async def run_mutation() -> None: + counter[0] += 1 + txn = client.txn() + await txn.mutate(set_obj=generate_person(counter[0])) + await txn.discard() + + def benchmark_wrapper() -> None: + return loop.run_until_complete(run_mutation()) + + benchmark(benchmark_wrapper) + + def test_benchmark_mutation_nquads_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async N-Quads mutation.""" + client, loop = async_client_with_schema_for_benchmark + counter = [0] + + async def run_mutation() -> api.Response: + counter[0] += 1 + txn = client.txn() + nquads = f''' + _:person "Person_{counter[0]}" . + _:person "person{counter[0]}@test.com" . + _:person "{counter[0] % 80}" . 
+ ''' + return await txn.mutate(set_nquads=nquads, commit_now=True) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_mutation()) + + benchmark(benchmark_wrapper) + + def test_benchmark_delete_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async delete mutation.""" + client, loop = async_client_with_schema_for_benchmark + + # Pre-create nodes to delete + async def setup() -> list[str]: + uids = [] + for i in range(100): + txn = client.txn() + resp = await txn.mutate(set_obj=generate_person(i), commit_now=True) + uids.append(next(iter(resp.uids.values()))) + return uids + + uids = loop.run_until_complete(setup()) + uid_index = [0] + + async def run_delete() -> api.Response: + idx = uid_index[0] % len(uids) + uid_index[0] += 1 + txn = client.txn() + return await txn.mutate(del_obj={"uid": uids[idx]}, commit_now=True) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_delete()) + + benchmark(benchmark_wrapper) +``` + +**Step 2: Run the async mutation benchmarks to verify** + +Run: `pytest tests/test_benchmark_async.py::TestAsyncMutationBenchmarks -v --benchmark-disable` +Expected: PASS (5 tests) + +**Step 3: Commit** + +```bash +git add tests/test_benchmark_async.py +git commit -m "feat(tests): add async mutation benchmarks" +``` + +--- + +## Task 9: Implement Advanced Transaction Benchmarks (Async) + +**Files:** + +- Modify: `tests/test_benchmark_async.py` + +**Step 1: Add async upsert and batch benchmarks** + +```python +class TestAsyncTransactionBenchmarks: + """Benchmarks for advanced async transaction operations.""" + + def test_benchmark_upsert_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async upsert operation.""" + client, loop = 
async_client_with_schema_for_benchmark + counter = [0] + + async def run_upsert() -> api.Response: + counter[0] += 1 + email = f"upsert{counter[0]}@test.com" + txn = client.txn() + query = f'{{ u as var(func: eq(email, "{email}")) }}' + mutation = pydgraph.Mutation( + set_nquads=f''' + uid(u) "{email}" . + uid(u) "Upsert_{counter[0]}" . + '''.encode(), + cond="@if(eq(len(u), 0))", + ) + request = api.Request( + query=query, + mutations=[mutation], + commit_now=True, + ) + return await txn.do_request(request) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_upsert()) + + benchmark(benchmark_wrapper) + + def test_benchmark_batch_mutations_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark multiple async mutations in one transaction.""" + client, loop = async_client_with_schema_for_benchmark + counter = [0] + batch_size = 10 + + async def run_batch() -> api.TxnContext: + txn = client.txn() + for i in range(batch_size): + counter[0] += 1 + await txn.mutate(set_obj=generate_person(counter[0])) + return await txn.commit() + + def benchmark_wrapper() -> api.TxnContext: + return loop.run_until_complete(run_batch()) + + benchmark(benchmark_wrapper) + + def test_benchmark_run_transaction_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark run_transaction_async helper overhead.""" + client, loop = async_client_with_schema_for_benchmark + counter = [0] + + async def txn_func(txn: pydgraph.AsyncTxn) -> str: + counter[0] += 1 + response = await txn.mutate(set_obj=generate_person(counter[0]), commit_now=True) + return next(iter(response.uids.values()), "") + + async def run_with_helper() -> str: + return await run_transaction_async(client, txn_func) + + def benchmark_wrapper() -> str: + return 
loop.run_until_complete(run_with_helper()) + + benchmark(benchmark_wrapper) +``` + +**Step 2: Run the async transaction benchmarks to verify** + +Run: `pytest tests/test_benchmark_async.py::TestAsyncTransactionBenchmarks -v --benchmark-disable` +Expected: PASS (3 tests) + +**Step 3: Commit** + +```bash +git add tests/test_benchmark_async.py +git commit -m "feat(tests): add async transaction benchmarks" +``` + +--- + +## Task 10: Implement Client Operation Benchmarks (Async) + +**Files:** + +- Modify: `tests/test_benchmark_async.py` + +**Step 1: Add async client-level operation benchmarks** + +```python +class TestAsyncClientBenchmarks: + """Benchmarks for async client-level operations.""" + + def test_benchmark_check_version_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async check_version.""" + client, loop = async_client_with_schema_for_benchmark + + async def run_check() -> str: + return await client.check_version() + + def benchmark_wrapper() -> str: + return loop.run_until_complete(run_check()) + + benchmark(benchmark_wrapper) + + def test_benchmark_alter_schema_async( + self, + async_client_with_schema_for_benchmark: tuple[AsyncDgraphClient, asyncio.AbstractEventLoop], + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async schema alter operation.""" + client, loop = async_client_with_schema_for_benchmark + counter = [0] + + async def run_alter() -> api.Payload: + counter[0] += 1 + schema = f"benchmark_pred_{counter[0]}: string @index(exact) ." 
+ return await client.alter(pydgraph.Operation(schema=schema)) + + def benchmark_wrapper() -> api.Payload: + return loop.run_until_complete(run_alter()) + + benchmark(benchmark_wrapper) +``` + +**Step 2: Run the async client benchmarks to verify** + +Run: `pytest tests/test_benchmark_async.py::TestAsyncClientBenchmarks -v --benchmark-disable` +Expected: PASS (2 tests) + +**Step 3: Commit** + +```bash +git add tests/test_benchmark_async.py +git commit -m "feat(tests): add async client operation benchmarks" +``` + +--- + +## Task 11: Run Full Benchmark Suite and Verify + +**Step 1: Run all sync benchmarks** + +Run: `pytest tests/test_benchmark_sync.py -v --benchmark-disable` Expected: PASS (13 tests) + +**Step 2: Run all async benchmarks** + +Run: `pytest tests/test_benchmark_async.py -v --benchmark-disable` Expected: PASS (13 tests) + +**Step 3: Run benchmarks with actual timing** + +Run: `pytest tests/test_benchmark_sync.py tests/test_benchmark_async.py -v` Expected: All benchmarks +complete with timing data + +**Step 4: Commit final state** + +```bash +git add -A +git commit -m "test(benchmarks): complete targeted benchmark test suite" +``` + +--- + +## Task 12: Update PR and Generate Benchmark Results + +**Step 1: Push changes** + +Run: `git push` + +**Step 2: Generate benchmark SVG** + +Run: `make benchmark` Expected: Benchmark SVG generated + +**Step 3: Update PR description with new benchmark results** + +Include the new targeted benchmarks in the PR description, noting that these complement the stress +tests by isolating individual operations. 
+ +--- + +## Summary + +**Created files:** + +- `tests/test_benchmark_sync.py` - 13 sync benchmarks +- `tests/test_benchmark_async.py` - 13 async benchmarks + +**Operations covered:** | Category | Operations | |----------|------------| | Query | Simple, with +variables, best-effort | | Mutation | commit_now, explicit commit, discard, N-Quads, delete | | +Transaction | Upsert, batch, run_transaction helper | | Client | check_version, alter schema | + +**Total: 26 targeted benchmarks** + +**Regression analysis workflow:** + +1. Stress test regresses +2. Run targeted benchmarks +3. Compare individual operation times +4. Identify specific operation that degraded diff --git a/pydgraph/__init__.py b/pydgraph/__init__.py index 6e8b404..19996fb 100755 --- a/pydgraph/__init__.py +++ b/pydgraph/__init__.py @@ -2,6 +2,8 @@ # SPDX-License-Identifier: Apache-2.0 # Async client +# Errors - export both module and individual classes for convenience +from pydgraph import errors from pydgraph.async_client import AsyncDgraphClient, async_open from pydgraph.async_client_stub import AsyncDgraphClientStub from pydgraph.async_txn import AsyncTxn @@ -66,6 +68,7 @@ "Version", "async_open", "client_stub", + "errors", "open", "retry", "retry_async", diff --git a/pyproject.toml b/pyproject.toml index 83c4869..204ad50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -50,6 +50,8 @@ dev = [ "grpcio-tools>=1.66.2", "pytest>=8.3.3", "pytest-asyncio>=0.23.0", + "pytest-benchmark>=4.0.0", + "pygal>=3.0.0", "ruff>=0.8.4", "ty>=0.0.8", ] @@ -190,6 +192,10 @@ ignore_missing_imports = true module = "pytest.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "pytest_benchmark.*" +ignore_missing_imports = true + [[tool.mypy.overrides]] module = "grpc.*" ignore_missing_imports = true diff --git a/scripts/local-test.sh b/scripts/local-test.sh index 358f89a..27e713e 100755 --- a/scripts/local-test.sh +++ b/scripts/local-test.sh @@ -37,12 +37,49 @@ function stopCluster() { DockerCompose 
down -t 5 -v } -SRCDIR=$(readlink -f "${BASH_SOURCE[0]%/*}") +if [[ -z ${SRCDIR} ]]; then + SRCDIR=$(readlink -f "${BASH_SOURCE[0]%/*}") +fi +if [[ ! -d "${SRCDIR}/../scripts" ]]; then + echo "No scripts directory found at \"${SRCDIR}/../scripts\"" + echo "Trying alternate locations for SRCDIR..." + # shellcheck disable=SC2043 + for dir in "./scripts"; do + echo -n "Trying \"${dir}\"... " + if [[ -d ${dir} ]]; then + echo "found: ${dir}" + SRCDIR="${dir}" + echo "Setting SRCDIR=\"${dir}\"" + break + else + echo "not found: ${dir}" + fi + if [[ ! -d ${SRCDIR} ]]; then + echo "Unable to determine script SRCDIR." + echo "Please re-run with SRCDIR set to correct project root." + exit 1 + fi + done +fi + readonly SRCDIR +SRCDIR_VENV="${SRCDIR}/../.venv" +VENV_ACTIVATE="${SRCDIR_VENV}/bin/activate" +if [[ ${VIRTUAL_ENV} != "${SRCDIR_VENV}" ]]; then + if [[ -e ${VENV_ACTIVATE} ]]; then + echo "Ensuring use of SRCDIR virtual env using \"${VENV_ACTIVATE}\"" + # shellcheck disable=SC1090 + source "${VENV_ACTIVATE}" + else + echo "WARNING: Can't activate SRCDIR virtual env, no activate script found at \"${VENV_ACTIVATE}\"" + fi +fi + # Run cluster and tests pushd "$(dirname "${SRCDIR}")" || exit pushd "${SRCDIR}"/../tests || exit + restartCluster # shellcheck disable=SC2312 alphaGrpcPort=$(DockerCompose port alpha1 9080 | awk -F: '{print $2}') diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..38ad0a5 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,338 @@ +# SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. +# SPDX-License-Identifier: Apache-2.0 + +"""Shared pytest fixtures for pydgraph tests.""" + +from __future__ import annotations + +import asyncio +import gzip +import logging +import os + +# Prevent gRPC atfork crash when pytest-benchmark calls subprocess.fork() +# to collect machine info while gRPC channels are still open. 
+os.environ.setdefault("GRPC_ENABLE_FORK_SUPPORT", "0") +import re +import shutil +import tempfile +import time +import urllib.request +from collections.abc import AsyncGenerator, Generator +from concurrent.futures import ThreadPoolExecutor +from pathlib import Path +from typing import Any + +import pytest + +import pydgraph +from pydgraph import ( + AsyncDgraphClient, + AsyncDgraphClientStub, + DgraphClient, + DgraphClientStub, +) + +from .helpers import TEST_SERVER_ADDR + +# ============================================================================= +# Data Fixture Configuration (fetched on demand for stress/benchmark tests) +# ============================================================================= + +DATA_FIXTURE_DIR = Path(__file__).parent / "resources" +DATA_FIXTURE_BASE_URL = ( + "https://github.com/dgraph-io/dgraph-benchmarks/raw/refs/heads/main/data/" +) + +logger = logging.getLogger(__name__) + + +# ============================================================================= +# Alpha Health Check +# ============================================================================= + + +def _wait_for_alpha_ready( + client: DgraphClient, + *, + max_wait: float = 60.0, + poll_interval: float = 2.0, +) -> None: + """Block until the Dgraph alpha can serve real queries, or raise after max_wait. + + WHY THIS EXISTS: + After bulk-loading 1M+ RDF triples, the alpha needs time to finish + background indexing and Raft log compaction. A lightweight health check + like ``check_version()`` can pass while the alpha is still unable to serve + queries — then the first stress test hits a dead connection. + + This helper runs a real read query (not just a version ping) so we only + proceed once the alpha is genuinely ready for workload. It is called at + the end of ``movies_data_loaded``, after all data is committed. 
+ """ + probe_query = "{ probe(func: has(name), first: 1) { name } }" + start = time.time() + last_exc: Exception | None = None + while time.time() - start < max_wait: + try: + txn = client.txn(read_only=True) + txn.query(probe_query) + except Exception as exc: + last_exc = exc + time.sleep(poll_interval) + else: + return # alpha is ready for real work + + raise RuntimeError( + f"Dgraph alpha not query-ready after {max_wait}s — last error: {last_exc}" + ) + + +# ============================================================================= +# Stress Test Configuration +# ============================================================================= + + +@pytest.fixture(scope="session") +def stress_config() -> dict[str, Any]: + """Configuration for stress tests based on STRESS_TEST_MODE env var. + + Modes: + quick: Fast sanity check (default, ~30s) - 20 workers, 200 ops, 50 rounds + moderate: Sustained async stress (5-8 min) - 10 workers, 200 ops, 8 rounds + full: Extended sync stress (12-16 min) - 15 workers, 500 ops, 15 rounds + + Parameters: + workers: Concurrency level (thread pool size or asyncio task count) + ops: Operations per concurrent batch + rounds: How many times each test repeats its concurrent batch. + Each stress test uses ``benchmark.pedantic(rounds=1)`` so + pytest-benchmark doesn't compound on top of this inner loop. 
+ load_movies: Whether to load the 1million movie dataset + """ + mode = os.environ.get("STRESS_TEST_MODE", "quick") + + if mode == "full": + config = { + "mode": "full", + "workers": 15, + "ops": 500, + "rounds": 15, + "load_movies": True, + } + elif mode == "moderate": + config = { + "mode": "moderate", + "workers": 10, + "ops": 200, + "rounds": 8, + "load_movies": True, + } + else: + config = { + "mode": "quick", + "workers": 20, + "ops": 200, + "rounds": 50, + "load_movies": os.environ.get("STRESS_TEST_LOAD_MOVIES", "").lower() + in ("1", "true"), + } + + return config + + +# ============================================================================= +# Movie Dataset Fixtures +# ============================================================================= + + +def _downloaded_data_fixture_path(name: str) -> Path: + """Download a data fixture file if it doesn't exist locally.""" + path = DATA_FIXTURE_DIR / name + if not path.exists() or path.stat().st_size == 0: + url = DATA_FIXTURE_BASE_URL + name + path.parent.mkdir(parents=True, exist_ok=True) + logger.info("Downloading %s from %s", name, url) + urllib.request.urlretrieve(url, path) # noqa: S310 + logger.info("Downloaded %s (%.1f MB)", name, path.stat().st_size / 1024 / 1024) + return path + + +@pytest.fixture(scope="session") +def movies_schema_path() -> Path: + """Path to the 1million movie schema file. + + Downloads from dgraph-benchmarks repo if not present locally. + """ + return _downloaded_data_fixture_path("1million.schema") + + +@pytest.fixture(scope="session") +def movies_rdf_gz() -> Path: + """Path to the compressed 1million movie RDF data file. + + Downloads from dgraph-benchmarks repo if not present locally. + """ + return _downloaded_data_fixture_path("1million.rdf.gz") + + +@pytest.fixture(scope="session") +def movies_rdf(movies_rdf_gz: Path) -> Generator[Path, None, None]: + """Path to the uncompressed 1million movie RDF data file. 
+ + Decompresses the gzipped RDF file to a temporary directory that is + automatically cleaned up at the end of the test session. + """ + with tempfile.TemporaryDirectory() as tempdir: + output_path = Path(tempdir) / "1million.rdf" + logger.info("Decompressing %s to %s", movies_rdf_gz, output_path) + with gzip.open(movies_rdf_gz, "rb") as f_in, open(output_path, "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + logger.info( + "Decompressed RDF file: %.1f MB", output_path.stat().st_size / 1024 / 1024 + ) + yield output_path + + +@pytest.fixture(scope="session") +def sync_client() -> Generator[DgraphClient, None, None]: + """Session-scoped sync client with login.""" + client_stub = DgraphClientStub(TEST_SERVER_ADDR) + client = DgraphClient(client_stub) + + for _ in range(30): + try: + client.login("groot", "password") + break + except Exception as e: + if "user not found" in str(e): + raise + time.sleep(0.1) + + yield client + client.close() + + +@pytest.fixture(scope="session") +def movies_data_loaded( + request: pytest.FixtureRequest, + stress_config: dict[str, Any], +) -> bool: + """Load the 1million movie dataset into Dgraph if load_movies is True. + + Uses lazy fixture evaluation - only requests movies_rdf and client fixtures + when load_movies is True, avoiding unnecessary downloads in quick mode. + + Returns True if data was loaded, False otherwise. 
+    """
+    if not stress_config["load_movies"]:
+        logger.info("Skipping movie data loading (load_movies=False)")
+        return False
+
+    # Lazy evaluation: only instantiate session-scoped fixtures when actually needed
+    client: DgraphClient = request.getfixturevalue("sync_client")
+    movies_rdf_path: Path = request.getfixturevalue("movies_rdf")
+    schema_content: str = request.getfixturevalue("movies_schema")
+
+    # Apply schema before loading data
+    client.alter(pydgraph.Operation(drop_all=True))
+    client.alter(pydgraph.Operation(schema=schema_content))
+
+    # Pattern to convert explicit UIDs and UUIDs to blank nodes
+    # Matches: <12345> (numeric UIDs) and <24d9530f-553a-43fc-8eb6-14ac667b2387> (UUIDs)
+    # These formats can't be directly used as Dgraph UIDs
+    uid_pattern = re.compile(
+        r"<(\d+|[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12})>",
+        re.IGNORECASE,
+    )
+
+    def convert_uids_to_blank_nodes(line: str) -> str:
+        """Convert <12345> or <uuid> to _:identifier so Dgraph assigns new UIDs."""
+        return uid_pattern.sub(r"_:\1", line)
+
+    # Load RDF data in batches
+    batch_size = 10000
+    batch: list[str] = []
+    total_loaded = 0
+
+    logger.info("Loading RDF data from %s", movies_rdf_path)
+    with open(movies_rdf_path, encoding="utf-8") as f:
+        for line in f:
+            line = line.strip()
+            if line and not line.startswith("#"):
+                # Convert UIDs to blank nodes
+                line = convert_uids_to_blank_nodes(line)
+                batch.append(line)
+
+                if len(batch) >= batch_size:
+                    nquads = "\n".join(batch)
+                    txn = client.txn()
+                    txn.mutate(set_nquads=nquads, commit_now=True)
+                    total_loaded += len(batch)
+                    if total_loaded % 100000 == 0:
+                        logger.info("Loaded %d RDF triples", total_loaded)
+                    batch = []
+
+    # Load remaining batch
+    if batch:
+        nquads = "\n".join(batch)
+        txn = client.txn()
+        txn.mutate(set_nquads=nquads, commit_now=True)
+        total_loaded += len(batch)
+
+    logger.info("Finished loading %d RDF triples", total_loaded)
+
+    # After bulk-loading 1M+ triples the alpha needs time to finish background
+ # indexing and Raft log compaction. Without this gate the first stress test + # (especially under ``--benchmark-only``) hits the alpha while it is still + # recovering and gets connection-refused on every operation. + logger.info("Waiting for alpha to finish indexing before stress tests start") + _wait_for_alpha_ready(client) + logger.info("Alpha is query-ready") + + return True + + +# ============================================================================= +# Executor Fixture (for sync stress tests) +# ============================================================================= + + +@pytest.fixture +def executor( + stress_config: dict[str, Any], +) -> Generator[ThreadPoolExecutor, None, None]: + """Create ThreadPoolExecutor for concurrent stress tests.""" + workers = stress_config["workers"] + with ThreadPoolExecutor(max_workers=workers) as ex: + yield ex + + +@pytest.fixture(scope="session") +def movies_schema(movies_schema_path: Path) -> str: + """Return the movies schema content as a string.""" + return movies_schema_path.read_text() + + +# ============================================================================= +# Async Client Fixtures +# ============================================================================= + + +@pytest.fixture +async def async_client() -> AsyncGenerator[AsyncDgraphClient, None]: + """Async client with login.""" + client_stub = AsyncDgraphClientStub(TEST_SERVER_ADDR) + client = AsyncDgraphClient(client_stub) + + for _ in range(30): + try: + await client.login("groot", "password") + break + except Exception as e: + if "user not found" in str(e): + raise + await asyncio.sleep(0.1) + + yield client + await client.close() diff --git a/tests/helpers.py b/tests/helpers.py new file mode 100644 index 0000000..14b71f4 --- /dev/null +++ b/tests/helpers.py @@ -0,0 +1,41 @@ +# SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. +# SPDX-License-Identifier: Apache-2.0 + +"""Shared test helpers, constants, and utility functions. 
+ +This module contains constants and helper functions used across multiple test files. +It's separate from conftest.py to keep fixture definitions isolated from shared utilities. +""" + +from __future__ import annotations + +import os +import random +from typing import Any + +# ============================================================================= +# Configuration Constants +# ============================================================================= + +TEST_SERVER_ADDR = os.getenv("TEST_SERVER_ADDR", "localhost:9180") + + +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def generate_movie(index: int) -> dict[str, Any]: + """Generate a movie object for testing using 1million.schema predicates. + + Args: + index: Unique index for this movie (used in name generation) + + Returns: + Dictionary with movie attributes suitable for Dgraph mutation + """ + return { + "name": f"TestMovie_{index}_{random.randint(1000, 9999)}", # noqa: S311 + "tagline": f"An amazing test film number {index}", + "email": f"movie{index}_{random.randint(1000, 9999)}@test.com", # noqa: S311 + } diff --git a/tests/test_alter.py b/tests/test_alter.py index 122d655..9d6818e 100644 --- a/tests/test_alter.py +++ b/tests/test_alter.py @@ -257,12 +257,5 @@ def test_set_schema_empty_string(self) -> None: self.client.set_schema("") -def suite() -> unittest.TestSuite: - """Returns a test suite object.""" - suite_obj = unittest.TestSuite() - suite_obj.addTest(unittest.makeSuite(TestAlter)) - return suite_obj - - if __name__ == "__main__": unittest.main() diff --git a/tests/test_async_client.py b/tests/test_async_client.py index 3c75bdf..b4001f4 100644 --- a/tests/test_async_client.py +++ b/tests/test_async_client.py @@ -1,54 +1,25 @@ # SPDX-FileCopyrightText: © Hypermode Inc. 
# SPDX-License-Identifier: Apache-2.0 -"""Integration tests for async client.""" +"""Integration tests for async client. + +Note: async_client fixture is defined in conftest.py +""" import asyncio import json import os -from collections.abc import AsyncGenerator import pytest import pydgraph -from pydgraph import AsyncDgraphClient, AsyncDgraphClientStub, async_open +from pydgraph import AsyncDgraphClient, async_open from pydgraph.proto import api_pb2 as api -# Get test server address from environment +# Get test server address from environment (also defined in conftest.py) TEST_SERVER_ADDR = os.getenv("TEST_SERVER_ADDR", "localhost:9180") -@pytest.fixture -async def async_client() -> AsyncGenerator[AsyncDgraphClient, None]: - """Fixture providing an async client with login.""" - client_stub = AsyncDgraphClientStub(TEST_SERVER_ADDR) - client = AsyncDgraphClient(client_stub) - - # Retry login until server is ready - max_retries = 30 - for _ in range(max_retries): - try: - await client.login("groot", "password") - break - except Exception as e: - if "user not found" in str(e): - # User not found means auth is working but user doesn't exist yet - # This shouldn't happen with groot, so treat as error - raise - # Server might not be ready, wait and retry - await asyncio.sleep(0.1) - - yield client - await client.close() - - -@pytest.fixture -async def async_client_clean(async_client: AsyncDgraphClient) -> AsyncDgraphClient: - """Fixture providing an async client with clean database.""" - await async_client.alter(pydgraph.Operation(drop_all=True)) - return async_client - - class TestAsyncClient: """Test suite for async client basic operations.""" @@ -72,15 +43,16 @@ async def test_alter_schema(self, async_client: AsyncDgraphClient) -> None: assert response is not None @pytest.mark.asyncio - async def test_mutation_and_query(self, async_client_clean: AsyncDgraphClient) -> None: + async def test_mutation_and_query(self, async_client: AsyncDgraphClient) -> None: """Test 
async mutation and query operations.""" # Set schema - await async_client_clean.alter( + await async_client.alter(pydgraph.Operation(drop_all=True)) + await async_client.alter( pydgraph.Operation(schema="name: string @index(term) .") ) # Mutation with commit_now - txn = async_client_clean.txn() + txn = async_client.txn() mutation = pydgraph.Mutation(commit_now=True) response = await txn.mutate( mutation=mutation, set_nquads='<_:alice> "Alice" .' @@ -94,21 +66,22 @@ async def test_mutation_and_query(self, async_client_clean: AsyncDgraphClient) - } }""" - txn = async_client_clean.txn(read_only=True) + txn = async_client.txn(read_only=True) response = await txn.query(query) result = json.loads(response.json) assert len(result["me"]) == 1 assert result["me"][0]["name"] == "Alice" @pytest.mark.asyncio - async def test_mutation_with_json(self, async_client_clean: AsyncDgraphClient) -> None: + async def test_mutation_with_json(self, async_client: AsyncDgraphClient) -> None: """Test mutation with JSON object.""" - await async_client_clean.alter( + await async_client.alter(pydgraph.Operation(drop_all=True)) + await async_client.alter( pydgraph.Operation(schema="name: string @index(term) .") ) # Mutation with set_obj - txn = async_client_clean.txn() + txn = async_client.txn() response = await txn.mutate(set_obj={"name": "Bob"}, commit_now=True) assert len(response.uids) == 1 @@ -125,21 +98,22 @@ async def test_mutation_with_json(self, async_client_clean: AsyncDgraphClient) - }} }}""" - txn = async_client_clean.txn(read_only=True) + txn = async_client.txn(read_only=True) response = await txn.query(query) result = json.loads(response.json) assert len(result["me"]) == 1 assert result["me"][0]["name"] == "Bob" @pytest.mark.asyncio - async def test_transaction_commit(self, async_client_clean: AsyncDgraphClient) -> None: + async def test_transaction_commit(self, async_client: AsyncDgraphClient) -> None: """Test explicit transaction commit.""" - await async_client_clean.alter( + 
await async_client.alter(pydgraph.Operation(drop_all=True)) + await async_client.alter( pydgraph.Operation(schema="name: string @index(term) .") ) # Mutation without commit_now - txn = async_client_clean.txn() + txn = async_client.txn() response = await txn.mutate(set_obj={"name": "Charlie"}) assert len(response.uids) == 1 @@ -153,21 +127,22 @@ async def test_transaction_commit(self, async_client_clean: AsyncDgraphClient) - } }""" - txn = async_client_clean.txn(read_only=True) + txn = async_client.txn(read_only=True) response = await txn.query(query) result = json.loads(response.json) assert len(result["me"]) == 1 assert result["me"][0]["name"] == "Charlie" @pytest.mark.asyncio - async def test_transaction_discard(self, async_client_clean: AsyncDgraphClient) -> None: + async def test_transaction_discard(self, async_client: AsyncDgraphClient) -> None: """Test transaction discard.""" - await async_client_clean.alter( + await async_client.alter(pydgraph.Operation(drop_all=True)) + await async_client.alter( pydgraph.Operation(schema="name: string @index(term) .") ) # Mutation without commit - txn = async_client_clean.txn() + txn = async_client.txn() await txn.mutate(set_obj={"name": "David"}) # Discard @@ -180,20 +155,21 @@ async def test_transaction_discard(self, async_client_clean: AsyncDgraphClient) } }""" - txn = async_client_clean.txn(read_only=True) + txn = async_client.txn(read_only=True) response = await txn.query(query) result = json.loads(response.json) assert len(result.get("me", [])) == 0 @pytest.mark.asyncio - async def test_read_only_transaction(self, async_client_clean: AsyncDgraphClient) -> None: + async def test_read_only_transaction(self, async_client: AsyncDgraphClient) -> None: """Test read-only transactions.""" - await async_client_clean.alter( + await async_client.alter(pydgraph.Operation(drop_all=True)) + await async_client.alter( pydgraph.Operation(schema="name: string @index(term) .") ) # Add some data - txn = async_client_clean.txn() + txn = 
async_client.txn() await txn.mutate(set_obj={"name": "Eve"}, commit_now=True) # Read-only query @@ -203,7 +179,7 @@ async def test_read_only_transaction(self, async_client_clean: AsyncDgraphClient } }""" - txn = async_client_clean.txn(read_only=True) + txn = async_client.txn(read_only=True) response = await txn.query(query) result = json.loads(response.json) assert len(result["me"]) == 1 @@ -214,14 +190,15 @@ async def test_read_only_transaction(self, async_client_clean: AsyncDgraphClient await txn.mutate(set_obj={"name": "Frank"}) @pytest.mark.asyncio - async def test_query_with_variables(self, async_client_clean: AsyncDgraphClient) -> None: + async def test_query_with_variables(self, async_client: AsyncDgraphClient) -> None: """Test query with variables.""" - await async_client_clean.alter( + await async_client.alter(pydgraph.Operation(drop_all=True)) + await async_client.alter( pydgraph.Operation(schema="name: string @index(term) .") ) # Add data - txn = async_client_clean.txn() + txn = async_client.txn() await txn.mutate(set_obj={"name": "Grace"}, commit_now=True) # Query with variables @@ -231,7 +208,7 @@ async def test_query_with_variables(self, async_client_clean: AsyncDgraphClient) } }""" - txn = async_client_clean.txn(read_only=True) + txn = async_client.txn(read_only=True) response = await txn.query(query, variables={"$name": "Grace"}) result = json.loads(response.json) assert len(result["me"]) == 1 @@ -252,14 +229,15 @@ async def test_client_context_manager(self) -> None: # Client should be closed automatically @pytest.mark.asyncio - async def test_transaction_context_manager(self, async_client_clean: AsyncDgraphClient) -> None: + async def test_transaction_context_manager(self, async_client: AsyncDgraphClient) -> None: """Test async transaction context manager.""" - await async_client_clean.alter( + await async_client.alter(pydgraph.Operation(drop_all=True)) + await async_client.alter( pydgraph.Operation(schema="name: string @index(term) .") ) # Use 
transaction as context manager - async with async_client_clean.txn() as txn: + async with async_client.txn() as txn: await txn.mutate(set_obj={"name": "Henry"}, commit_now=True) # Transaction should be discarded automatically @@ -270,7 +248,7 @@ async def test_transaction_context_manager(self, async_client_clean: AsyncDgraph } }""" - async with async_client_clean.txn(read_only=True) as txn: + async with async_client.txn(read_only=True) as txn: response = await txn.query(query) result = json.loads(response.json) assert len(result["me"]) == 1 @@ -281,14 +259,15 @@ class TestAsyncConcurrent: """Test suite for concurrent async operations.""" @pytest.mark.asyncio - async def test_concurrent_queries(self, async_client_clean: AsyncDgraphClient) -> None: + async def test_concurrent_queries(self, async_client: AsyncDgraphClient) -> None: """Test multiple concurrent queries.""" - await async_client_clean.alter( + await async_client.alter(pydgraph.Operation(drop_all=True)) + await async_client.alter( pydgraph.Operation(schema="name: string @index(term) .") ) # Add some data - txn = async_client_clean.txn() + txn = async_client.txn() await txn.mutate(set_obj={"name": "Concurrent Test"}, commit_now=True) # Run multiple queries concurrently @@ -299,7 +278,7 @@ async def test_concurrent_queries(self, async_client_clean: AsyncDgraphClient) - }""" async def run_query() -> api.Response: - txn = async_client_clean.txn(read_only=True) + txn = async_client.txn(read_only=True) return await txn.query(query) # Run 10 queries concurrently @@ -314,15 +293,16 @@ async def run_query() -> api.Response: assert result["me"][0]["name"] == "Concurrent Test" @pytest.mark.asyncio - async def test_concurrent_mutations(self, async_client_clean: AsyncDgraphClient) -> None: + async def test_concurrent_mutations(self, async_client: AsyncDgraphClient) -> None: """Test multiple concurrent mutations.""" - await async_client_clean.alter( + await async_client.alter(pydgraph.Operation(drop_all=True)) + await 
async_client.alter( pydgraph.Operation(schema="name: string @index(term) .") ) # Run multiple mutations concurrently async def run_mutation(name: str) -> api.Response: - txn = async_client_clean.txn() + txn = async_client.txn() return await txn.mutate(set_obj={"name": name}, commit_now=True) # Run 5 mutations concurrently @@ -342,7 +322,7 @@ async def run_mutation(name: str) -> api.Response: } }""" - txn = async_client_clean.txn(read_only=True) + txn = async_client.txn(read_only=True) response = await txn.query(query) result = json.loads(response.json) assert len(result["me"]) == 5 diff --git a/tests/test_benchmark_async.py b/tests/test_benchmark_async.py new file mode 100644 index 0000000..6144541 --- /dev/null +++ b/tests/test_benchmark_async.py @@ -0,0 +1,456 @@ +# SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. +# SPDX-License-Identifier: Apache-2.0 + +"""Targeted benchmark tests for async client operations. + +These benchmarks measure individual async operations in isolation to help +identify the root cause of performance regressions in stress tests. 
+ +Usage: + # Run all async benchmarks + pytest tests/test_benchmark_async.py -v + + # Compare against previous run + pytest tests/test_benchmark_async.py --benchmark-compare +""" + +from __future__ import annotations + +import asyncio +from collections.abc import Generator +from typing import TYPE_CHECKING + +import pytest + +import pydgraph +from pydgraph import AsyncDgraphClient, AsyncDgraphClientStub, run_transaction_async +from pydgraph.proto import api_pb2 as api + +from .helpers import TEST_SERVER_ADDR, generate_movie + +if TYPE_CHECKING: + from pytest_benchmark.fixture import BenchmarkFixture + + +# ============================================================================= +# Fixtures +# ============================================================================= + + +@pytest.fixture +def event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]: + """Dedicated event loop for async benchmark tests.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + yield loop + loop.close() + + +@pytest.fixture +def benchmark_client( + event_loop: asyncio.AbstractEventLoop, + movies_schema: str, +) -> Generator[AsyncDgraphClient, None, None]: + """Async client with schema for benchmark tests.""" + loop = event_loop + + async def setup() -> AsyncDgraphClient: + client_stub = AsyncDgraphClientStub(TEST_SERVER_ADDR) + client = AsyncDgraphClient(client_stub) + for _ in range(30): + try: + await client.login("groot", "password") + break + except Exception as e: + if "user not found" in str(e): + raise + await asyncio.sleep(0.1) + await client.alter(pydgraph.Operation(drop_all=True)) + await client.alter(pydgraph.Operation(schema=movies_schema)) + return client + + client = loop.run_until_complete(setup()) + yield client + loop.run_until_complete(client.close()) + + +# ============================================================================= +# Query Benchmarks +# ============================================================================= + + 
+class TestAsyncQueryBenchmarks: + """Benchmarks for async query operations.""" + + def test_benchmark_query_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark a simple async read query.""" + loop = event_loop + client = benchmark_client + + # Setup: seed data outside benchmark + async def setup() -> None: + txn = client.txn() + await txn.mutate(set_obj=generate_movie(0), commit_now=True) + + loop.run_until_complete(setup()) + + query = """query { + people(func: has(name), first: 1) { + name + email + tagline + } + }""" + + async def run_query() -> api.Response: + txn = client.txn(read_only=True) + return await txn.query(query) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_query()) + + benchmark(benchmark_wrapper) + + def test_benchmark_query_with_vars_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark an async query with variables.""" + loop = event_loop + client = benchmark_client + + # Setup + async def setup() -> None: + txn = client.txn() + await txn.mutate( + set_obj={"name": "BenchmarkUser", "email": "bench@test.com"}, + commit_now=True, + ) + + loop.run_until_complete(setup()) + + query = """query people($name: string) { + people(func: eq(name, $name)) { + name + email + } + }""" + + async def run_query() -> api.Response: + txn = client.txn(read_only=True) + return await txn.query(query, variables={"$name": "BenchmarkUser"}) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_query()) + + benchmark(benchmark_wrapper) + + def test_benchmark_query_best_effort_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark a best-effort async read query.""" + loop = event_loop + client = benchmark_client 
+ + # Setup + async def setup() -> None: + txn = client.txn() + await txn.mutate(set_obj=generate_movie(0), commit_now=True) + + loop.run_until_complete(setup()) + + query = "{ people(func: has(name), first: 1) { name } }" + + async def run_query() -> api.Response: + txn = client.txn(read_only=True, best_effort=True) + return await txn.query(query) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_query()) + + benchmark(benchmark_wrapper) + + +# ============================================================================= +# Mutation Benchmarks +# ============================================================================= + + +class TestAsyncMutationBenchmarks: + """Benchmarks for async mutation operations.""" + + def test_benchmark_mutation_commit_now_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async mutation with commit_now.""" + loop = event_loop + client = benchmark_client + counter = [0] + + async def run_mutation() -> api.Response: + counter[0] += 1 + txn = client.txn() + return await txn.mutate(set_obj=generate_movie(counter[0]), commit_now=True) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_mutation()) + + benchmark(benchmark_wrapper) + + def test_benchmark_mutation_explicit_commit_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async mutation with explicit commit.""" + loop = event_loop + client = benchmark_client + counter = [0] + + async def run_mutation() -> api.TxnContext | None: + counter[0] += 1 + txn = client.txn() + await txn.mutate(set_obj=generate_movie(counter[0])) + return await txn.commit() + + def benchmark_wrapper() -> api.TxnContext | None: + return loop.run_until_complete(run_mutation()) + + benchmark(benchmark_wrapper) + + def test_benchmark_discard_async( + 
self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async mutation followed by discard.""" + loop = event_loop + client = benchmark_client + counter = [0] + + async def run_mutation() -> None: + counter[0] += 1 + txn = client.txn() + await txn.mutate(set_obj=generate_movie(counter[0])) + await txn.discard() + + def benchmark_wrapper() -> None: + return loop.run_until_complete(run_mutation()) + + benchmark(benchmark_wrapper) + + def test_benchmark_mutation_nquads_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async N-Quads mutation.""" + loop = event_loop + client = benchmark_client + counter = [0] + + async def run_mutation() -> api.Response: + counter[0] += 1 + txn = client.txn() + nquads = f""" + _:person "Movie_{counter[0]}" . + _:person "movie{counter[0]}@test.com" . + _:person "A test movie number {counter[0]}" . 
+ """ + return await txn.mutate(set_nquads=nquads, commit_now=True) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_mutation()) + + benchmark(benchmark_wrapper) + + def test_benchmark_delete_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async delete mutation.""" + loop = event_loop + client = benchmark_client + + # Pre-create nodes to delete + async def setup() -> list[str]: + uids = [] + for i in range(100): + txn = client.txn() + resp = await txn.mutate(set_obj=generate_movie(i), commit_now=True) + uids.append(next(iter(resp.uids.values()))) + return uids + + uids = loop.run_until_complete(setup()) + uid_index = [0] + + async def run_delete() -> api.Response: + idx = uid_index[0] % len(uids) + uid_index[0] += 1 + txn = client.txn() + return await txn.mutate(del_obj={"uid": uids[idx]}, commit_now=True) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_delete()) + + benchmark(benchmark_wrapper) + + +# ============================================================================= +# Transaction Benchmarks +# ============================================================================= + + +class TestAsyncTransactionBenchmarks: + """Benchmarks for advanced async transaction operations.""" + + def test_benchmark_upsert_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async upsert operation.""" + loop = event_loop + client = benchmark_client + counter = [0] + + async def run_upsert() -> api.Response: + counter[0] += 1 + email = f"upsert{counter[0]}@test.com" + txn = client.txn() + query = f'{{ u as var(func: eq(email, "{email}")) }}' + mutation = pydgraph.Mutation( + set_nquads=f""" + uid(u) "{email}" . + uid(u) "Upsert_{counter[0]}" . 
+ """.encode(), + cond="@if(eq(len(u), 0))", + ) + request = api.Request( + query=query, + mutations=[mutation], + commit_now=True, + ) + return await txn.do_request(request) + + def benchmark_wrapper() -> api.Response: + return loop.run_until_complete(run_upsert()) + + benchmark(benchmark_wrapper) + + def test_benchmark_batch_mutations_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark multiple async mutations in one transaction.""" + loop = event_loop + client = benchmark_client + counter = [0] + batch_size = 10 + + async def run_batch() -> api.TxnContext | None: + txn = client.txn() + for _ in range(batch_size): + counter[0] += 1 + await txn.mutate(set_obj=generate_movie(counter[0])) + return await txn.commit() + + def benchmark_wrapper() -> api.TxnContext | None: + return loop.run_until_complete(run_batch()) + + benchmark(benchmark_wrapper) + + def test_benchmark_run_transaction_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark run_transaction_async helper overhead.""" + loop = event_loop + client = benchmark_client + counter = [0] + + async def txn_func(txn: pydgraph.AsyncTxn) -> str: + counter[0] += 1 + response = await txn.mutate( + set_obj=generate_movie(counter[0]), commit_now=True + ) + return next(iter(response.uids.values()), "") + + async def run_with_helper() -> str: + return await run_transaction_async(client, txn_func) + + def benchmark_wrapper() -> str: + return loop.run_until_complete(run_with_helper()) + + benchmark(benchmark_wrapper) + + +# ============================================================================= +# Client Benchmarks +# ============================================================================= + + +class TestAsyncClientBenchmarks: + """Benchmarks for async client-level operations.""" + + def 
test_benchmark_check_version_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async check_version.""" + loop = event_loop + client = benchmark_client + + async def run_check() -> str: + return await client.check_version() + + def benchmark_wrapper() -> str: + return loop.run_until_complete(run_check()) + + benchmark(benchmark_wrapper) + + def test_benchmark_alter_schema_async( + self, + event_loop: asyncio.AbstractEventLoop, + benchmark_client: AsyncDgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark async schema alter operation.""" + loop = event_loop + client = benchmark_client + counter = [0] + + async def run_alter() -> api.Payload: + counter[0] += 1 + schema = f"benchmark_pred_{counter[0]}: string @index(exact) ." + return await client.alter(pydgraph.Operation(schema=schema)) + + def benchmark_wrapper() -> api.Payload: + return loop.run_until_complete(run_alter()) + + benchmark(benchmark_wrapper) diff --git a/tests/test_benchmark_sync.py b/tests/test_benchmark_sync.py new file mode 100644 index 0000000..1374d5e --- /dev/null +++ b/tests/test_benchmark_sync.py @@ -0,0 +1,348 @@ +# SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. +# SPDX-License-Identifier: Apache-2.0 + +"""Targeted benchmark tests for sync client operations. + +These benchmarks measure individual operations in isolation to help +identify the root cause of performance regressions in stress tests. 
+ +Usage: + # Run all sync benchmarks + pytest tests/test_benchmark_sync.py -v + + # Compare against previous run + pytest tests/test_benchmark_sync.py --benchmark-compare +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pydgraph +from pydgraph import DgraphClient, run_transaction +from pydgraph.proto import api_pb2 as api + +from .helpers import generate_movie + +if TYPE_CHECKING: + from pytest_benchmark.fixture import BenchmarkFixture + +import pytest + +# ============================================================================= +# Fixtures +# ============================================================================= + + +@pytest.fixture +def stress_client( + sync_client: DgraphClient, movies_schema: str +) -> DgraphClient: + """Sync client with movies test schema for benchmark tests.""" + sync_client.alter(pydgraph.Operation(drop_all=True)) + sync_client.alter(pydgraph.Operation(schema=movies_schema)) + return sync_client + + +# ============================================================================= +# Query Benchmarks +# ============================================================================= + + +class TestSyncQueryBenchmarks: + """Benchmarks for sync query operations.""" + + def test_benchmark_query_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark a simple read query.""" + client = stress_client + + # Setup: seed data outside benchmark + txn = client.txn() + txn.mutate(set_obj=generate_movie(0), commit_now=True) + + query = """query { + people(func: has(name), first: 1) { + name + email + tagline + } + }""" + + def run_query() -> api.Response: + txn = client.txn(read_only=True) + return txn.query(query) + + benchmark(run_query) + + def test_benchmark_query_with_vars_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark a query with variables.""" + client = stress_client + + # Setup: seed data + txn 
= client.txn() + txn.mutate( + set_obj={"name": "BenchmarkUser", "email": "bench@test.com"}, + commit_now=True, + ) + + query = """query people($name: string) { + people(func: eq(name, $name)) { + name + email + } + }""" + + def run_query() -> api.Response: + txn = client.txn(read_only=True) + return txn.query(query, variables={"$name": "BenchmarkUser"}) + + benchmark(run_query) + + def test_benchmark_query_best_effort_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark a best-effort read query.""" + client = stress_client + + # Setup: seed data + txn = client.txn() + txn.mutate(set_obj=generate_movie(0), commit_now=True) + + query = "{ people(func: has(name), first: 1) { name } }" + + def run_query() -> api.Response: + txn = client.txn(read_only=True, best_effort=True) + return txn.query(query) + + benchmark(run_query) + + +# ============================================================================= +# Mutation Benchmarks +# ============================================================================= + + +class TestSyncMutationBenchmarks: + """Benchmarks for sync mutation operations.""" + + def test_benchmark_mutation_commit_now_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark mutation with commit_now (single round-trip).""" + client = stress_client + counter = [0] + + def run_mutation() -> api.Response: + counter[0] += 1 + txn = client.txn() + return txn.mutate(set_obj=generate_movie(counter[0]), commit_now=True) + + benchmark(run_mutation) + + def test_benchmark_mutation_explicit_commit_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark mutation with explicit commit (two round-trips).""" + client = stress_client + counter = [0] + + def run_mutation() -> api.TxnContext | None: + counter[0] += 1 + txn = client.txn() + txn.mutate(set_obj=generate_movie(counter[0])) + return txn.commit() + + 
benchmark(run_mutation) + + def test_benchmark_discard_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark mutation followed by discard (rollback cost).""" + client = stress_client + counter = [0] + + def run_mutation() -> None: + counter[0] += 1 + txn = client.txn() + txn.mutate(set_obj=generate_movie(counter[0])) + txn.discard() + + benchmark(run_mutation) + + def test_benchmark_mutation_nquads_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark N-Quads mutation format.""" + client = stress_client + counter = [0] + + def run_mutation() -> api.Response: + counter[0] += 1 + txn = client.txn() + nquads = f""" + _:person "Movie_{counter[0]}" . + _:person "movie{counter[0]}@test.com" . + _:person "A test movie number {counter[0]}" . + """ + return txn.mutate(set_nquads=nquads, commit_now=True) + + benchmark(run_mutation) + + def test_benchmark_delete_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark delete mutation.""" + client = stress_client + + # Pre-create nodes to delete + uids: list[str] = [] + for i in range(100): + txn = client.txn() + resp = txn.mutate(set_obj=generate_movie(i), commit_now=True) + uids.append(next(iter(resp.uids.values()))) + + uid_index = [0] + + def run_delete() -> api.Response: + idx = uid_index[0] % len(uids) + uid_index[0] += 1 + txn = client.txn() + return txn.mutate(del_obj={"uid": uids[idx]}, commit_now=True) + + benchmark(run_delete) + + +# ============================================================================= +# Transaction Benchmarks +# ============================================================================= + + +class TestSyncTransactionBenchmarks: + """Benchmarks for advanced sync transaction operations.""" + + def test_benchmark_upsert_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark upsert operation (query 
+ conditional mutation).""" + client = stress_client + counter = [0] + + def run_upsert() -> api.Response: + counter[0] += 1 + email = f"upsert{counter[0]}@test.com" + txn = client.txn() + query = f'{{ u as var(func: eq(email, "{email}")) }}' + mutation = pydgraph.Mutation( + set_nquads=f""" + uid(u) "{email}" . + uid(u) "Upsert_{counter[0]}" . + """.encode(), + cond="@if(eq(len(u), 0))", + ) + request = api.Request( + query=query, + mutations=[mutation], + commit_now=True, + ) + return txn.do_request(request) + + benchmark(run_upsert) + + def test_benchmark_batch_mutations_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark multiple mutations in one transaction.""" + client = stress_client + counter = [0] + batch_size = 10 + + def run_batch() -> api.TxnContext | None: + txn = client.txn() + for _ in range(batch_size): + counter[0] += 1 + txn.mutate(set_obj=generate_movie(counter[0])) + return txn.commit() + + benchmark(run_batch) + + def test_benchmark_run_transaction_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark run_transaction helper overhead.""" + client = stress_client + counter = [0] + + def txn_func(txn: pydgraph.Txn) -> str: + counter[0] += 1 + response = txn.mutate(set_obj=generate_movie(counter[0]), commit_now=True) + return next(iter(response.uids.values()), "") + + def run_with_helper() -> str: + return run_transaction(client, txn_func) + + benchmark(run_with_helper) + + +# ============================================================================= +# Client Benchmarks +# ============================================================================= + + +class TestSyncClientBenchmarks: + """Benchmarks for sync client-level operations.""" + + def test_benchmark_check_version_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark check_version (health check).""" + client = stress_client + + def 
run_check() -> str: + return client.check_version() + + benchmark(run_check) + + def test_benchmark_alter_schema_sync( + self, + stress_client: DgraphClient, + benchmark: BenchmarkFixture, + ) -> None: + """Benchmark schema alter operation.""" + client = stress_client + counter = [0] + + def run_alter() -> api.Payload: + counter[0] += 1 + # Add a new predicate each time to avoid conflicts + schema = f"benchmark_pred_{counter[0]}: string @index(exact) ." + return client.alter(pydgraph.Operation(schema=schema)) + + benchmark(run_alter) diff --git a/tests/test_client.py b/tests/test_client.py index 07949b4..98e67d3 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -23,13 +23,5 @@ def test_constructor(self) -> None: pydgraph.DgraphClient() -def suite() -> unittest.TestSuite: - """Returns a tests suite object.""" - suite_obj = unittest.TestSuite() - suite_obj.addTest(unittest.makeSuite(TestDgraphClient)) - return suite_obj - - if __name__ == "__main__": - runner = unittest.TextTestRunner() - runner.run(suite()) + unittest.main() diff --git a/tests/test_retry.py b/tests/test_retry.py index b558005..dc57f39 100644 --- a/tests/test_retry.py +++ b/tests/test_retry.py @@ -115,6 +115,7 @@ def test_raise_class_directly(self) -> None: assert str(restored) == "Transaction has been aborted. 
Please retry" + class TestRetryGenerator(unittest.TestCase): """Tests for sync retry generator.""" @@ -476,23 +477,5 @@ def operation(txn: Any) -> None: ) -def suite() -> unittest.TestSuite: - """Returns a test suite object.""" - suite_obj = unittest.TestSuite() - suite_obj.addTest(unittest.makeSuite(TestCalculateDelay)) - suite_obj.addTest(unittest.makeSuite(TestIsRetriable)) - suite_obj.addTest(unittest.makeSuite(TestRetryGenerator)) - suite_obj.addTest(unittest.makeSuite(TestRetryAsyncGenerator)) - suite_obj.addTest(unittest.makeSuite(TestWithRetryDecorator)) - suite_obj.addTest(unittest.makeSuite(TestWithRetryAsyncDecorator)) - suite_obj.addTest(unittest.makeSuite(TestRetryImports)) - suite_obj.addTest(unittest.makeSuite(TestRetryWithRetriableError)) - suite_obj.addTest(unittest.makeSuite(TestRetryExponentialBackoff)) - suite_obj.addTest(unittest.makeSuite(TestParameterValidation)) - suite_obj.addTest(unittest.makeSuite(TestRunTransaction)) - return suite_obj - - if __name__ == "__main__": - runner = unittest.TextTestRunner() - runner.run(suite()) + unittest.main() diff --git a/tests/test_stress_async.py b/tests/test_stress_async.py new file mode 100644 index 0000000..388be49 --- /dev/null +++ b/tests/test_stress_async.py @@ -0,0 +1,705 @@ +# SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. +# SPDX-License-Identifier: Apache-2.0 + +"""Async client stress tests. + +These tests stress test the asynchronous pydgraph client using pure asyncio +concurrency patterns (asyncio.gather, asyncio.create_task). No concurrent.futures +mixing - all concurrency is handled by the asyncio event loop. 
+ +Usage: + # Quick mode (default, CI-friendly) + pytest tests/test_stress_async.py -v + + # Full mode (thorough stress testing) + STRESS_TEST_MODE=full pytest tests/test_stress_async.py -v +""" + +from __future__ import annotations + +import asyncio +from typing import TYPE_CHECKING, Any + +import pytest + +import pydgraph +from pydgraph import ( + AsyncDgraphClient, + AsyncDgraphClientStub, + errors, + retry_async, + run_transaction_async, + with_retry_async, +) +from pydgraph.proto import api_pb2 as api + +from .helpers import TEST_SERVER_ADDR, generate_movie + +if TYPE_CHECKING: + from collections.abc import Generator + + from pytest_benchmark.fixture import BenchmarkFixture + +# ============================================================================= +# Fixtures +# ============================================================================= + + +@pytest.fixture(scope="module") +def benchmark_event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]: + """Module-scoped event loop for async stress and benchmark tests.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + yield loop + loop.close() + + +@pytest.fixture(scope="module") +def stress_client( + benchmark_event_loop: asyncio.AbstractEventLoop, + movies_schema: str, + movies_data_loaded: bool, +) -> Generator[AsyncDgraphClient, None, None]: + """Module-scoped async client with movies test schema for stress tests.""" + loop = benchmark_event_loop + + async def setup() -> AsyncDgraphClient: + client_stub = AsyncDgraphClientStub(TEST_SERVER_ADDR) + client = AsyncDgraphClient(client_stub) + for _ in range(30): + try: + await client.login("groot", "password") + break + except Exception as e: + if "user not found" in str(e): + raise + await asyncio.sleep(0.1) + if not movies_data_loaded: + await client.alter(pydgraph.Operation(drop_all=True)) + await client.alter(pydgraph.Operation(schema=movies_schema)) + return client + + client = loop.run_until_complete(setup()) + yield client + 
loop.run_until_complete(client.close()) + + +# ============================================================================= +# Async Client Stress Tests +# ============================================================================= + + +class TestAsyncClientStress: + """Stress tests for asynchronous Dgraph client using pure asyncio.""" + + def test_concurrent_read_queries_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test many concurrent read-only queries using asyncio.gather.""" + loop = benchmark_event_loop + client = stress_client + num_ops = stress_config["ops"] + rounds = stress_config["rounds"] + + query = """query { + people(func: has(name), first: 10) { + name + email + tagline + } + }""" + + # Setup: Insert test data once before benchmarking (using same loop) + async def setup_data() -> None: + txn = client.txn() + for i in range(100): + await txn.mutate(set_obj=generate_movie(i)) + await txn.commit() + + loop.run_until_complete(setup_data()) + + async def run_query() -> api.Response: + txn = client.txn(read_only=True) + return await txn.query(query) + + # Wrap async execution in sync function for benchmark (using same loop) + def run_benchmark() -> list[api.Response | BaseException]: + all_results: list[api.Response | BaseException] = [] + for _ in range(rounds): + batch = loop.run_until_complete( + asyncio.gather( + *[run_query() for _ in range(num_ops)], + return_exceptions=True, + ) + ) + all_results.extend(batch) + return all_results + + # Stress tests use pedantic(rounds=1) because the stress workload is + # already controlled by stress_config["rounds"] inside run_benchmark(). + # Letting pytest-benchmark repeat the whole concurrent batch would + # compound iterations and overwhelm the Dgraph cluster. 
+ results = benchmark.pedantic( + run_benchmark, rounds=1, iterations=1, warmup_rounds=0 + ) + + exc_list = [r for r in results if isinstance(r, Exception)] + assert len(exc_list) == 0, f"Got {len(exc_list)} errors: {exc_list[:5]}" + assert len(results) == num_ops * rounds + + def test_concurrent_mutations_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test concurrent mutations in separate transactions using asyncio.gather.""" + loop = benchmark_event_loop + client = stress_client + num_ops = stress_config["workers"] * 10 + rounds = stress_config["rounds"] + counter = [0] + + async def run_mutation() -> bool: + counter[0] += 1 + try: + txn = client.txn() + await txn.mutate(set_obj=generate_movie(counter[0]), commit_now=True) + except errors.AbortedError: + return False + else: + return True + + def run_benchmark() -> list[bool | BaseException]: + all_results: list[bool | BaseException] = [] + for _ in range(rounds): + batch = loop.run_until_complete( + asyncio.gather( + *[run_mutation() for _ in range(num_ops)], + return_exceptions=True, + ) + ) + all_results.extend(batch) + return all_results + + results = benchmark.pedantic( + run_benchmark, rounds=1, iterations=1, warmup_rounds=0 + ) + + exc_list = [r for r in results if isinstance(r, Exception)] + successes = sum(1 for r in results if r is True) + + assert len(exc_list) == 0, f"Unexpected errors: {exc_list[:5]}" + assert successes > num_ops * rounds * 0.5, f"Too few successes: {successes}/{num_ops * rounds}" + + def test_mixed_workload_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test mix of queries, mutations, commits, and discards concurrently.""" + loop = benchmark_event_loop + client = stress_client + num_ops = stress_config["workers"] 
* 20 + rounds = stress_config["rounds"] + counter = [0] + + # Setup: Seed some data once before benchmarking (using same loop) + async def setup_data() -> None: + txn = client.txn() + for i in range(50): + await txn.mutate(set_obj=generate_movie(i)) + await txn.commit() + + loop.run_until_complete(setup_data()) + + async def random_operation(op_id: int) -> str: + counter[0] += 1 + unique_id = counter[0] + op_type = op_id % 4 + result = "unknown" + try: + if op_type == 0: + # Read query + txn = client.txn(read_only=True) + await txn.query("{ q(func: has(name), first: 5) { name } }") + result = "query" + elif op_type == 1: + # Mutation with commit_now + txn = client.txn() + await txn.mutate(set_obj=generate_movie(unique_id), commit_now=True) + result = "mutation" + elif op_type == 2: + # Mutation with explicit commit + txn = client.txn() + await txn.mutate(set_obj=generate_movie(unique_id)) + await txn.commit() + result = "commit" + else: + # Mutation with discard + txn = client.txn() + await txn.mutate(set_obj=generate_movie(unique_id)) + await txn.discard() + result = "discard" + except errors.AbortedError: + return "aborted" + return result + + def run_benchmark() -> list[str | BaseException]: + all_results: list[str | BaseException] = [] + for _ in range(rounds): + batch = loop.run_until_complete( + asyncio.gather( + *[random_operation(i) for i in range(num_ops)], + return_exceptions=True, + ) + ) + all_results.extend(batch) + return all_results + + results = benchmark.pedantic( + run_benchmark, rounds=1, iterations=1, warmup_rounds=0 + ) + + exc_list = [r for r in results if isinstance(r, Exception)] + assert len(exc_list) == 0, f"Unexpected errors: {exc_list[:5]}" + + +# ============================================================================= +# Async Transaction Stress Tests +# ============================================================================= + + +class TestAsyncTransactionStress: + """Stress tests for async transaction conflict handling.""" + 
+ def test_upsert_conflicts_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test concurrent upserts on the same key detect conflicts properly.""" + loop = benchmark_event_loop + client = stress_client + target_email = "async_conflict@test.com" + num_workers = stress_config["workers"] + rounds = stress_config["rounds"] + + async def run_upsert(worker_id: int) -> str: + try: + txn = client.txn() + query = f'{{ u as var(func: eq(email, "{target_email}")) }}' + mutation = pydgraph.Mutation( + set_nquads=f""" + uid(u) "{target_email}" . + uid(u) "AsyncWorker_{worker_id}" . + uid(u) "AsyncWorker {worker_id} tagline" . + """.encode(), + cond="@if(eq(len(u), 0))", + ) + request = api.Request( + query=query, + mutations=[mutation], + commit_now=True, + ) + await txn.do_request(request) + except errors.AbortedError: + return "aborted" + else: + return "success" + + def run_benchmark() -> list[str | BaseException]: + all_results: list[str | BaseException] = [] + for _ in range(rounds): + batch = loop.run_until_complete( + asyncio.gather( + *[run_upsert(i) for i in range(num_workers)], + return_exceptions=True, + ) + ) + all_results.extend(batch) + return all_results + + results = benchmark.pedantic( + run_benchmark, rounds=1, iterations=1, warmup_rounds=0 + ) + + exc_list = [r for r in results if isinstance(r, Exception)] + successes = sum(1 for r in results if r == "success") + + assert len(exc_list) == 0, f"Unexpected errors: {exc_list}" + assert successes >= rounds, "Too few upserts succeeded" + + def test_deadlock_regression_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + stress_config: dict[str, Any], + ) -> None: + """Regression test for PR #293 asyncio.Lock deadlock fix. 
+ + Verifies that when do_request() encounters an error, the transaction + is properly cleaned up without causing deadlocks due to the non-reentrant + asyncio.Lock trying to be acquired twice. + """ + loop = benchmark_event_loop + client = stress_client + num_ops = stress_config["ops"] + + async def cause_error() -> None: + txn = client.txn() + try: + # Invalid query syntax causes error + await txn.query("{ invalid syntax") + except Exception: + pass # Expected + + async def run_all() -> None: + # If there's a deadlock, wait_for will timeout + try: + await asyncio.wait_for( + asyncio.gather(*[cause_error() for _ in range(num_ops)]), + timeout=30, + ) + except asyncio.TimeoutError: + pytest.fail("Deadlock detected - asyncio.Lock not released properly") + + loop.run_until_complete(run_all()) + + def test_lock_released_after_mutation_error_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + ) -> None: + """Test that lock is released after mutation errors allowing reuse.""" + loop = benchmark_event_loop + client = stress_client + + async def run_test() -> None: + # Create a transaction and force an error + txn = client.txn() + await txn.mutate(set_obj={"name": "Test"}) + + # Force cleanup by discarding + await txn.discard() + + # Create new transaction and verify it works + txn2 = client.txn() + response = await txn2.mutate(set_obj={"name": "Test2"}, commit_now=True) + assert len(response.uids) == 1 + + loop.run_until_complete(run_test()) + + def test_context_manager_cleanup_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + ) -> None: + """Test that context managers properly clean up even on errors.""" + loop = benchmark_event_loop + client = stress_client + + async def run_test() -> None: + async def use_txn_with_error() -> None: + async with client.txn() as txn: + await txn.mutate(set_obj={"name": "ContextTest"}) + raise ValueError("Intentional error") + + # 
Should not leave any locks held + for _ in range(2): + try: + await use_txn_with_error() + except ValueError: + pass # Expected + + # Verify client still works + async with client.txn() as txn: + response = await txn.query( + "{ q(func: has(name), first: 1) { name } }" + ) + assert response is not None + + loop.run_until_complete(run_test()) + + def test_rapid_txn_create_discard_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + stress_config: dict[str, Any], + ) -> None: + """Test rapidly creating and discarding transactions.""" + loop = benchmark_event_loop + client = stress_client + num_ops = stress_config["ops"] + + async def create_and_discard() -> None: + txn = client.txn() + await txn.discard() + + async def run_all() -> None: + # Pure asyncio concurrency + results = await asyncio.gather( + *[create_and_discard() for _ in range(num_ops)], + return_exceptions=True, + ) + exc_list = [r for r in results if isinstance(r, Exception)] + assert len(exc_list) == 0, ( + f"Errors during rapid txn lifecycle: {exc_list[:5]}" + ) + + loop.run_until_complete(run_all()) + + +# ============================================================================= +# Async Retry Stress Tests +# ============================================================================= + + +class TestAsyncRetryStress: + """Stress tests for async retry utilities.""" + + def test_retry_under_conflicts_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test retry_async() generator handles conflicts correctly under load.""" + loop = benchmark_event_loop + client = stress_client + num_workers = min(stress_config["workers"], 20) + rounds = stress_config["rounds"] + reps_per_worker = 2 + + async def retry_work(worker_id: int) -> int: + successes = 0 + for _ in range(reps_per_worker): + async for attempt in retry_async(): + 
with attempt: # Note: regular 'with', not 'async with' + txn = client.txn() + await txn.mutate( + set_obj=generate_movie(worker_id * 1000 + successes), + commit_now=True, + ) + successes += 1 + return successes + + def run_benchmark() -> list[int | BaseException]: + all_results: list[int | BaseException] = [] + for _ in range(rounds): + batch = loop.run_until_complete( + asyncio.gather( + *[retry_work(i) for i in range(num_workers)], + return_exceptions=True, + ) + ) + all_results.extend(batch) + return all_results + + results = benchmark.pedantic( + run_benchmark, rounds=1, iterations=1, warmup_rounds=0 + ) + + exc_list = [r for r in results if isinstance(r, Exception)] + total_successes = sum(r for r in results if isinstance(r, int)) + + assert len(exc_list) == 0, f"Errors: {exc_list[:5]}" + assert total_successes >= num_workers * reps_per_worker * rounds + + def test_with_retry_decorator_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test @with_retry_async decorator handles conflicts correctly.""" + loop = benchmark_event_loop + client = stress_client + num_workers = min(stress_config["workers"], 10) + rounds = stress_config["rounds"] + counter = [0] + + @with_retry_async() + async def create_person() -> str: + counter[0] += 1 + txn = client.txn() + response = await txn.mutate( + set_obj=generate_movie(counter[0]), + commit_now=True, + ) + return next(iter(response.uids.values()), "") + + def run_benchmark() -> list[str | BaseException]: + all_results: list[str | BaseException] = [] + for _ in range(rounds): + batch = loop.run_until_complete( + asyncio.gather( + *[create_person() for _ in range(num_workers)], + return_exceptions=True, + ) + ) + all_results.extend(batch) + return all_results + + results = benchmark.pedantic( + run_benchmark, rounds=1, iterations=1, warmup_rounds=0 + ) + + exc_list = [r for r in results if 
isinstance(r, Exception)] + successes = [r for r in results if isinstance(r, str) and r] + + assert len(exc_list) == 0, f"Errors: {exc_list[:5]}" + assert len(successes) == num_workers * rounds + + def test_run_transaction_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test run_transaction_async() helper handles conflicts correctly.""" + loop = benchmark_event_loop + client = stress_client + num_workers = min(stress_config["workers"], 10) + rounds = stress_config["rounds"] + counter = [0] + + async def work() -> str: + counter[0] += 1 + unique_id = counter[0] + + async def txn_func(txn: pydgraph.AsyncTxn) -> str: + response = await txn.mutate( + set_obj={ + "name": f"AsyncRunTxn_{unique_id}", + "tagline": f"AsyncWorker {unique_id} transaction", + }, + commit_now=True, + ) + return next(iter(response.uids.values()), "") + + return await run_transaction_async(client, txn_func) + + def run_benchmark() -> list[str | BaseException]: + all_results: list[str | BaseException] = [] + for _ in range(rounds): + batch = loop.run_until_complete( + asyncio.gather( + *[work() for _ in range(num_workers)], + return_exceptions=True, + ) + ) + all_results.extend(batch) + return all_results + + results = benchmark.pedantic( + run_benchmark, rounds=1, iterations=1, warmup_rounds=0 + ) + + exc_list = [r for r in results if isinstance(r, Exception)] + successes = [r for r in results if isinstance(r, str) and r] + + assert len(exc_list) == 0, f"Errors: {exc_list[:5]}" + assert len(successes) == num_workers * rounds + + +# ============================================================================= +# Async Transaction Edge Cases +# ============================================================================= + + +class TestAsyncTransactionEdgeCases: + """Tests for async transaction edge cases and error handling.""" + + def 
test_double_commit_error_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + ) -> None: + """Test that double commit raises appropriate error.""" + loop = benchmark_event_loop + client = stress_client + + async def run_test() -> None: + txn = client.txn() + await txn.mutate(set_obj={"name": "DoubleCommit"}) + await txn.commit() + + with pytest.raises(errors.TransactionError): + await txn.commit() + + loop.run_until_complete(run_test()) + + def test_use_after_commit_error_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + ) -> None: + """Test that using transaction after commit raises error.""" + loop = benchmark_event_loop + client = stress_client + + async def run_test() -> None: + txn = client.txn() + await txn.mutate(set_obj={"name": "UseAfterCommit"}, commit_now=True) + + with pytest.raises(errors.TransactionError): + await txn.query("{ q(func: has(name)) { name } }") + + loop.run_until_complete(run_test()) + + def test_read_only_mutation_error_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + ) -> None: + """Test that mutations in read-only transaction raise error.""" + loop = benchmark_event_loop + client = stress_client + + async def run_test() -> None: + txn = client.txn(read_only=True) + + with pytest.raises(errors.TransactionError): + await txn.mutate(set_obj={"name": "ReadOnlyMutation"}) + + loop.run_until_complete(run_test()) + + def test_best_effort_requires_read_only_async( + self, + stress_client: AsyncDgraphClient, + ) -> None: + """Test that best_effort requires read_only=True.""" + client = stress_client + + with pytest.raises(ValueError): + client.txn(read_only=False, best_effort=True) + + def test_double_discard_is_safe_async( + self, + benchmark_event_loop: asyncio.AbstractEventLoop, + stress_client: AsyncDgraphClient, + ) -> None: + """Test that calling discard twice is safe for async 
transactions.""" + loop = benchmark_event_loop + client = stress_client + + async def run_test() -> None: + txn = client.txn() + await txn.mutate(set_obj={"name": "AsyncDoubleDiscard"}) + await txn.discard() + await txn.discard() # Should not raise + + loop.run_until_complete(run_test()) diff --git a/tests/test_stress_sync.py b/tests/test_stress_sync.py new file mode 100644 index 0000000..1ccde51 --- /dev/null +++ b/tests/test_stress_sync.py @@ -0,0 +1,474 @@ +# SPDX-FileCopyrightText: © 2017-2026 Istari Digital, Inc. +# SPDX-License-Identifier: Apache-2.0 + +"""Sync client stress tests. + +These tests stress test the synchronous pydgraph client by running concurrent +queries and mutations using ThreadPoolExecutor. + +Usage: + # Quick mode (default, CI-friendly) + pytest tests/test_stress_sync.py -v + + # Full mode (thorough stress testing) + STRESS_TEST_MODE=full pytest tests/test_stress_sync.py -v +""" + +from __future__ import annotations + +import json +from concurrent.futures import ThreadPoolExecutor, wait +from typing import TYPE_CHECKING, Any + +import pytest + +if TYPE_CHECKING: + from pytest_benchmark.fixture import BenchmarkFixture + +import pydgraph +from pydgraph import DgraphClient, errors, retry, run_transaction +from pydgraph.proto import api_pb2 as api + +from .helpers import generate_movie + +# ============================================================================= +# Fixtures +# ============================================================================= + +@pytest.fixture(scope="module") +def stress_client( + sync_client: DgraphClient, movies_schema: str, movies_data_loaded: bool +) -> DgraphClient: + """Module-scoped sync client with movies test schema for stress tests.""" + if not movies_data_loaded: + sync_client.alter(pydgraph.Operation(drop_all=True)) + sync_client.alter(pydgraph.Operation(schema=movies_schema)) + return sync_client + +# ============================================================================= +# Sync Client 
Stress Tests +# ============================================================================= + +class TestSyncClientStress: + """Stress tests for synchronous Dgraph client.""" + + def test_concurrent_read_queries_sync( + self, + stress_client: DgraphClient, + executor: ThreadPoolExecutor, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test many concurrent read-only queries don't cause issues.""" + client = stress_client + num_ops = stress_config["ops"] + rounds = stress_config["rounds"] + + # Insert some test data first (outside benchmark) + txn = client.txn() + for i in range(100): + txn.mutate(set_obj=generate_movie(i)) + txn.commit() + + query = """query { + people(func: has(name), first: 10) { + name + email + tagline + } + }""" + + results: list[api.Response] = [] + exc_list: list[Exception] = [] + + def run_query() -> None: + try: + txn = client.txn(read_only=True) + response = txn.query(query) + results.append(response) + except Exception as e: + exc_list.append(e) + + def run_all_queries() -> int: + # Clear state at start of each benchmark iteration + results.clear() + exc_list.clear() + for _ in range(rounds): + futures = [executor.submit(run_query) for _ in range(num_ops)] + wait(futures) + return len(results) + + # Stress tests use pedantic(rounds=1) because the stress workload is + # already controlled by stress_config["rounds"] inside the callable. + # Letting pytest-benchmark repeat the whole concurrent batch would + # compound iterations and overwhelm the Dgraph cluster. 
+ result_count = benchmark.pedantic( + run_all_queries, rounds=1, iterations=1, warmup_rounds=0 + ) + + assert result_count == num_ops * rounds + + def test_concurrent_mutations_sync( + self, + stress_client: DgraphClient, + executor: ThreadPoolExecutor, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test concurrent mutations in separate transactions.""" + client = stress_client + num_ops = stress_config["workers"] * 10 + rounds = stress_config["rounds"] + counter = [0] + + success_count = 0 + exc_list: list[Exception] = [] + + def run_mutation() -> None: + nonlocal success_count + counter[0] += 1 + try: + txn = client.txn() + txn.mutate(set_obj=generate_movie(counter[0]), commit_now=True) + success_count += 1 + except errors.AbortedError: + pass # Expected conflict + except Exception as e: + exc_list.append(e) + + def run_all_mutations() -> int: + nonlocal success_count + # Clear state at start of each benchmark iteration + success_count = 0 + exc_list.clear() + for _ in range(rounds): + futures = [executor.submit(run_mutation) for _ in range(num_ops)] + wait(futures) + return success_count + + result_count = benchmark.pedantic( + run_all_mutations, rounds=1, iterations=1, warmup_rounds=0 + ) + + # Some AbortedErrors are expected + assert result_count > num_ops * rounds * 0.5 + + def test_mixed_workload_sync( + self, + stress_client: DgraphClient, + executor: ThreadPoolExecutor, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test mix of queries, mutations, commits, and discards concurrently.""" + client = stress_client + num_ops = stress_config["workers"] * 20 + rounds = stress_config["rounds"] + counter = [0] + + # Setup: Seed some data once before benchmarking + txn = client.txn() + for i in range(50): + txn.mutate(set_obj=generate_movie(i)) + txn.commit() + + results: list[str] = [] + exc_list: list[Exception] = [] + + def random_operation(op_id: int) -> None: + counter[0] += 1 + unique_id = 
counter[0] + op_type = op_id % 4 + try: + if op_type == 0: + # Read query + txn = client.txn(read_only=True) + txn.query("{ q(func: has(name), first: 5) { name } }") + results.append("query") + elif op_type == 1: + # Mutation with commit_now + txn = client.txn() + txn.mutate(set_obj=generate_movie(unique_id), commit_now=True) + results.append("mutation") + elif op_type == 2: + # Mutation with explicit commit + txn = client.txn() + txn.mutate(set_obj=generate_movie(unique_id)) + txn.commit() + results.append("commit") + else: + # Mutation with discard + txn = client.txn() + txn.mutate(set_obj=generate_movie(unique_id)) + txn.discard() + results.append("discard") + except errors.AbortedError: + results.append("aborted") + except Exception as e: + exc_list.append(e) + + def run_all_operations() -> int: + # Clear state at start of each benchmark iteration + results.clear() + exc_list.clear() + for _ in range(rounds): + futures = [executor.submit(random_operation, i) for i in range(num_ops)] + wait(futures) + return len(results) + + result_count = benchmark.pedantic( + run_all_operations, rounds=1, iterations=1, warmup_rounds=0 + ) + + assert len(exc_list) == 0, f"Unexpected errors: {exc_list[:5]}" + assert result_count == num_ops * rounds + +class TestSyncTransactionStress: + """Stress tests for sync transaction conflict handling.""" + + def test_upsert_conflicts_sync( + self, + stress_client: DgraphClient, + executor: ThreadPoolExecutor, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test concurrent upserts on the same key detect conflicts properly.""" + client = stress_client + target_email = "conflict@test.com" + num_workers = stress_config["workers"] + rounds = stress_config["rounds"] + + aborted_count = 0 + success_count = 0 + exc_list: list[Exception] = [] + + def run_upsert(worker_id: int) -> None: + nonlocal aborted_count, success_count + try: + txn = client.txn() + query = f'{{ u as var(func: eq(email, "{target_email}")) }}' 
+ mutation = pydgraph.Mutation( + set_nquads=f""" + uid(u) "{target_email}" . + uid(u) "Worker_{worker_id}" . + uid(u) "Worker {worker_id} tagline" . + """.encode(), + cond="@if(eq(len(u), 0))", + ) + request = api.Request( + query=query, + mutations=[mutation], + commit_now=True, + ) + txn.do_request(request) + success_count += 1 + except errors.AbortedError: + aborted_count += 1 + except Exception as e: + exc_list.append(e) + + def run_all_upserts() -> int: + nonlocal aborted_count, success_count + # Clear state at start of each benchmark iteration + aborted_count = 0 + success_count = 0 + exc_list.clear() + for _ in range(rounds): + futures = [executor.submit(run_upsert, i) for i in range(num_workers)] + wait(futures) + return success_count + + result_count = benchmark.pedantic( + run_all_upserts, rounds=1, iterations=1, warmup_rounds=0 + ) + + assert result_count >= rounds, "Too few upserts succeeded" + + def test_transaction_isolation_sync( # noqa: C901 + self, + stress_client: DgraphClient, + stress_config: dict[str, Any], + ) -> None: + """Test that transactions provide proper isolation.""" + client = stress_client + workers = min(stress_config["workers"], 20) + + # Insert initial data with a counter stored in tagline + txn = client.txn() + response = txn.mutate( + set_obj={"name": "IsolationTest", "tagline": "counter:100"}, commit_now=True + ) + uid = next(iter(response.uids.values())) + + results: list[int] = [] + exc_list: list[Exception] = [] + + def read_counter() -> None: + try: + txn = client.txn(read_only=True) + query = f'{{ node(func: uid("{uid}")) {{ tagline }} }}' + response = txn.query(query) + data = json.loads(response.json) + if data.get("node"): + tagline = data["node"][0]["tagline"] + counter = int(tagline.split(":")[1]) + results.append(counter) + except Exception as e: + exc_list.append(e) + + def update_counter(delta: int) -> None: + try: + txn = client.txn() + query = f'{{ node(func: uid("{uid}")) {{ tagline }} }}' + response = 
txn.query(query) + data = json.loads(response.json) + if data.get("node"): + tagline = data["node"][0]["tagline"] + current = int(tagline.split(":")[1]) + txn.mutate( + set_obj={"uid": uid, "tagline": f"counter:{current + delta}"}, + commit_now=True, + ) + except errors.AbortedError: + pass # Expected + except Exception as e: + exc_list.append(e) + + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = [] + for i in range(100): + if i % 3 == 0: + futures.append(executor.submit(update_counter, 1)) + else: + futures.append(executor.submit(read_counter)) + wait(futures) + + assert len(exc_list) == 0, f"Unexpected errors: {exc_list}" + for counter in results: + assert isinstance(counter, int) + assert counter >= 100 + +class TestSyncRetryStress: + """Stress tests for sync retry utilities.""" + + def test_retry_under_conflicts_sync( + self, + stress_client: DgraphClient, + executor: ThreadPoolExecutor, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test retry() generator handles conflicts correctly under load.""" + num_workers = min(stress_config["workers"], 10) + rounds = stress_config["rounds"] + + total_successes = 0 + all_errors: list[str] = [] + + def retry_work() -> None: + nonlocal total_successes + for attempt in retry(): + with attempt: + txn = stress_client.txn() + txn.mutate( + set_obj=generate_movie(total_successes), + commit_now=True, + ) + total_successes += 1 + + def run_all_retry_work() -> int: + nonlocal total_successes + # Clear state at start of each benchmark iteration + total_successes = 0 + all_errors.clear() + for _ in range(rounds): + futures = [executor.submit(retry_work) for _ in range(num_workers)] + wait(futures) + # Check for exceptions + for f in futures: + try: + f.result() + except Exception as e: + all_errors.append(str(e)) + return total_successes + + result_count = benchmark.pedantic( + run_all_retry_work, rounds=1, iterations=1, warmup_rounds=0 + ) + + assert result_count >= 
num_workers * rounds + + def test_run_transaction_sync( + self, + stress_client: DgraphClient, + executor: ThreadPoolExecutor, + stress_config: dict[str, Any], + benchmark: BenchmarkFixture, + ) -> None: + """Test run_transaction() helper handles conflicts correctly.""" + num_workers = min(stress_config["workers"], 10) + rounds = stress_config["rounds"] + counter = [0] + + results: list[str] = [] + exc_list: list[Exception] = [] + + def work() -> None: + counter[0] += 1 + unique_id = counter[0] + try: + + def txn_func(txn: pydgraph.Txn) -> str: + response = txn.mutate( + set_obj={ + "name": f"RunTxn_{unique_id}", + "tagline": f"Worker {unique_id} transaction", + }, + commit_now=True, + ) + return next(iter(response.uids.values()), "") + + uid = run_transaction(stress_client, txn_func) + results.append(uid) + except Exception as e: + exc_list.append(e) + + def run_all_transactions() -> int: + # Clear state at start of each benchmark iteration + results.clear() + exc_list.clear() + for _ in range(rounds): + futures = [executor.submit(work) for _ in range(num_workers)] + wait(futures) + return len(results) + + result_count = benchmark.pedantic( + run_all_transactions, rounds=1, iterations=1, warmup_rounds=0 + ) + + assert result_count == num_workers * rounds + +class TestSyncDeadlockPrevention: + """Tests for deadlock prevention in sync client.""" + + def test_no_deadlock_on_error_sync( + self, + stress_client: DgraphClient, + stress_config: dict[str, Any], + ) -> None: + """Test that errors don't cause deadlocks.""" + client = stress_client + workers = min(stress_config["workers"], 20) + + def cause_error() -> None: + txn = client.txn() + try: + txn.query("{ invalid syntax") + except Exception: + pass + + with ThreadPoolExecutor(max_workers=workers) as executor: + futures = [executor.submit(cause_error) for _ in range(100)] + _, not_done = wait(futures, timeout=30) # Ignore done set + assert len(not_done) == 0, "Possible deadlock detected" diff --git a/uv.lock 
b/uv.lock index 9b9b92f..4a01ea0 100644 --- a/uv.lock +++ b/uv.lock @@ -291,7 +291,7 @@ name = "importlib-metadata" version = "8.7.1" source = { registry = "https://pypi.org/simple" } dependencies = [ - { name = "zipp", marker = "python_full_version < '3.13'" }, + { name = "zipp" }, ] sdist = { url = "https://files.pythonhosted.org/packages/f3/49/3b30cad09e7771a4982d9975a8cbf64f00d4a1ececb53297f1d9a7be1b10/importlib_metadata-8.7.1.tar.gz", hash = "sha256:49fef1ae6440c182052f407c8d34a68f72efc36db9ca90dc0113398f2fdde8bb", size = 57107, upload-time = "2025-12-21T10:00:19.278Z" } wheels = [ @@ -597,6 +597,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0e/15/4f02896cc3df04fc465010a4c6a0cd89810f54617a32a70ef531ed75d61c/protobuf-6.33.2-py3-none-any.whl", hash = "sha256:7636aad9bb01768870266de5dc009de2d1b936771b38a793f73cbbf279c91c5c", size = 170501, upload-time = "2025-12-06T00:17:52.211Z" }, ] +[[package]] +name = "py-cpuinfo" +version = "9.0.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/37/a8/d832f7293ebb21690860d2e01d8115e5ff6f2ae8bbdc953f0eb0fa4bd2c7/py-cpuinfo-9.0.0.tar.gz", hash = "sha256:3cdbbf3fac90dc6f118bfd64384f309edeadd902d7c8fb17f02ffa1fc3f49690", size = 104716, upload-time = "2022-10-25T20:38:06.303Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e0/a9/023730ba63db1e494a271cb018dcd361bd2c917ba7004c3e49d5daf795a2/py_cpuinfo-9.0.0-py3-none-any.whl", hash = "sha256:859625bc251f64e21f077d099d4162689c762b5d6a4c3c97553d56241c9674d5", size = 22335, upload-time = "2022-10-25T20:38:27.636Z" }, +] + [[package]] name = "pydantic" version = "2.12.5" @@ -790,10 +799,12 @@ dependencies = [ dev = [ { name = "build" }, { name = "grpcio-tools" }, + { name = "pygal" }, { name = "pytest", version = "8.4.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, { name = "pytest", version = "9.0.2", source = { registry = 
"https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, { name = "pytest-asyncio", version = "1.2.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, { name = "pytest-asyncio", version = "1.3.0", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, + { name = "pytest-benchmark" }, { name = "ruff" }, { name = "ty" }, ] @@ -821,8 +832,10 @@ requires-dist = [ { name = "grpcio", specifier = ">=1.65.0,<2.0.0" }, { name = "grpcio-tools", marker = "extra == 'dev'", specifier = ">=1.66.2" }, { name = "protobuf", specifier = ">=4.23.0,<7.0.0" }, + { name = "pygal", marker = "extra == 'dev'", specifier = ">=3.0.0" }, { name = "pytest", marker = "extra == 'dev'", specifier = ">=8.3.3" }, { name = "pytest-asyncio", marker = "extra == 'dev'", specifier = ">=0.23.0" }, + { name = "pytest-benchmark", marker = "extra == 'dev'", specifier = ">=4.0.0" }, { name = "ruff", marker = "extra == 'dev'", specifier = ">=0.8.4" }, { name = "ty", marker = "extra == 'dev'", specifier = ">=0.0.8" }, ] @@ -843,6 +856,18 @@ dev = [ { name = "types-requests", specifier = ">=2.32.0.20241016" }, ] +[[package]] +name = "pygal" +version = "3.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "importlib-metadata" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b0/b6/04176faeb312c84d7f9bc1a810f96ee38d15597e226bb9bda59f3a5cb122/pygal-3.1.0.tar.gz", hash = "sha256:fbdee7351a7423e7907fb8a9c3b77305f6b5678cb2e6fd0db36a8825e42955ec", size = 81006, upload-time = "2025-12-09T10:29:19.587Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/0b/4c/2862dd25352fe4b22ec7760d4fa12cb692587cd7ec3e378cbf644fc0d2a8/pygal-3.1.0-py3-none-any.whl", hash = "sha256:4e923490f3490c90c481f4535fa3adcda20ff374257ab9d8ae897f91b632c0bb", size = 130171, upload-time = "2025-12-09T10:29:16.721Z" }, +] + [[package]] name = "pygments" version = "2.19.2" @@ -939,6 +964,20 
@@ wheels = [ { url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" }, ] +[[package]] +name = "pytest-benchmark" +version = "5.2.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "py-cpuinfo" }, + { name = "pytest", version = "8.4.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.10'" }, + { name = "pytest", version = "9.0.2", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.10'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/24/34/9f732b76456d64faffbef6232f1f9dbec7a7c4999ff46282fa418bd1af66/pytest_benchmark-5.2.3.tar.gz", hash = "sha256:deb7317998a23c650fd4ff76e1230066a76cb45dcece0aca5607143c619e7779", size = 341340, upload-time = "2025-11-09T18:48:43.215Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/33/29/e756e715a48959f1c0045342088d7ca9762a2f509b945f362a316e9412b7/pytest_benchmark-5.2.3-py3-none-any.whl", hash = "sha256:bc839726ad20e99aaa0d11a127445457b4219bdb9e80a1afc4b51da7f96b0803", size = 45255, upload-time = "2025-11-09T18:48:39.765Z" }, +] + [[package]] name = "python-dotenv" version = "1.2.1"