Skip to content

Commit b2aaeec

Browse files
committed
Use worker ID in search index names
1 parent 293d7ac commit b2aaeec

File tree

9 files changed

+147
-117
lines changed

9 files changed

+147
-117
lines changed

tests/conftest.py

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,23 +10,32 @@
1010
from redisvl.utils.vectorize import HFTextVectorizer
1111

1212

13+
@pytest.fixture(scope="session")
14+
def worker_id(request):
15+
"""
16+
Get the worker ID for the current test.
17+
18+
In pytest-xdist, the config has "workerid" in workerinput.
19+
This fixture abstracts that logic to provide a consistent worker_id
20+
across all tests.
21+
"""
22+
workerinput = getattr(request.config, "workerinput", {})
23+
return workerinput.get("workerid", "master")
24+
25+
1326
@pytest.fixture(autouse=True)
1427
def set_tokenizers_parallelism():
1528
"""Disable tokenizers parallelism in tests to avoid deadlocks"""
1629
os.environ["TOKENIZERS_PARALLELISM"] = "false"
1730

1831

1932
@pytest.fixture(scope="session", autouse=True)
20-
def redis_container(request):
33+
def redis_container(worker_id):
2134
"""
2235
If using xdist, create a unique Compose project for each xdist worker by
2336
setting COMPOSE_PROJECT_NAME. That prevents collisions on container/volume
2437
names.
2538
"""
26-
# In xdist, the config has "workerid" in workerinput
27-
workerinput = getattr(request.config, "workerinput", {})
28-
worker_id = workerinput.get("workerid", "master")
29-
3039
# Set the Compose project name so containers do not clash across workers
3140
os.environ["COMPOSE_PROJECT_NAME"] = f"redis_test_{worker_id}"
3241
os.environ.setdefault("REDIS_IMAGE", "redis/redis-stack-server:latest")
@@ -206,16 +215,17 @@ def pytest_collection_modifyitems(
206215

207216

208217
@pytest.fixture
209-
def flat_index(sample_data, redis_url):
218+
def flat_index(sample_data, redis_url, worker_id):
210219
"""
211220
A fixture that uses the "flag" algorithm for its vector field.
212221
"""
222+
213223
# construct a search index from the schema
214224
index = SearchIndex.from_dict(
215225
{
216226
"index": {
217-
"name": "user_index",
218-
"prefix": "v1",
227+
"name": f"user_index_{worker_id}",
228+
"prefix": f"v1_{worker_id}",
219229
"storage_type": "hash",
220230
},
221231
"fields": [
@@ -260,16 +270,17 @@ def hash_preprocess(item: dict) -> dict:
260270

261271

262272
@pytest.fixture
263-
async def async_flat_index(sample_data, redis_url):
273+
async def async_flat_index(sample_data, redis_url, worker_id):
264274
"""
265275
A fixture that uses the "flag" algorithm for its vector field.
266276
"""
277+
267278
# construct a search index from the schema
268279
index = AsyncSearchIndex.from_dict(
269280
{
270281
"index": {
271-
"name": "user_index",
272-
"prefix": "v1",
282+
"name": f"user_index_{worker_id}",
283+
"prefix": f"v1_{worker_id}",
273284
"storage_type": "hash",
274285
},
275286
"fields": [
@@ -314,15 +325,16 @@ def hash_preprocess(item: dict) -> dict:
314325

315326

316327
@pytest.fixture
317-
async def async_hnsw_index(sample_data, redis_url):
328+
async def async_hnsw_index(sample_data, redis_url, worker_id):
318329
"""
319330
A fixture that uses the "hnsw" algorithm for its vector field.
320331
"""
332+
321333
index = AsyncSearchIndex.from_dict(
322334
{
323335
"index": {
324-
"name": "user_index",
325-
"prefix": "v1",
336+
"name": f"user_index_{worker_id}",
337+
"prefix": f"v1_{worker_id}",
326338
"storage_type": "hash",
327339
},
328340
"fields": [
@@ -364,15 +376,16 @@ def hash_preprocess(item: dict) -> dict:
364376

365377

366378
@pytest.fixture
367-
def hnsw_index(sample_data, redis_url):
379+
def hnsw_index(sample_data, redis_url, worker_id):
368380
"""
369381
A fixture that uses the "hnsw" algorithm for its vector field.
370382
"""
383+
371384
index = SearchIndex.from_dict(
372385
{
373386
"index": {
374-
"name": "user_index",
375-
"prefix": "v1",
387+
"name": f"user_index_{worker_id}",
388+
"prefix": f"v1_{worker_id}",
376389
"storage_type": "hash",
377390
},
378391
"fields": [

tests/integration/test_aggregation.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,15 +10,11 @@
1010

1111

1212
@pytest.fixture
13-
def index(sample_data, redis_url, request):
14-
# In xdist, the config has "workerid" in workerinput
15-
workerinput = getattr(request.config, "workerinput", {})
16-
worker_id = workerinput.get("workerid", "master")
17-
13+
def index(sample_data, redis_url, worker_id):
1814
index = SearchIndex.from_dict(
1915
{
2016
"index": {
21-
"name": "user_index",
17+
"name": f"user_index_{worker_id}",
2218
"prefix": f"v1_{worker_id}",
2319
"storage_type": "hash",
2420
},

tests/integration/test_async_search_index.py

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -32,30 +32,24 @@ def async_index(index_schema, async_client):
3232

3333

3434
@pytest.fixture
35-
def async_index_from_dict(request):
36-
# In xdist, the config has "workerid" in workerinput
37-
workerinput = getattr(request.config, "workerinput", {})
38-
worker_id = workerinput.get("workerid", "master")
35+
def async_index_from_dict(worker_id):
3936

4037
return AsyncSearchIndex.from_dict(
41-
{"index": {"name": "my_index", "prefix": f"rvl_{worker_id}"}, "fields": fields}
38+
{
39+
"index": {"name": f"my_index_{worker_id}", "prefix": f"rvl_{worker_id}"},
40+
"fields": fields,
41+
}
4242
)
4343

4444

4545
@pytest.fixture
46-
def async_index_from_yaml(request):
47-
# In xdist, the config has "workerid" in workerinput
48-
workerinput = getattr(request.config, "workerinput", {})
49-
worker_id = workerinput.get("workerid", "master")
46+
def async_index_from_yaml(worker_id):
5047

51-
# Load the schema from YAML
52-
schema = IndexSchema.from_yaml("schemas/test_json_schema.yaml")
53-
54-
# Modify the prefix to include the worker ID
55-
schema.index.prefix = f"{schema.index.prefix}_{worker_id}"
56-
57-
# Create the AsyncSearchIndex with the modified schema
58-
return AsyncSearchIndex(schema=schema)
48+
index = AsyncSearchIndex.from_yaml("schemas/test_json_schema.yaml")
49+
# Update the index name and prefix to include worker_id
50+
index.schema.index.name = f"{index.schema.index.name}_{worker_id}"
51+
index.schema.index.prefix = f"{index.schema.index.prefix}_{worker_id}"
52+
return index
5953

6054

6155
def test_search_index_properties(index_schema, async_index):
@@ -73,7 +67,7 @@ def test_search_index_properties(index_schema, async_index):
7367

7468

7569
def test_search_index_from_yaml(async_index_from_yaml):
76-
assert async_index_from_yaml.name == "json-test"
70+
assert async_index_from_yaml.name.startswith("json-test")
7771
assert async_index_from_yaml.client is None
7872
assert async_index_from_yaml.prefix == "json"
7973
assert async_index_from_yaml.key_separator == ":"
@@ -82,7 +76,7 @@ def test_search_index_from_yaml(async_index_from_yaml):
8276

8377

8478
def test_search_index_from_dict(async_index_from_dict):
85-
assert async_index_from_dict.name == "my_index"
79+
assert async_index_from_dict.name.startswith("my_index")
8680
assert async_index_from_dict.client is None
8781
assert async_index_from_dict.prefix == "rvl"
8882
assert async_index_from_dict.key_separator == ":"

tests/integration/test_flow.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,12 @@
4242

4343

4444
@pytest.mark.parametrize("schema", [hash_schema, json_schema])
45-
def test_simple(client, schema, sample_data):
45+
def test_simple(client, schema, sample_data, worker_id):
46+
# Update schema with worker_id
47+
schema = schema.copy()
48+
schema["index"] = schema["index"].copy()
49+
schema["index"]["name"] = f"{schema['index']['name']}_{worker_id}"
50+
schema["index"]["prefix"] = f"{schema['index']['prefix']}_{worker_id}"
4651
index = SearchIndex.from_dict(schema, redis_client=client)
4752
# create the index
4853
index.create(overwrite=True, drop=True)

tests/integration/test_flow_async.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,12 @@
4646

4747
@pytest.mark.asyncio
4848
@pytest.mark.parametrize("schema", [hash_schema, json_schema])
49-
async def test_simple(async_client, schema, sample_data):
49+
async def test_simple(async_client, schema, sample_data, worker_id):
50+
# Update schema with worker_id
51+
schema = schema.copy()
52+
schema["index"] = schema["index"].copy()
53+
schema["index"]["name"] = f"{schema['index']['name']}_{worker_id}"
54+
schema["index"]["prefix"] = f"{schema['index']['prefix']}_{worker_id}"
5055
index = AsyncSearchIndex.from_dict(schema, redis_client=async_client)
5156
# create the index
5257
await index.create(overwrite=True, drop=True)

0 commit comments

Comments
 (0)