
Commit b6668a9

More changes for the sync py client
1 parent 210ba24 commit b6668a9

File tree

11 files changed: +18906 −84 lines changed

Lines changed: 337 additions & 0 deletions
@@ -0,0 +1,337 @@
# Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

import argparse
import functools
import json
import math
import random
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from statistics import mean
from typing import List

import numpy as np
import redis as redispy  # type: ignore
from glide import (
    GlideSync,
    Logger,
    LogLevel,
    NodeAddress,
)


class ChosenAction(Enum):
    GET_NON_EXISTING = 1
    GET_EXISTING = 2
    SET = 3


PORT = 6379

arguments_parser = argparse.ArgumentParser()
arguments_parser.add_argument(
    "--resultsFile",
    help="Where to write the results file",
    required=False,
    default="../results/python-results.json",
)
arguments_parser.add_argument(
    "--dataSize", help="Size of data to set", required=False, default="100"
)
arguments_parser.add_argument(
    "--concurrentTasks",
    help="List of number of concurrent tasks to run",
    nargs="+",
    required=False,
    default=("1", "10", "100", "1000"),
)
arguments_parser.add_argument(
    "--clients", help="Which clients should run", required=False, default="all"
)
arguments_parser.add_argument(
    "--host", help="What host to target", required=False, default="localhost"
)
arguments_parser.add_argument(
    "--clientCount",
    help="Number of clients to run concurrently",
    nargs="+",
    required=False,
    default=["1"],
)
arguments_parser.add_argument(
    "--tls",
    help="Should benchmark a TLS server",
    action="store_true",
    required=False,
    default=False,
)
arguments_parser.add_argument(
    "--clusterModeEnabled",
    help="Should benchmark a cluster mode enabled cluster",
    action="store_true",
    required=False,
    default=False,
)
arguments_parser.add_argument(
    "--port",
    default=PORT,
    type=int,
    required=False,
    help="Which port to connect to, defaults to `%(default)s`",
)
arguments_parser.add_argument(
    "--minimal", help="Should run a minimal benchmark", action="store_true"
)
args = arguments_parser.parse_args()

PROB_GET = 0.8
PROB_GET_EXISTING_KEY = 0.8
SIZE_GET_KEYSPACE = 3750000  # 3.75 million
SIZE_SET_KEYSPACE = 3000000  # 3 million
started_tasks_counter = 0
bench_json_results: List[dict] = []


def truncate_decimal(number: float, digits: int = 3) -> float:
    stepper = 10**digits
    return math.floor(number * stepper) / stepper


def generate_value(size):
    return str("0" * size)


def generate_key_set():
    return str(random.randint(1, SIZE_SET_KEYSPACE + 1))


def generate_key_get():
    return str(random.randint(SIZE_SET_KEYSPACE, SIZE_GET_KEYSPACE + 1))


def choose_action():
    if random.random() > PROB_GET:
        return ChosenAction.SET
    if random.random() > PROB_GET_EXISTING_KEY:
        return ChosenAction.GET_NON_EXISTING
    return ChosenAction.GET_EXISTING


def calculate_latency(latency_list, percentile):
    return round(np.percentile(np.array(latency_list), percentile), 4)


def process_results():
    global bench_json_results
    global args

    # write json results to a file
    res_file_path = args.resultsFile
    with open(res_file_path, "w+") as f:
        json.dump(bench_json_results, f)


def timer(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        tic = time.perf_counter()
        func(*args, **kwargs)
        toc = time.perf_counter()
        return toc - tic

    return wrapper


@timer
def execute_commands(clients, total_commands, data_size, action_latencies):
    global started_tasks_counter
    while started_tasks_counter < total_commands:
        started_tasks_counter += 1
        chosen_action = choose_action()
        client = clients[started_tasks_counter % len(clients)]
        tic = time.perf_counter()
        if chosen_action == ChosenAction.GET_EXISTING:
            res = client.get(generate_key_set())
        elif chosen_action == ChosenAction.GET_NON_EXISTING:
            res = client.get(generate_key_get())
        elif chosen_action == ChosenAction.SET:
            res = client.set(generate_key_set(), generate_value(data_size))
        toc = time.perf_counter()
        execution_time_milli = (toc - tic) * 1000
        action_latencies[chosen_action].append(truncate_decimal(execution_time_milli))
    return True


@timer
def create_and_run_concurrent_tasks(
    clients, total_commands, num_of_concurrent_tasks, data_size, action_latencies
):
    # execute_commands blocks on synchronous client calls, so the concurrent
    # workers run in a thread pool; asyncio tasks require coroutines and would
    # fail here.
    global started_tasks_counter
    started_tasks_counter = 0
    with ThreadPoolExecutor(max_workers=num_of_concurrent_tasks) as executor:
        futures = [
            executor.submit(
                execute_commands, clients, total_commands, data_size, action_latencies
            )
            for _ in range(num_of_concurrent_tasks)
        ]
        for future in futures:
            future.result()


def latency_results(prefix, latencies):
    result = {}
    result[prefix + "_p50_latency"] = calculate_latency(latencies, 50)
    result[prefix + "_p90_latency"] = calculate_latency(latencies, 90)
    result[prefix + "_p99_latency"] = calculate_latency(latencies, 99)
    result[prefix + "_average_latency"] = truncate_decimal(mean(latencies))
    result[prefix + "_std_dev"] = truncate_decimal(np.std(latencies))

    return result


def create_clients(client_count, action):
    return [action() for _ in range(client_count)]


def run_clients(
    clients,
    client_name,
    total_commands,
    data_size,
    is_cluster,
):
    # Reset the shared counter so back-to-back runs each execute total_commands.
    global started_tasks_counter
    started_tasks_counter = 0

    now = datetime.now(timezone.utc).strftime("%H:%M:%S")
    print(
        f"Starting {client_name} data size: {data_size} "
        f"client count: {len(clients)} {now}"
    )
    action_latencies = {
        ChosenAction.GET_NON_EXISTING: list(),
        ChosenAction.GET_EXISTING: list(),
        ChosenAction.SET: list(),
    }

    elapsed_time = execute_commands(
        clients, total_commands, data_size, action_latencies
    )

    tps = int(started_tasks_counter / elapsed_time)
    get_non_existing_latencies = action_latencies[ChosenAction.GET_NON_EXISTING]
    get_non_existing_latency_results = latency_results(
        "get_non_existing", get_non_existing_latencies
    )

    get_existing_latencies = action_latencies[ChosenAction.GET_EXISTING]
    get_existing_latency_results = latency_results(
        "get_existing", get_existing_latencies
    )

    set_latencies = action_latencies[ChosenAction.SET]
    set_results = latency_results("set", set_latencies)

    json_res = {
        **{
            "client": client_name,
            "data_size": data_size,
            "tps": tps,
            "client_count": len(clients),
            "is_cluster": is_cluster,
        },
        **get_existing_latency_results,
        **get_non_existing_latency_results,
        **set_results,
    }

    bench_json_results.append(json_res)


def main(
    total_commands,
    data_size,
    clients_to_run,
    host,
    client_count,
    use_tls,
    is_cluster,
):
    if clients_to_run == "all" or clients_to_run == "redispy":
        client_class = redispy.RedisCluster if is_cluster else redispy.Redis
        clients = create_clients(
            client_count,
            lambda: client_class(
                host=host, port=port, decode_responses=False, ssl=use_tls
            ),
        )

        run_clients(
            clients,
            "redispy",
            total_commands,
            data_size,
            is_cluster,
        )

        for client in clients:
            client.close()

    if clients_to_run == "all" or clients_to_run == "glide":
        # Glide Socket
        # client_class = GlideClusterClient if is_cluster else GlideClient
        # config = GlideClusterClientConfiguration(
        #     [NodeAddress(host=host, port=port)], use_tls=use_tls
        # ) if is_cluster else GlideClientConfiguration(
        #     [NodeAddress(host=host, port=port)], use_tls=use_tls
        # )
        clients = create_clients(
            client_count,
            # lambda: client_class.create(config),
            lambda: GlideSync(),
        )
        run_clients(
            clients,
            "glide_sync",
            total_commands,
            data_size,
            is_cluster,
        )


def number_of_iterations(num_of_concurrent_tasks):
    return min(max(100000, num_of_concurrent_tasks * 10000), 5000000)


if __name__ == "__main__":
    concurrent_tasks = args.concurrentTasks
    data_size = int(args.dataSize)
    clients_to_run = args.clients
    client_count = args.clientCount
    host = args.host
    use_tls = args.tls
    port = args.port
    is_cluster = args.clusterModeEnabled

    # Configure the internal logger to record every message at INFO level and
    # above, saving the logs to a file named after the results file.
    Logger.set_logger_config(LogLevel.INFO, Path(args.resultsFile).stem)

    product_of_arguments = [
        (data_size, int(number_of_clients)) for number_of_clients in client_count
    ]

    for data_size, number_of_clients in product_of_arguments:
        iterations = 100000
        main(
            iterations,
            data_size,
            clients_to_run,
            host,
            number_of_clients,
            use_tls,
            is_cluster,
        )

    process_results()
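
The benchmark appends one result dict per client configuration to bench_json_results and dumps the list to --resultsFile as JSON. A minimal sketch for inspecting a finished run; the field names match the keys assembled in run_clients, and the path assumes the script's default --resultsFile:

import json

# Load the list of result dicts written by process_results().
with open("../results/python-results.json") as f:
    results = json.load(f)

for entry in results:
    print(
        f"{entry['client']} (clients={entry['client_count']}, "
        f"data_size={entry['data_size']}): tps={entry['tps']}, "
        f"get_existing p50={entry['get_existing_p50_latency']} ms"
    )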
Lines changed: 27 additions & 0 deletions
@@ -0,0 +1,27 @@
import sys
import timeit

import redis
from glide import GlideSync

# Example usage
if __name__ == "__main__":
    redis_client = redis.Redis(decode_responses=False)
    glide_client = GlideSync()
    print(glide_client.strlen("foo"))
    key = "foo"
    value = "0" * 4
    print(f"value size={sys.getsizeof(value)}")

    def glide_test_fn():
        glide_client.set(key, value)
        glide_client.mget([key, key, key, key, key, key, key, key, key])

    def redis_test_fn():
        redis_client.set(key, value)
        redis_client.mget([key, key, key, key, key, key, key, key, key])

    # Benchmark both functions
    num_of_requests = 1000
    redispy_execution_time = timeit.timeit(redis_test_fn, number=num_of_requests)
    glide_execution_time = timeit.timeit(glide_test_fn, number=num_of_requests)
    print(
        f"Glide Execution time: {glide_execution_time:.6f} seconds, "
        f"avg TPS: {(num_of_requests / glide_execution_time):.0f}"
    )
    print(
        f"RedisPy Execution time: {redispy_execution_time:.6f} seconds, "
        f"avg TPS: {(num_of_requests / redispy_execution_time):.0f}"
    )
