Skip to content

Commit dba4c6b

Browse files
authored
feat(server): active memory defrag test application (dragonflydb#552)
Signed-off-by: Boaz Sade <[email protected]> Signed-off-by: Boaz Sade <[email protected]>
1 parent 444f7e3 commit dba4c6b

File tree

2 files changed

+179
-0
lines changed

2 files changed

+179
-0
lines changed

tools/defrag_mem_test.py

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
#!/usr/bin/env python3
2+
import asyncio
3+
import aioredis
4+
import async_timeout
5+
import sys
6+
import argparse
7+
'''
8+
To install: pip install -r requirements.txt
9+
10+
This program would try to re-create the issue with memory defragmentation.
11+
See issue number 448 for more details.
12+
To run this:
13+
You can just execute this from the command line without any arguemnts.
14+
Or you can run with --help to see the options.
15+
The defaults are:
16+
number of keys: 4,023,467
17+
value size: 64 bytes
18+
key name pattern: key-for-testing
19+
host: localhost
20+
port: default redis port
21+
Please note that this would create 4 * number of keys entries
22+
You can see the memory usage/defrag state with the monitoring task that
23+
prints the current state
24+
25+
NOTE:
26+
If this seems to get stuck please kill it with ctrl+c
27+
This can happen in case we don't have "defrag_realloc_total > 0"
28+
'''
29+
30+
class TaskCancel:
31+
def __init__(self):
32+
self.run = True
33+
34+
def dont_stop(self):
35+
return self.run
36+
37+
def stop(self):
38+
self.run = False
39+
40+
async def run_cmd(connection, cmd, sub_val):
41+
val = await connection.execute_command(cmd, sub_val)
42+
return val
43+
44+
async def handle_defrag_stats(connection, prev):
45+
info = await run_cmd(connection, "info", "stats")
46+
if info is not None:
47+
if info['defrag_task_invocation_total'] != prev:
48+
print("--------------------------------------------------------------")
49+
print(f"defrag_task_invocation_total: {info['defrag_task_invocation_total']:,}")
50+
print(f"defrag_realloc_total: {info['defrag_realloc_total']:,}")
51+
print(f"defrag_attempt_total: {info['defrag_attempt_total']:,}")
52+
print("--------------------------------------------------------------")
53+
if info["defrag_realloc_total"] > 0:
54+
return True, None
55+
return False, info['defrag_task_invocation_total']
56+
return False, None
57+
58+
async def memory_stats(connection):
59+
print("--------------------------------------------------------------")
60+
info = await run_cmd(connection, "info", "memory")
61+
print(f"memory commited: {info['comitted_memory']:,}")
62+
print(f"memory used: {info['used_memory']:,}")
63+
print(f"memory usage ratio: {info['comitted_memory']/info['used_memory']:.2f}")
64+
print("--------------------------------------------------------------")
65+
66+
67+
async def stats_check(connection, condition):
68+
try:
69+
defrag_task_invocation_total = 0;
70+
runs=0
71+
while condition.dont_stop():
72+
await asyncio.sleep(0.3)
73+
done, d = await handle_defrag_stats(connection, defrag_task_invocation_total)
74+
if done:
75+
print("defrag task successfully found memory locations to reallocate")
76+
condition.stop()
77+
else:
78+
if d is not None:
79+
defrag_task_invocation_total = d
80+
runs += 1
81+
if runs % 3 == 0:
82+
await memory_stats(connection)
83+
for i in range(5):
84+
done, d = await handle_defrag_stats(connection, -1)
85+
if done:
86+
print("defrag task successfully found memory locations to reallocate")
87+
return True
88+
else:
89+
await asyncio.sleep(2)
90+
return True
91+
except Exception as e:
92+
print(f"failed to run monitor task: {e}")
93+
return False
94+
95+
96+
async def delete_keys(connection, keys):
97+
results = await connection.delete(*keys)
98+
return results
99+
100+
def generate_keys(pattern: str, count: int, batch_size: int) -> list:
101+
for i in range(1, count, batch_size):
102+
batch = [f"{pattern}{j}" for j in range(i, batch_size + i, 3)]
103+
yield batch
104+
105+
async def mem_cleanup(connection, pattern, num, cond, keys_count):
106+
counter=0
107+
for keys in generate_keys(pattern=pattern, count=keys_count, batch_size=950):
108+
if cond.dont_stop() == False:
109+
print(f"task number {num} that deleted keys {pattern} finished")
110+
return counter
111+
counter += await delete_keys(connection, keys)
112+
await asyncio.sleep(0.2)
113+
print(f"task number {num} that deleted keys {pattern} finished")
114+
return counter
115+
116+
117+
async def run_tasks(pool, key_name, value_size, keys_count):
118+
keys = [f"{key_name}-{i}" for i in range(4)]
119+
stop_cond = TaskCancel()
120+
try:
121+
connection = aioredis.Redis(connection_pool=pool)
122+
for key in keys:
123+
print(f"creating key {key} with size {value_size} of count {keys_count}")
124+
await connection.execute_command("DEBUG", "POPULATE", keys_count, key, value_size)
125+
await asyncio.sleep(2)
126+
tasks = []
127+
count = 0
128+
for key in keys:
129+
pattern=f"{key}:"
130+
print(f"deleting keys from {pattern}")
131+
tasks.append(mem_cleanup(connection=connection, pattern=pattern, num=count, cond=stop_cond, keys_count=int(keys_count)))
132+
count += 1
133+
monitor_task = asyncio.create_task(stats_check(connection, stop_cond))
134+
total = await asyncio.gather(*tasks, return_exceptions=True)
135+
print(f"successfully deleted {sum(total)} keys")
136+
stop_cond.stop()
137+
await monitor_task
138+
print("finish executing")
139+
return True
140+
except Exception as e:
141+
print(f"got error {e} while running delete keys")
142+
return False
143+
144+
145+
def connect_and_run(key_name, value_size, keys_count, host="localhost", port=6379):
146+
async_pool = aioredis.ConnectionPool(host=host, port=port,
147+
db=0, decode_responses=True, max_connections=16)
148+
149+
loop = asyncio.new_event_loop()
150+
success = loop.run_until_complete(run_tasks(pool=async_pool, key_name=key_name, value_size=value_size, keys_count=keys_count))
151+
return success
152+
153+
154+
if __name__ == "__main__":
155+
parser = argparse.ArgumentParser(description='active memory testing', formatter_class=argparse.ArgumentDefaultsHelpFormatter)
156+
parser.add_argument('-k', '--keys', type=int, default=4023467, help='total number of keys')
157+
parser.add_argument('-v', '--value_size', type=int, default=64, help='size of the values')
158+
parser.add_argument('-n', '--key_name', type=str, default="key-for-testing", help='the base key name')
159+
parser.add_argument('-s', '--server', type=str, default="localhost", help='server host name')
160+
parser.add_argument('-p', '--port', type=int, default=6379, help='server port number')
161+
args = parser.parse_args()
162+
keys_num = args.keys
163+
key_name = args.key_name
164+
value_size = args.value_size
165+
host = args.server
166+
port = args.port
167+
print(f"running key deletion on {host}:{port} for keys {key_name} value size of {value_size} and number of keys {keys_num}")
168+
result = connect_and_run(key_name=key_name, value_size=value_size, keys_count=keys_num, host=host, port=port)
169+
if result == True:
170+
print("finished successfully")
171+
else:
172+
print("failed")

tools/requirements.txt

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
aioredis==2.0.1
2+
async_timeout==4.0.2
3+
pytoml==0.1.21
4+
PyYAML==6.0
5+
railroad==0.5.0
6+
redis==4.3.4
7+
requests==2.28.1

0 commit comments

Comments
 (0)