Skip to content

Commit 052fd93

Browse files
some improvements and optimizations to KombuManager class
1 parent cc90275 commit 052fd93

File tree

1 file changed

+17
-12
lines changed

1 file changed

+17
-12
lines changed

socketio/kombu_manager.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -45,23 +45,28 @@ def __init__(self, url='amqp://guest:guest@localhost:5672//',
4545
'virtualenv).')
4646
super(KombuManager, self).__init__(channel=channel)
4747
self.url = url
48-
self.writer_conn = kombu.Connection(self.url)
49-
self.writer_queue = self._queue(self.writer_conn)
48+
self.producer = self._producer()
5049

51-
def _queue(self, conn=None):
52-
exchange = kombu.Exchange(self.channel, type='fanout', durable=False)
53-
queue = kombu.Queue(str(uuid.uuid4()), exchange,
54-
queue_arguments={'x-expires': 300000})
55-
return queue
50+
def _connection(self):
51+
return kombu.Connection(self.url)
52+
53+
def _exchange(self):
54+
return kombu.Exchange(self.channel, type='fanout', durable=False)
55+
56+
def _queue(self):
57+
queue_name = 'flask-socketio.' + str(uuid.uuid4())
58+
return kombu.Queue(queue_name, self._exchange(),
59+
queue_arguments={'x-expires': 300000})
60+
61+
def _producer(self):
62+
return self._connection().Producer(exchange=self._exchange())
5663

5764
def _publish(self, data):
58-
with self.writer_conn.SimpleQueue(self.writer_queue) as queue:
59-
queue.put(pickle.dumps(data))
65+
self.producer.publish(pickle.dumps(data))
6066

6167
def _listen(self):
62-
reader_conn = kombu.Connection(self.url)
63-
reader_queue = self._queue(reader_conn)
64-
with reader_conn.SimpleQueue(reader_queue) as queue:
68+
reader_queue = self._queue()
69+
with self._connection().SimpleQueue(reader_queue) as queue:
6570
while True:
6671
message = queue.get(block=True)
6772
message.ack()

0 commit comments

Comments
 (0)