Skip to content

Commit a927485

Browse files
rwlock: Add benchmark, fix bugs
1 parent 1492bb1 commit a927485

File tree

5 files changed

+392
-20
lines changed

5 files changed

+392
-20
lines changed

benchmarks/rwlock_cache.py

Lines changed: 337 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,337 @@
1+
"""Simulation of a pool of workers reading a cached value which
2+
occasionally must be replaced when it expires.
3+
"""
4+
5+
import random
6+
import threading
7+
import time
8+
import uuid
9+
from dataclasses import dataclass
10+
from datetime import datetime
11+
from numbers import Number
12+
from pathlib import Path
13+
from typing import Literal
14+
from typing import Optional
15+
from typing import Self
16+
from typing import TextIO
17+
from typing import TypeAlias
18+
from typing import Union
19+
20+
import pandas as pd
21+
22+
from redis.client import Redis
23+
from redis.exceptions import LockMaxWritersError
24+
from redis.rwlock import RwLock
25+
26+
27+
def _now() -> datetime:
28+
return datetime.now()
29+
30+
31+
AcquireStatus: TypeAlias = Union[
32+
Literal['success'],
33+
Literal['timeout'],
34+
Literal['aborted'],
35+
]
36+
37+
38+
@dataclass
class InvocationMetric:
    """Timing and outcome record for a single worker invocation.

    Every field defaults to ``None``; a field that is still ``None`` at
    the end of the invocation means the corresponding phase never ran
    (e.g. no write was attempted on a cache hit).
    """

    # Wall-clock time when the invocation completed.
    timestamp: Optional[datetime] = None
    # Seconds spent acquiring/releasing the read lock, and the outcome.
    read_acquire_time: Optional[float] = None
    read_acquire_status: Optional[AcquireStatus] = None
    read_release_time: Optional[float] = None
    # Seconds spent acquiring/releasing the write lock, and the outcome.
    write_acquire_time: Optional[float] = None
    write_acquire_status: Optional[AcquireStatus] = None
    write_release_time: Optional[float] = None
47+
48+
49+
@dataclass
50+
class TimeSeriesMetric:
51+
timestamp: datetime
52+
num_readers: int
53+
num_waiting_writers: int
54+
55+
@staticmethod
56+
def collect(lock: RwLock) -> Self:
57+
metric = TimeSeriesMetric(
58+
timestamp=_now(),
59+
num_readers=lock.redis.zcard(lock._reader_lock_name()),
60+
num_waiting_writers=lock.redis.zcard(lock._writer_semaphore_name()),
61+
)
62+
assert metric.num_waiting_writers <= 1
63+
return metric
64+
65+
66+
class Worker:
    """One simulated cache consumer; each instance runs on its own thread.

    Redis keys used:
      - worker_invocations: total number of worker invocations
      - current_key: holds the name of the live read-counter key; the
        stored name embeds a random UUID4 and the key expires after
        ``ttl`` seconds
      - previous_key: last value written to current_key (never expires)
      - total: sum of all increments applied to retired read counters;
        should equal worker_invocations at the end of a run
    """

    lock: RwLock
    ttl: float
    io_time: float
    metrics: list[InvocationMetric]
    series: list[TimeSeriesMetric]

    def __init__(
        self,
        lock: RwLock,
        ttl: float,
        io_time: float,
    ) -> None:
        self.lock = lock
        self.ttl = ttl
        self.io_time = io_time
        self.metrics = []
        self.series = []

    @property
    def redis(self) -> Redis:
        """Redis client shared with the lock."""
        return self.lock.redis

    def rand_io_time(self) -> float:
        """Draw a simulated I/O latency from a gamma distribution.

        Mean and standard deviation are both ``io_time``, so the gamma
        parameters reduce to shape=1, scale=io_time (an exponential).
        """
        mu = self.io_time
        sigma = mu
        return random.gammavariate(mu**2 / sigma**2, sigma**2 / mu)

    def replace_key(self, metric: InvocationMetric) -> None:
        """Take the write lock and install a fresh counter key.

        Records write-phase timings/outcomes on *metric*. Aborts early if
        another worker already holds the write slot, timed out acquiring,
        or refreshed the key while we waited.
        """
        guard = self.lock.write()

        # Acquire the lock for writing.
        started = time.perf_counter()
        try:
            got_lock = guard.acquire()
        except LockMaxWritersError:
            # Another worker holds the write slot; let it do the refresh.
            metric.write_acquire_status = 'aborted'
            return
        metric.write_acquire_time = time.perf_counter() - started
        self.series.append(TimeSeriesMetric.collect(self.lock))
        if not got_lock:
            metric.write_acquire_status = 'timeout'
            return

        metric.write_acquire_status = 'success'

        def release() -> None:
            t0 = time.perf_counter()
            guard.release()
            metric.write_release_time = time.perf_counter() - t0
            self.series.append(TimeSeriesMetric.collect(self.lock))

        if self.redis.exists('current_key'):
            # Someone else refreshed the key while we waited for the lock.
            release()
            return

        # Fold the retired counter's increments into the running total.
        prior: bytes = self.redis.get('previous_key')
        if prior:
            prior_count = self.redis.get(prior)
            if prior_count:
                self.redis.incrby('total', int(prior_count))

        # Pretend to do I/O.
        time.sleep(self.rand_io_time())

        # Install the replacement counter key; previous_key tracks it so
        # its increments can be totalled after it expires.
        fresh = f'cache:{uuid.uuid4().hex}'
        self.redis.set('current_key', fresh, px=int(self.ttl * 1000))
        self.redis.set('previous_key', fresh)

        release()

    def work_inner(self, metric: InvocationMetric) -> None:
        """One invocation body: read under the lock, or refresh on a miss."""
        guard = self.lock.read()

        # Acquire the lock for reading.
        started = time.perf_counter()
        got_lock = guard.acquire()
        metric.read_acquire_time = time.perf_counter() - started
        self.series.append(TimeSeriesMetric.collect(self.lock))
        if not got_lock:
            metric.read_acquire_status = 'timeout'
            return
        metric.read_acquire_status = 'success'

        def release() -> None:
            t0 = time.perf_counter()
            guard.release()
            metric.read_release_time = time.perf_counter() - t0
            self.series.append(TimeSeriesMetric.collect(self.lock))

        live_key = self.redis.get('current_key')
        if live_key:
            # Cache hit: simulate I/O, then bump the counters.
            time.sleep(self.rand_io_time())

            self.redis.incr(live_key)
            self.redis.incr('worker_invocations')

            release()
        else:
            # Cache miss: drop the read lock first, then try to refresh.
            release()
            self.replace_key(metric)

    def work(self) -> None:
        """Run one invocation and append its metric."""
        metric = InvocationMetric()
        self.work_inner(metric)
        metric.timestamp = _now()
        self.metrics.append(metric)

    def loop(self, stop_at: float) -> None:
        """Invoke work() repeatedly until the wall clock passes *stop_at*."""
        while time.time() < stop_at:
            self.work()
