Skip to content

Commit 892dad8

Browse files
cdeckerclaude
andcommitted
Add comprehensive README.md for pytest-global-fixture plugin
Documents the shared infrastructure pattern for pytest tests, explaining how the plugin enables globally-shared resources (like Docker containers) across pytest-xdist workers while maintaining per-test isolation through tenant namespaces. The README includes: - Problem statement and solution overview - Architecture details (InfrastructureService, ServiceManager, Plugin) - Complete implementation guide with code examples - Performance analysis showing ~35x speedup - Technical details on XML-RPC, thread safety, and dynamic loading - Example services for PostgreSQL, Redis, and RabbitMQ - Guidance on when to use this pattern Co-Authored-By: Claude <[email protected]>
1 parent 534e3ae commit 892dad8

File tree

11 files changed

+670
-72
lines changed

11 files changed

+670
-72
lines changed

conftest.py

Lines changed: 0 additions & 72 deletions
This file was deleted.
9.95 KB
Binary file not shown.
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
[project]
2+
name = "pytest-global-fixture"
3+
version = "0.1.0"
4+
description = "A pytest plugin for coordinating global resources across xdist workers"
5+
readme = "README.md"
6+
requires-python = ">=3.9"
7+
dependencies = [
8+
"pytest>=7.0.0",
9+
"pytest-xdist>=3.0.0",
10+
]
11+
12+
[project.optional-dependencies]
13+
dev = [
14+
"testcontainers>=3.7.0",
15+
"psycopg2-binary>=2.9.0",
16+
]
17+
18+
[build-system]
19+
requires = ["hatchling"]
20+
build-backend = "hatchling.build"
21+
22+
[tool.hatch.build.targets.wheel]
23+
packages = ["pytest_global_fixture"]
24+
25+
[tool.pytest.ini_options]
26+
testpaths = ["tests"]
27+
# Register the plugin explicitly for local development
28+
addopts = "-p pytest_global_fixture.plugin -n auto"
29+
30+
[dependency-groups]
31+
dev = [
32+
"psycopg2-binary>=2.9.10",
33+
"pytest-flakefinder>=1.1.0",
34+
"testcontainers>=4.13.3",
35+
]
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from abc import ABC, abstractmethod
2+
from typing import Any, Dict
3+
4+
class InfrastructureService(ABC):
5+
"""
6+
Interface that all shared resources must implement.
7+
These methods run exclusively on the Coordinator (Main) Process.
8+
"""
9+
10+
@abstractmethod
11+
def start_global(self) -> None:
12+
"""
13+
Initialize the heavy, global resource (e.g., Docker container).
14+
This is called exactly once, the first time a worker requests this service.
15+
"""
16+
pass
17+
18+
@abstractmethod
19+
def stop_global(self) -> None:
20+
"""
21+
Teardown the global resource.
22+
Called at the very end of the pytest session.
23+
"""
24+
pass
25+
26+
@abstractmethod
27+
def create_tenant(self, tenant_id: str) -> Dict[str, Any]:
28+
"""
29+
Create a logical isolation unit (Database, Schema, VHost).
30+
31+
Args:
32+
tenant_id: A unique string identifier for the requester (e.g., 'gw0_test_uuid').
33+
34+
Returns:
35+
A JSON-serializable dictionary containing connection details
36+
(host, port, user, password, db_name, etc.)
37+
"""
38+
pass
39+
40+
@abstractmethod
41+
def remove_tenant(self, tenant_id: str) -> None:
42+
"""
43+
Clean up the logical isolation unit.
44+
"""
45+
pass
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
import threading
2+
import importlib
3+
import sys
4+
from typing import Dict, Set
5+
from .base import InfrastructureService
6+
7+
class ServiceManager:
8+
"""
9+
Runs on the Master process.
10+
Manages the lifecycle of services and exposes them via XML-RPC.
11+
"""
12+
def __init__(self):
13+
self._services: Dict[str, InfrastructureService] = {} # Map path -> Instance
14+
self._lock = threading.Lock()
15+
16+
def _load_class(self, class_path: str) -> InfrastructureService:
17+
"""
18+
Dynamically imports a class from a string 'module.path:ClassName'.
19+
"""
20+
try:
21+
module_name, class_name = class_path.split(":")
22+
except ValueError:
23+
raise ValueError(f"Invalid format '{class_path}'. Expected 'module:Class'")
24+
25+
try:
26+
# Ensure the current directory is in path so we can import local tests
27+
if "." not in sys.path:
28+
sys.path.insert(0, ".")
29+
30+
module = importlib.import_module(module_name)
31+
cls = getattr(module, class_name)
32+
return cls() # Instantiate
33+
except (ImportError, AttributeError) as e:
34+
raise RuntimeError(f"Could not load service class '{class_path}': {e}")
35+
36+
# --- RPC Exposed Methods ---
37+
38+
def rpc_provision(self, class_path: str, tenant_id: str) -> Dict:
39+
"""
40+
Idempotent method to start a global service (if needed) and create a tenant.
41+
"""
42+
with self._lock:
43+
# 1. Lazy Load & Start Global
44+
if class_path not in self._services:
45+
print(f"[Coordinator] Dynamically loading: {class_path}")
46+
service = self._load_class(class_path)
47+
48+
print(f"[Coordinator] Starting Global Resource: {class_path}")
49+
try:
50+
service.start_global()
51+
except Exception as e:
52+
print(f"[Coordinator] Failed to start {class_path}: {e}")
53+
raise e
54+
55+
self._services[class_path] = service
56+
57+
service = self._services[class_path]
58+
59+
# 2. Create Tenant
60+
print(f"[Coordinator] Provisioning tenant '{tenant_id}' on {class_path}")
61+
try:
62+
config = service.create_tenant(tenant_id)
63+
return config
64+
except Exception as e:
65+
print(f"[Coordinator] Failed to create tenant {tenant_id}: {e}")
66+
raise e
67+
68+
def rpc_deprovision(self, class_path: str, tenant_id: str) -> bool:
69+
"""
70+
Removes a tenant.
71+
"""
72+
print(f"[Coordinator] De-Provisioning tenant '{tenant_id}' on {class_path}")
73+
with self._lock:
74+
service = self._services.get(class_path)
75+
if service:
76+
try:
77+
service.remove_tenant(tenant_id)
78+
return True
79+
except Exception as e:
80+
print(f"[Coordinator] Error removing tenant {tenant_id}: {e}")
81+
return False
82+
83+
def teardown_all(self):
84+
"""
85+
Stop all global services.
86+
"""
87+
print("\n[Coordinator] Shutting down all global resources...")
88+
for name, service in self._services.items():
89+
try:
90+
print(f"[Coordinator] Stopping {name}")
91+
service.stop_global()
92+
except Exception as e:
93+
print(f"[Coordinator] Error stopping {name}: {e}")
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
import pytest
2+
import threading
3+
import uuid
4+
import sys
5+
from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
6+
import xmlrpc.client
7+
from .manager import ServiceManager
8+
9+
# --- RPC Server Setup (Runs on Master) ---
10+
11+
class QuietXMLRPCRequestHandler(SimpleXMLRPCRequestHandler):
12+
"""Suppress standard logging from XML-RPC server."""
13+
def log_message(self, format, *args):
14+
pass
15+
16+
def pytest_configure(config):
17+
"""
18+
If this is the Master/Coordinator process, start the RPC server.
19+
"""
20+
# Check if we are a worker (xdist). If no workerinput, we are Master.
21+
if not hasattr(config, "workerinput"):
22+
manager = ServiceManager()
23+
24+
# Bind to port 0 (ephemeral)
25+
server = SimpleXMLRPCServer(
26+
("localhost", 0),
27+
requestHandler=QuietXMLRPCRequestHandler,
28+
allow_none=True,
29+
logRequests=False
30+
)
31+
server.register_instance(manager)
32+
33+
# Run server in daemon thread
34+
t = threading.Thread(target=server.serve_forever, daemon=True)
35+
t.start()
36+
37+
host, port = server.server_address
38+
rpc_addr = f"http://{host}:{port}/"
39+
40+
# Store in config to pass to workers/hooks
41+
config.infra_rpc_addr = rpc_addr
42+
config.infra_manager = manager
43+
44+
print(f"--- [Coordinator] Infrastructure Manager listening at {rpc_addr} ---")
45+
46+
def pytest_configure_node(node):
47+
"""
48+
This runs on Master for each Worker node being created.
49+
Pass the RPC address to the worker.
50+
"""
51+
node.workerinput["infra_rpc_addr"] = node.config.infra_rpc_addr
52+
53+
def pytest_unconfigure(config):
54+
"""
55+
Run teardown on Master when session ends.
56+
"""
57+
if hasattr(config, "infra_manager"):
58+
config.infra_manager.teardown_all()
59+
60+
61+
# --- Fixture (Runs on Workers) ---
62+
63+
@pytest.fixture(scope="session")
64+
def coordinator_client(request):
65+
"""
66+
Returns the XML-RPC client to talk to the manager.
67+
"""
68+
if hasattr(request.config, "workerinput"):
69+
addr = request.config.workerinput["infra_rpc_addr"]
70+
else:
71+
# We are running sequentially (no xdist), or we are the master
72+
addr = request.config.infra_rpc_addr
73+
74+
return xmlrpc.client.ServerProxy(addr)
75+
76+
@pytest.fixture(scope="function")
77+
def global_resource(request, coordinator_client):
78+
"""
79+
Factory fixture.
80+
Usage: global_resource("path.to:Class")
81+
"""
82+
83+
# Track resources created in this scope for cleanup
84+
created_resources = []
85+
86+
def _provision(class_path):
87+
# Create unique tenant ID: "gwX_testName_UUID"
88+
worker_id = getattr(request.config, "workerinput", {}).get("workerid", "master")
89+
test_name = request.node.name.replace("[", "_").replace("]", "_")
90+
# Short uuid for uniqueness
91+
uid = uuid.uuid4().hex[:6]
92+
tenant_id = f"{worker_id}_{uid}"
93+
94+
# RPC Call
95+
config = coordinator_client.rpc_provision(class_path, tenant_id)
96+
97+
created_resources.append((class_path, tenant_id))
98+
return config
99+
100+
yield _provision
101+
102+
# Teardown logic
103+
for class_path, tenant_id in reversed(created_resources):
104+
try:
105+
coordinator_client.rpc_deprovision(class_path, tenant_id)
106+
except Exception as e:
107+
# We print but don't raise, to avoid masking test failures
108+
print(f"Warning: Failed to deprovision {tenant_id}: {e}")
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import pytest
2+
import psycopg2
3+
4+
@pytest.fixture(scope="function")
5+
def postgres_db(global_resource):
6+
"""
7+
User-facing fixture.
8+
1. Tells coordinator to load 'tests.resources:PostgresService'
9+
2. Coordinator starts Docker (if not running).
10+
3. Coordinator creates a dedicated DB for this test.
11+
4. Returns connection to that dedicated DB.
12+
"""
13+
# Request the service by Class Path
14+
db_config = global_resource("tests.resources:PostgresService")
15+
16+
# Create the actual connection object for the test to use
17+
conn = psycopg2.connect(**db_config)
18+
conn.autocommit = True
19+
20+
yield conn
21+
22+
conn.close()
23+
# After yield, 'global_resource' fixture automatically calls remove_tenant

0 commit comments

Comments
 (0)