Skip to content

Commit 29060e9

Browse files
authored
Abstract out SimpleThreadedWorkerPool from RiakClient. (#5499)
This isolates the worker pool logic from the Riak client itself so that its behavior is easier to understand. The primary side effect of this change is that the thread pool is now set up when the first job is submitted, which should prevent a deadlock that would manifest itself when the Riak client was instantiated in the parent process of a multi-process environment. This also makes some other minor changes for the sake of readability (e.g. using `partial` rather than binding `key` and `event` to the callback function as default arguments, and unpacking the tuple argument provided to `SimpleThreadedWorkerPool.submit` to ensure it is the correct length). This also fixes the same issue as GH-5411 (since otherwise it would conflict).
1 parent 0dfd0c6 commit 29060e9

File tree

1 file changed

+55
-32
lines changed

1 file changed

+55
-32
lines changed

src/sentry/nodestore/riak/client.py

Lines changed: 55 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from __future__ import absolute_import
1010

11+
import functools
1112
import six
1213
import sys
1314
import socket
@@ -36,37 +37,54 @@ def encode_basic_auth(auth):
3637
return 'Basic ' + b64encode(auth).decode('utf-8')
3738

3839

40+
class SimpleThreadedWorkerPool(object):
    """\
    Manages a simple threaded worker pool. The pool will be started when the
    first job is submitted, and will run to process completion.

    Worker threads are daemon threads that loop forever pulling tasks off an
    internal queue. Each task is a ``(func, args, kwargs, cb)`` tuple: the
    worker calls ``func(*args, **kwargs)`` and hands the return value (or the
    ``Exception`` instance, if one was raised) to ``cb``.

    NOTE: the lazy start in ``submit`` is not guarded by a lock, so if several
    threads submit the very first task at the same time, more than one set of
    workers may be started.
    """

    def __init__(self, size):
        """\
        :param size: number of worker threads to start; must be positive.
        :raises AssertionError: if ``size`` is not greater than zero.
        """
        assert size > 0, 'pool must have at least one worker thread'

        self.__started = False
        self.__size = size

    def __start(self):
        """\
        Create the task queue and spawn the worker threads.
        """
        self.__tasks = tasks = Queue()

        def consumer():
            while True:
                func, args, kwargs, cb = tasks.get()
                try:
                    rv = func(*args, **kwargs)
                except Exception as e:
                    # Failures are reported to the callback as the result
                    # rather than killing the worker thread.
                    rv = e

                try:
                    cb(rv)
                finally:
                    # Always account for the task, even if the callback
                    # itself raised.
                    tasks.task_done()

        for _ in range(self.__size):
            t = Thread(target=consumer)
            # Daemon threads do not block interpreter shutdown.
            t.daemon = True
            t.start()

        # Record that the pool is running so that later submissions reuse
        # this queue and these threads instead of spawning new ones.
        self.__started = True

    def submit(self, task):
        """\
        Submit a task to the worker pool.

        :param task: a 4-tuple of ``(func, args, kwargs, cb)``. ``func`` is
            called as ``func(*args, **kwargs)`` on a worker thread and ``cb``
            is then called with the result, or with the exception instance
            if ``func`` raised an ``Exception``.
        :raises ValueError: if ``task`` does not contain exactly four items.
        """
        # Unpack eagerly so a malformed task fails in the caller's thread
        # rather than inside a worker.
        func, arg, kwargs, cb = task

        if not self.__started:
            self.__start()

        self.__tasks.put((func, arg, kwargs, cb))
78+
79+
3980
class RiakClient(object):
4081
"""
4182
A thread-safe simple light-weight riak client that does only
4283
the bare minimum.
4384
"""
4485
def __init__(self, multiget_pool_size=5, **kwargs):
4586
self.manager = ConnectionManager(**kwargs)
46-
self.queue = Queue()
47-
48-
# TODO: maybe start this lazily? Probably not valuable though
49-
# since we definitely will need it.
50-
self._start(multiget_pool_size)
51-
52-
def _start(self, size):
53-
assert size > 0
54-
for _ in range(size):
55-
t = Thread(target=self._target)
56-
t.setDaemon(True)
57-
t.start()
58-
59-
def _target(self):
60-
q = self.queue
61-
while True:
62-
func, args, kwargs, cb = q.get()
63-
try:
64-
rv = func(*args, **kwargs)
65-
except Exception as e:
66-
rv = e
67-
finally:
68-
cb(rv)
69-
q.task_done()
87+
self.pool = SimpleThreadedWorkerPool(multiget_pool_size)
7088

7189
def build_url(self, bucket, key, qs):
7290
url = '/buckets/%s/keys/%s' % tuple(map(quote_plus, (bucket, key)))
@@ -108,22 +126,27 @@ def multiget(self, bucket, keys, headers=None, **kwargs):
108126
"""
109127
# Each request is paired with a thread.Event to signal when it is finished
110128
requests = [
111-
(key, self.build_url(bucket, key, {'foo': 'bar'}), Event())
129+
(key, self.build_url(bucket, key, kwargs), Event())
112130
for key in keys
113131
]
114132

115133
results = {}
116-
for key, url, event in requests:
117-
def callback(rv, key=key, event=event):
118-
results[key] = rv
119-
# Signal that this request is finished
120-
event.set()
121134

122-
self.queue.put((
135+
def callback(key, event, rv):
136+
results[key] = rv
137+
# Signal that this request is finished
138+
event.set()
139+
140+
for key, url, event in requests:
141+
self.pool.submit((
123142
self.manager.urlopen, # func
124143
('GET', url), # args
125144
{'headers': headers}, # kwargs
126-
callback, # callback
145+
functools.partial(
146+
callback,
147+
key,
148+
event,
149+
), # callback
127150
))
128151

129152
# Now we wait for all of the callbacks to be finished

0 commit comments

Comments
 (0)