192+
193+
194+
def write_headers(csv_file: TextIO) -> None:
    """Write the CSV header row for the time-series output file."""
    columns = [
        'timestamp',
        'num_readers',
        'num_waiting_writers',
        'num_workers',
        'ttl',
    ]
    # An empty frame carrying only column labels emits just the header line.
    pd.DataFrame(columns=columns).to_csv(csv_file, mode='w', header=True, index=False)
204+
205+
206+
def write_time_series(
    csv_file: TextIO,
    n: int,
    ttl: Number,
    time_series: list[TimeSeriesMetric],
) -> None:
    """Append one CSV row per contention sample, tagged with *n* and *ttl*."""

    def as_row(sample: TimeSeriesMetric) -> dict:
        return {
            'timestamp': sample.timestamp.isoformat(),
            'num_readers': sample.num_readers,
            'num_waiting_writers': sample.num_waiting_writers,
            'num_workers': n,
            'ttl': ttl,
        }

    frame = pd.DataFrame([as_row(sample) for sample in time_series])
    # The header row was already written by write_headers(); append data only.
    frame.to_csv(csv_file, mode='a', header=False, index=False)
224+
225+
226+
def display_metrics(
    n: int,
    ttl: Number,
    invocation_metrics: list[InvocationMetric],
) -> None:
    """Print latency summary stats and acquire-status counts for one run."""
    frame = pd.DataFrame.from_records([
        {
            'timestamp': m.timestamp.isoformat() if m.timestamp else None,
            'read_acquire_time': m.read_acquire_time,
            'read_release_time': m.read_release_time,
            'write_acquire_time': m.write_acquire_time,
            'write_release_time': m.write_release_time,
            'read_acquire_status': m.read_acquire_status,
            'write_acquire_status': m.write_acquire_status,
        }
        for m in invocation_metrics
    ])
    timing_cols = [
        'read_acquire_time',
        'read_release_time',
        'write_acquire_time',
        'write_release_time',
    ]

    # Coerce timings to numeric (phases that never ran are None/NaN), then
    # summarize each timing column.
    frame[timing_cols] = frame[timing_cols].apply(pd.to_numeric, errors='coerce')
    summary = pd.DataFrame(index=timing_cols)
    summary['min'] = frame[timing_cols].min()
    summary['mean'] = frame[timing_cols].mean()
    summary['p95'] = frame[timing_cols].quantile(0.95)
    summary['max'] = frame[timing_cols].max()

    # Count acquire outcomes per status column; rows are the two status
    # columns, columns are the fixed status values.
    counts = {}
    for status_col in ('read_acquire_status', 'write_acquire_status'):
        present = frame[status_col].notna()
        counts[status_col] = frame[present][status_col].value_counts()
    status_df = pd.DataFrame(counts).T.fillna(0)
    status_df = status_df.reindex(columns=['success', 'timeout', 'aborted'], fill_value=0)

    print(summary.to_string(float_format=lambda x: f'{1e3 * x:.2f}ms'))
    print(status_df.to_string(float_format=lambda x: f'{x:.0f}'))
    print()
