-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbenchmark.py
More file actions
114 lines (87 loc) · 2.52 KB
/
benchmark.py
File metadata and controls
114 lines (87 loc) · 2.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import logging
import time
import threading
from lru_cache.abstract_lru_cache import (
Serializer,
AbstractLRUCache,
ProxyCache,
CacheError)
# Module-level logger used by every function in this benchmark script.
LOGGER = logging.getLogger(__name__)

# Emit DEBUG and above; each record carries the timestamp, the name of the
# emitting thread and the source location, so output from the concurrent
# worker threads can be told apart.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(threadName)s %(filename)s:%(lineno)d %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
# Number of worker threads the benchmark starts when run as a script.
THREAD_COUNT = 100
# Number of cached calls each worker thread performs.
LOOP_COUNT = 15000
# Number of MemoryLRUCache instances created and registered with the proxy.
CACHE_COUNT = 5
# Worker threads are assigned a key of (thread index modulo GROUP_COUNT),
# so at most GROUP_COUNT distinct keys are requested.
GROUP_COUNT = 20
class TestSerializer(Serializer):
    """Pass-through serializer: values are stored as-is, without encoding."""

    def loads(self, data):
        """Return the stored ``data`` unchanged."""
        return data

    def dumps(self, obj):
        """Return the pair ``(4, obj)``.

        NOTE(review): the constant 4 is presumably the size reported to the
        cache for every entry -- confirm against the Serializer contract.
        """
        return (4, obj)
class MemoryLRUCache(AbstractLRUCache):
    """AbstractLRUCache implementation that keeps its entries in a dict."""

    def __init__(self, *args, **kwargs):
        """Forward every argument untouched to ``AbstractLRUCache``."""
        AbstractLRUCache.__init__(self, *args, **kwargs)

    def load(self):
        """Yield the single value 0.

        NOTE(review): presumably signals that there are no pre-existing
        entries to restore -- confirm against AbstractLRUCache.load.
        """
        yield 0

    def prepare(self):
        """Create the empty dict that backs this cache."""
        self._d = {}

    def finalize(self):
        """Drop the backing dict by resetting it to ``None``."""
        self._d = None

    def read_cache(self, key):
        """Return the value stored under ``key``.

        Raises:
            KeyError: if ``key`` is not present in the backing dict.
        """
        return self._d[key]

    def write_cache(self, key, value):
        """Store ``value`` under ``key``, replacing any previous value."""
        self._d[key] = value

    def delete_cache(self, key):
        """Remove ``key`` from the backing dict; a missing key is ignored."""
        self._d.pop(key, None)
# Single proxy that fronts all the in-memory caches created below.
proxy_cache = ProxyCache()
proxy_cache.set_serializer(TestSerializer())
# NOTE(review): presumably disables falling back to the wrapped function
# when the cache layer fails -- confirm against ProxyCache.
proxy_cache.set_call_func_when_failure(False)
# The cache key is the second positional argument handed to the key
# function; the first one is ignored.
proxy_cache.set_key_func(lambda _, key, *a, **kw: key)

# Create, start and register CACHE_COUNT identically configured caches.
# Each cache is registered only after it reports itself usable.
for cache_id in range(CACHE_COUNT):
    cache = MemoryLRUCache(
        name="memory-cache-%d" % (cache_id,),
        max_entry_count=10000,
        max_size=10 * 1024 ** 3,
        max_inactive=3600,
        expire_interval=10,
        forced_expire_interval=2,
        min_uses=1,
        lock_age=2,
        wait_count=4,
    )
    cache.start()
    cache.wait_for_usable()
    proxy_cache.add_cache(cache)
@proxy_cache.deco
def func(key):
    """Return ``key`` unchanged; the call is wrapped by ``proxy_cache.deco``."""
    return key
def target(k):
    """Thread body: call ``func(k)`` LOOP_COUNT times.

    A ``CacheError`` raised by a call is logged with its traceback and the
    loop carries on with the next iteration; other exceptions propagate.
    """
    for _iteration in range(LOOP_COUNT):
        try:
            func(k)
        except CacheError:
            # Logs at ERROR level with the active exception's traceback.
            LOGGER.exception("fail to call func")
def test(thread_count):
    """Run the benchmark with ``thread_count`` worker threads and log results.

    Each worker runs ``target`` with a key of ``index % GROUP_COUNT``. The
    elapsed wall-clock time and the resulting call rate are logged at DEBUG
    level.

    Args:
        thread_count: number of worker threads to run concurrently.
    """
    # Build every thread object first so that construction cost is not
    # part of the timed section.
    threads = [
        threading.Thread(target=target, args=(ind % GROUP_COUNT, ))
        for ind in range(thread_count)
    ]

    # Start the clock BEFORE the first thread runs: workers begin doing
    # cache calls as soon as they are started, so timing only the joins
    # would under-count the elapsed time and overstate throughput.
    start_time = time.time()
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    time_used = time.time() - start_time

    LOGGER.debug("use %.3fs", time_used)

    # Use the number of threads actually run, not the module constant, so
    # the rate is correct for any ``thread_count``.
    total_calls = thread_count * LOOP_COUNT
    # Guard against a zero interval (e.g. no threads on a coarse clock).
    rate = total_calls / time_used if time_used > 0 else float("inf")
    LOGGER.debug("average: %fr/s", rate)
if __name__ == "__main__":
    try:
        test(THREAD_COUNT)
    finally:
        # Stop every cache registered with the proxy, whether the benchmark
        # finished normally or raised.
        for registered_cache in proxy_cache.caches:
            registered_cache.stop()