Commit a9ecff6

test: search benchmark updated to use class fixture for multiple tests (#5425)
* fix: ft benchmark was updated
* fix: naming updated
* fix: latency_tracking flag was added in instance.py
1 parent e1cf9f7 commit a9ecff6

File tree

3 files changed: +150 -124 lines changed


tests/dragonfly/instance.py

Lines changed: 3 additions & 0 deletions
@@ -423,6 +423,9 @@ def create(self, existing_port=None, path=None, version=100, **kwargs) -> DflyIn
         if version > 1.27:
             args.setdefault("omit_basic_usage")

+        if version > 1.31:
+            args.setdefault("latency_tracking")
+
         args.setdefault("log_dir", self.params.log_dir)

         if version >= 1.21 and "serialization_max_chunk_size" not in args:
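The new lines follow the same idiom as the omit_basic_usage gate above them: dict.setdefault registers a presence-only flag, but only when the caller has not already supplied it, and the version check keeps the flag off the command line for binaries that would reject it. A minimal sketch of that idiom, where build_args is a hypothetical stand-in for the factory's argument assembly, not project code:

# Illustrative sketch of the version-gating idiom used in instance.py;
# build_args is a hypothetical helper, not part of the Dragonfly test suite.
def build_args(version: float, **overrides) -> dict:
    args = dict(overrides)
    if version > 1.31:
        # Presence-only flag: setdefault with no value stores None,
        # and it never clobbers an explicit override from the caller.
        args.setdefault("latency_tracking")
    return args

assert "latency_tracking" in build_args(1.32)      # new enough: flag added
assert "latency_tracking" not in build_args(1.30)  # too old: flag omitted
assert build_args(1.32, latency_tracking=False)["latency_tracking"] is False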

tests/dragonfly/search_benchmark_test.py

Lines changed: 145 additions & 122 deletions
@@ -15,128 +15,151 @@
 )


-async def run_dragonfly_benchmark(
-    df_server: DflyInstance,
-    num_documents: int,
-    num_queries: int,
-    num_concurrent_clients: int,
-    random_seed: int,
-):
-    set_random_seed(random_seed)
-
-    logging.info(f"Starting Dragonfly benchmark test on port {df_server.port}")
-    logging.info(
-        f"Parameters: {num_documents} documents, {num_queries} queries, {num_concurrent_clients} concurrent clients, seed={random_seed}"
-    )
-    client = df_server.client()
-
-    # Basic connectivity check
-    assert await client.ping() == True
-
-    # Stage 1: Schema Generation
-    logging.info("Stage 1: Schema Generation - generating columns and creating search index")
-    document_columns = generate_document_columns()
-    await create_search_index(client, document_columns)
-
-    # Verify the index was created
-    index_info = await client.execute_command(f"FT.INFO {INDEX_KEY}")
-    assert index_info is not None
-    logging.info(
-        f"Stage 1 completed: search index '{INDEX_KEY}' created with {len(document_columns)} columns"
-    )
-
-    # Stage 2: Data Generation
-    logging.info(
-        f"Stage 2: Data Generation - generating {num_documents:,} documents with full column data"
-    )
-    stage2_start = time.time()
-    document_ids = await generate_document_data(
-        client=client,
-        columns=document_columns,
-        num_documents=num_documents,
-        chunk_size=1000,  # Chunk size for batch processing
-    )
-
-    # Verify data was generated
-    assert len(document_ids) == num_documents
-
-    # Verify some documents were stored
-    sample_document_id = document_ids[0]
-    document_key = DOCUMENT_KEY.format(documentId=sample_document_id)
-    stored_document = await client.hgetall(document_key)
-    assert stored_document is not None
-    assert stored_document["DocumentId"] == sample_document_id
-    stage2_duration = time.time() - stage2_start
-    logging.info(
-        f"Stage 2 completed in {stage2_duration:.2f}s: {len(document_ids)} documents generated and stored"
-    )
-
-    # Stage 3: Query Load Testing
-    logging.info(
-        f"Stage 3: Query Load Testing - running {num_queries:,} queries with {num_concurrent_clients} concurrent clients"
-    )
-    stage3_start = time.time()
-    total_completed = await run_query_load_test(
-        df_server=df_server,
-        columns=document_columns,
-        document_ids=document_ids,
-        total_queries=num_queries,
-        num_concurrent_clients=num_concurrent_clients,
-    )
-
-    # Verify queries completed
-    assert total_completed == num_queries
-    stage3_duration = time.time() - stage3_start
-    logging.info(
-        f"Stage 3 completed in {stage3_duration:.2f}s: {total_completed} queries executed successfully"
-    )
-
-    # Final summary
-    logging.info(
-        f"Benchmark Timings Summary -> Data Generation: {stage2_duration:.2f}s | Query Load: {stage3_duration:.2f}s"
-    )
-
-    # Command statistics
-    cmd_stats = await client.info("commandstats")
-    logging.info("Command Statistics:")
-    for key, value in cmd_stats.items():
-        if key.startswith("cmdstat_") and "ft." in key.lower():
-            command = key[8:]  # Remove "cmdstat_" prefix
-            logging.info(f"  {command}: {value}")
-
-    # Latency statistics
-    latency_stats = await client.info("latencystats")
-    logging.info("Latency Statistics:")
-    for key, value in latency_stats.items():
-        if "ft." in key.lower():
-            logging.info(f"  {key}: {value}")
-
-    # Memory statistics
-    memory_stats = await client.info("memory")
-    logging.info("Memory Statistics:")
-    important_memory_keys = [
-        "used_memory",
-        "used_memory_human",
-        "used_memory_rss",
-        "used_memory_rss_human",
-        "used_memory_peak",
-        "used_memory_peak_human",
-    ]
-    for key in important_memory_keys:
-        if key in memory_stats:
-            logging.info(f"  {key}: {memory_stats[key]}")
-
-    logging.info("Benchmark test completed successfully")
-
-    # Close client
-    await client.aclose()
-
-
 @dfly_args({"proactor_threads": 4})
 @pytest.mark.opt_only
 @pytest.mark.slow
-async def test_dragonfly_benchmark(
-    df_server: DflyInstance,
-):
-    # num_documents, num_queries, num_concurrent_clients, random_seed
-    await run_dragonfly_benchmark(df_server, 3000, 100, 10, 42)
+class TestSearchBenchmark:
+    random_seed = 42
+    num_documents = 3000
+    chunk_size = 1000
+
+    @pytest.fixture(scope="class")
+    async def prepared_benchmark_data(self, df_server: DflyInstance):
+        set_random_seed(self.random_seed)
+
+        logging.info(f"Preparing benchmark data on port {df_server.port}")
+        client = df_server.client()
+
+        # Basic connectivity check
+        assert await client.ping() == True
+
+        # Schema Generation
+        logging.info("Schema Generation - generating columns and creating search index")
+        document_columns = generate_document_columns()
+        await create_search_index(client, document_columns)
+
+        # Verify the index was created
+        index_info = await client.execute_command(f"FT.INFO {INDEX_KEY}")
+        assert index_info is not None
+        logging.info(f"Search index '{INDEX_KEY}' created with {len(document_columns)} columns")
+
+        # Data Generation
+        logging.info(
+            f"Data Generation - generating {self.num_documents:,} documents with full column data"
+        )
+        stage_start = time.time()
+        document_ids = await generate_document_data(
+            client=client,
+            columns=document_columns,
+            num_documents=self.num_documents,
+            chunk_size=self.chunk_size,  # Chunk size for batch processing
+        )
+
+        # Verify data was generated
+        assert len(document_ids) == self.num_documents
+
+        # Verify some documents were stored
+        sample_document_id = document_ids[0]
+        document_key = DOCUMENT_KEY.format(documentId=sample_document_id)
+        stored_document = await client.hgetall(document_key)
+        assert stored_document is not None
+        assert stored_document["DocumentId"] == sample_document_id
+        stage_duration = time.time() - stage_start
+        logging.info(
+            f"Preparation stage completed in {stage_duration:.2f}s: {len(document_ids)} documents generated and stored"
+        )
+
+        await client.aclose()
+
+        return {
+            "document_columns": document_columns,
+            "document_ids": document_ids,
+            "num_documents": self.num_documents,
+            "setup_duration": stage_duration,
+        }
+
+    async def _run_benchmark(
+        self,
+        df_server: DflyInstance,
+        prepared_benchmark_data,
+        num_queries: int,
+        num_concurrent_clients: int,
+        test_name: str,
+    ):
+        logging.info(f"Starting {test_name} test on port {df_server.port}")
+        logging.info(
+            f"Parameters: {prepared_benchmark_data['num_documents']} documents, {num_queries} queries, {num_concurrent_clients} concurrent clients"
+        )
+
+        client = df_server.client()
+
+        # Basic connectivity check
+        assert await client.ping() == True
+
+        # Query Load Testing
+        logging.info(
+            f"Query Load Testing - running {num_queries:,} queries with {num_concurrent_clients} concurrent clients"
+        )
+        stage_start = time.time()
+        total_completed = await run_query_load_test(
+            df_server=df_server,
+            columns=prepared_benchmark_data["document_columns"],
+            document_ids=prepared_benchmark_data["document_ids"],
+            total_queries=num_queries,
+            num_concurrent_clients=num_concurrent_clients,
+        )
+
+        # Verify queries completed
+        assert total_completed == num_queries
+        stage_duration = time.time() - stage_start
+        logging.info(
+            f"Query Load Testing completed in {stage_duration:.2f}s: {total_completed} queries executed successfully"
+        )
+
+        # Final summary
+        logging.info(
+            f"Benchmark Timings Summary -> Data Generation: {prepared_benchmark_data['setup_duration']:.2f}s | Query Load: {stage_duration:.2f}s"
+        )
+
+        # Command statistics
+        cmd_stats = await client.info("commandstats")
+        logging.info("Command Statistics:")
+        for key, value in cmd_stats.items():
+            if key.startswith("cmdstat_") and "ft." in key.lower():
+                command = key[8:]  # Remove "cmdstat_" prefix
+                logging.info(f"  {command}: {value}")
+
+        # Latency statistics
+        latency_stats = await client.info("latencystats")
+        logging.info("Latency Statistics:")
+        for key, value in latency_stats.items():
+            if "ft." in key.lower():
+                logging.info(f"  {key}: {value}")
+
+        # Memory statistics
+        memory_stats = await client.info("memory")
+        logging.info("Memory Statistics:")
+        important_memory_keys = [
+            "used_memory",
+            "used_memory_human",
+            "used_memory_rss",
+            "used_memory_rss_human",
+            "used_memory_peak",
+            "used_memory_peak_human",
+        ]
+        for key in important_memory_keys:
+            if key in memory_stats:
+                logging.info(f"  {key}: {memory_stats[key]}")
+
+        logging.info(f"{test_name} completed successfully")
+
+        # Close client
+        await client.aclose()
+
+    async def test_standard_benchmark(self, df_server: DflyInstance, prepared_benchmark_data):
+        """Standard benchmark test - 100 queries with 10 concurrent clients."""
+        await self._run_benchmark(df_server, prepared_benchmark_data, 100, 10, "Standard Benchmark")
+
+    async def test_small_benchmark(self, df_server: DflyInstance, prepared_benchmark_data):
+        """Small benchmark test - 50 queries with 5 concurrent clients."""
+        await self._run_benchmark(df_server, prepared_benchmark_data, 50, 5, "Small Benchmark")
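The point of the class fixture is that the expensive setup (index creation plus 3,000 generated documents) runs once per class and is shared by both test methods instead of being rebuilt per test. A stripped-down, self-contained sketch of the pattern in plain pytest (synchronous for brevity; names are illustrative, and with pytest-asyncio the fixture's event-loop scope would additionally have to cover the class):

# Minimal sketch: a class-scoped fixture runs once for the whole class,
# so every test method sees the same prepared dataset.
import pytest

SETUP_CALLS = 0

class TestSharedSetup:
    @pytest.fixture(scope="class")
    def prepared_data(self):
        global SETUP_CALLS
        SETUP_CALLS += 1  # expensive preparation happens here, once
        yield {"ids": list(range(3000))}

    def test_first(self, prepared_data):
        assert len(prepared_data["ids"]) == 3000

    def test_second(self, prepared_data):
        assert SETUP_CALLS == 1  # setup was not repeated for this test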

tests/dragonfly/server_family_test.py

Lines changed: 2 additions & 2 deletions
@@ -222,7 +222,6 @@ def match_label_value(s: Sample, name, func):
        match_label_value(sample, "other", lambda v: v == 1)


-@dfly_args({"latency_tracking": True})
 async def test_latency_stats(async_client: aioredis.Redis):
     for _ in range(100):
         await async_client.set("foo", "bar")
@@ -245,7 +244,8 @@ async def test_latency_stats(async_client: aioredis.Redis):
     )


-async def test_latency_stats_disabled_by_default(async_client: aioredis.Redis):
+@dfly_args({"latency_tracking": False})
+async def test_latency_stats_disabled(async_client: aioredis.Redis):
     for _ in range(100):
         await async_client.set("foo", "bar")
     assert await async_client.info("LATENCYSTATS") == {}
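The rename pairs with the instance.py change above: test instances on new enough versions now launch with latency_tracking on, so "disabled by default" no longer describes the suite's reality, and the disabled-path test must opt out explicitly via @dfly_args. For orientation, a small hedged sketch of reading the stats these tests inspect with redis-py's asyncio client (the port is illustrative; the percentile key convention follows Redis's latency_percentiles_usec_<command> format, which the benchmark test's "ft." filter above relies on):

# Hedged sketch using redis-py's asyncio client; not project code.
import asyncio
import redis.asyncio as aioredis

async def dump_latency_stats(port: int) -> None:
    client = aioredis.Redis(port=port, decode_responses=True)
    stats = await client.info("latencystats")
    if not stats:
        # With latency_tracking disabled the section is empty,
        # which is exactly what test_latency_stats_disabled asserts.
        print("latency tracking disabled: no LATENCYSTATS entries")
    for key, value in stats.items():
        # e.g. latency_percentiles_usec_set: {'p50': ..., 'p99': ..., 'p99.9': ...}
        print(f"{key}: {value}")
    await client.aclose()

asyncio.run(dump_latency_stats(6379))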