268+
269+
270+
def main() -> None:
    """Run the benchmark grid and report per-configuration metrics.

    For every (num_workers, ttl) combination: flush a scratch Redis DB,
    spawn worker threads that hammer the cached key for ``duration``
    seconds while the main thread samples lock contention, then verify
    counter consistency and emit CSV plus console reports.
    """
    num_workers = [1, 2, 4, 8]
    ttl_values = [0.05, 0.1, 0.25, 0.5, 1]
    duration = 5
    io_time = 0.025
    cache_dir = Path('.cache')
    cache_dir.mkdir(exist_ok=True)
    csv_path = cache_dir / 'rwlock_cache.csv'
    # Context manager guarantees the CSV is flushed and closed even if a
    # run raises (the handle was previously leaked).
    with open(csv_path, 'w') as csv_file:
        write_headers(csv_file)

        for n in num_workers:
            for ttl in ttl_values:
                redis = Redis(db=11)
                try:
                    redis.flushdb()

                    lock = RwLock(
                        redis=redis,
                        prefix='lock',
                        timeout=10,
                        sleep=io_time,
                        blocking_timeout=1,
                        max_writers=1,
                    )

                    stop_at = time.time() + duration

                    # Spawn workers
                    workers = [Worker(lock=lock, ttl=ttl, io_time=io_time) for _ in range(n)]
                    threads = [
                        threading.Thread(target=worker.loop, args=(stop_at,), daemon=True)
                        for worker in workers
                    ]
                    for thread in threads:
                        thread.start()

                    # Gather series metrics while the workers run
                    time_series = []
                    while time.time() < stop_at:
                        time_series.append(TimeSeriesMetric.collect(lock))
                        time.sleep(0.01)

                    # Wait for workers
                    for thread in threads:
                        thread.join()

                    # Verify that total == # invocations. The live counter has
                    # not been folded into 'total' yet, so add it in via
                    # previous_key -- guarding against previous_key being
                    # absent (GET of a None key would raise in redis-py).
                    total = int(redis.get('total') or 0)
                    previous_key = redis.get('previous_key')
                    if previous_key is not None:
                        total += int(redis.get(previous_key) or 0)
                    worker_invocations = int(redis.get('worker_invocations') or 0)
                    assert worker_invocations == total

                    # Write time series data
                    for worker in workers:
                        time_series.extend(worker.series)
                    write_time_series(csv_file, n, ttl, time_series)

                    # Print stats
                    print(f'n = {n}, ttl = {ttl}')
                    writes = len(redis.keys('cache:*'))
                    print(f'iops: {(writes + worker_invocations) / duration:.2f}')

                    # Display metrics
                    invocation_metrics = [metric for worker in workers for metric in worker.metrics]
                    display_metrics(n, ttl, invocation_metrics)
                finally:
                    # Release this configuration's connection pool instead of
                    # leaking one client per (n, ttl) combination.
                    redis.close()


if __name__ == '__main__':
    main()

dev_requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ click==8.0.4
33
invoke==2.2.0
44
mock
55
packaging>=20.4
6+
pandas
67
pytest
78
pytest-asyncio>=0.23.0
89
pytest-cov

redis/exceptions.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,12 @@ class LockNotOwnedError(LockError):
9494
pass
9595

9696

97+
class LockMaxWritersError(LockError):
    "Error trying to acquire a lock that has reached the max number of writers."

    # NOTE(review): raised from the write-guard acquire() path (the rwlock
    # benchmark catches it around write().acquire() and aborts gracefully).
    pass
101+
102+
97103
class ChildDeadlockedError(Exception):
98104
"Error indicating that a child process is deadlocked after a fork()"
99105

0 commit comments

Comments
 (0)