Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
cryptography, # See above for pypy-version-dependent requirement.
'zope.interface',
'Twisted>=13.2.0',
'txAMQP>=0.6.2',
'pika',
'PyYAML',
'iso8601',
'pyOpenSSL',
Expand Down
153 changes: 153 additions & 0 deletions vumi/amqp_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import pika
from pika.adapters.twisted_connection import TwistedProtocolConnection
from twisted.internet.defer import (
Deferred, succeed, inlineCallbacks, returnValue)
from twisted.internet.protocol import ClientFactory

from vumi.reconnecting_client import ReconnectingClientService


def _fire_and_return(r, d):
d.callback(r)
return r


def sub_deferred(parent_d):
d = Deferred()
parent_d.addCallback(_fire_and_return, d)
return d


class AMQPClientService(ReconnectingClientService):
"""
A service that manages an AMQP client connection
"""
_client = None

def __init__(self, endpoint):
factory = PikaClientFactory(pika.ConnectionParameters())
ReconnectingClientService.__init__(self, endpoint, factory)
self._connect_d = Deferred()

def await_connected(self):
return sub_deferred(self._connect_d)

def clientConnected(self, protocol):
ReconnectingClientService.clientConnected(self, protocol)
self._client = protocol
# If the client has a _fake_server attribute, we want to grab that so
# test code doesn't need to dig around in our guts.
if hasattr(protocol, '_fake_server'):
self._fake_server = protocol._fake_server
# `protocol.ready` is a Deferred that fires when the AMQP connection is
# open and is set to `None` after that. We need to handle both cases
# because the network might be faster than us.
d = succeed(None).addCallback(lambda _: protocol.ready)
return d.addCallback(self.ready_callback)

def clientConnectionLost(self, reason):
self._client = None
self._connect_d = Deferred()
ReconnectingClientService.clientConnectionLost(self, reason)

def ready_callback(self, _ignored):
self._connect_d.callback(self)

def get_client(self):
# TODO: Better exception.
assert self._client is not None, "AMQP not connected."
return self._client

def _declare_exchange(self, source, channel):
# get the details for AMQP
exchange_name = source.exchange_name
exchange_type = source.exchange_type
durable = source.durable
return channel.exchange_declare(exchange=exchange_name,
type=exchange_type, durable=durable)

@inlineCallbacks
def get_channel(self):
channel = yield self.get_client().channel()
patch_channel(channel, getattr(self, '_fake_server', None))
returnValue(channel)

@inlineCallbacks
def start_consumer(self, consumer_class, *args, **kwargs):
channel = yield self.get_channel()

consumer = consumer_class(channel, *args, **kwargs)
# consumer.vumi_options = self.vumi_options

# get the details for AMQP
exchange_name = consumer.exchange_name
durable = consumer.durable
queue_name = consumer.queue_name
routing_key = consumer.routing_key

# declare the exchange, doesn't matter if it already exists
yield self._declare_exchange(consumer, channel)

# declare the queue
yield channel.queue_declare(queue=queue_name, durable=durable)
# bind it to the exchange with the routing key
yield channel.queue_bind(queue=queue_name, exchange=exchange_name,
routing_key=routing_key)
yield consumer.start()
# return the newly created & consuming consumer
returnValue(consumer)

@inlineCallbacks
def start_publisher(self, publisher_class, *args, **kwargs):
channel = yield self.get_channel()
# start the publisher
publisher = publisher_class(*args, **kwargs)
# publisher.vumi_options = self.vumi_options
# declare the exchange, doesn't matter if it already exists
yield self._declare_exchange(publisher, channel)
# start!
yield publisher.start(channel)
# return the publisher
returnValue(publisher)


def patch_channel(channel, fake_server):
"""
Patch a channel object to fix some cases where it might wait forever.
"""
orig_basic_cancel = channel.basic_cancel

def basic_cancel(consumer_tag):
if consumer_tag not in channel.consumer_tags:
return succeed(None)
return orig_basic_cancel(consumer_tag=consumer_tag)
channel.basic_cancel = basic_cancel

def on_getempty(frame):
channel._on_getok_callback(None, frame.method, None, None)
channel.add_callback(on_getempty, ['Basic.GetEmpty'])

if fake_server is not None:
patch_fake_channel(channel, fake_server)


def patch_fake_channel(channel, fake_server):
"""
Patch a channel object connected to a fake broker with some things that are
useful in tests but don't exist in production.
"""
channel._fake_channel = fake_server.channels[channel.channel_number]
orig_basic_publish = channel.basic_publish

def basic_publish(*args, **kw):
return orig_basic_publish(*args, **kw).addCallback(
fake_server.broker.wait0)
channel.basic_publish = basic_publish


class PikaClientFactory(ClientFactory):
def __init__(self, connection_parameters):
self.connection_parameters = connection_parameters

def protocol(self):
return TwistedProtocolConnection(self.connection_parameters)
15 changes: 9 additions & 6 deletions vumi/blinkenlights/heartbeat/tests/test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

from twisted.internet.defer import inlineCallbacks

from vumi.tests.fake_amqp import FakeAMQPBroker
from vumi.blinkenlights.heartbeat import publisher
from vumi.errors import MissingMessageField
from vumi.tests.helpers import VumiTestCase, WorkerHelper
Expand Down Expand Up @@ -35,15 +34,19 @@ def gen_fake_attrs(self):

@inlineCallbacks
def test_publish_heartbeat(self):
broker = FakeAMQPBroker()
client = WorkerHelper.get_fake_amqp_client(broker)
channel = yield client.get_channel()
worker_helper = self.add_helper(WorkerHelper())
client = worker_helper.get_fake_amqp_client(worker_helper.broker)
client.startService()
channel = yield client.await_connected().addCallback(
lambda c: c.get_channel())
pub = MockHeartBeatPublisher(self.gen_fake_attrs)
pub.start(channel)
pub._beat()

[msg] = broker.get_dispatched("vumi.health", "heartbeat.inbound")
self.assertEqual(json.loads(msg.body), self.gen_fake_attrs())
yield worker_helper.kick_delivery()
[msg] = worker_helper.broker.get_dispatched(
"vumi.health", "heartbeat.inbound")
self.assertEqual(json.loads(msg), self.gen_fake_attrs())

def test_message_validation(self):
attrs = self.gen_fake_attrs()
Expand Down
28 changes: 16 additions & 12 deletions vumi/blinkenlights/tests/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ def setUp(self):
@inlineCallbacks
def start_publisher(self, publisher):
client = WorkerHelper.get_fake_amqp_client(self.worker_helper.broker)
channel = yield client.get_channel()
client.startService()
channel = yield client.await_connected().addCallback(
lambda c: c.get_channel())
publisher.start(channel)

def _sleep(self, delay):
Expand Down Expand Up @@ -54,7 +56,7 @@ def test_publish_single_metric(self):
msg.append(
("vumi.test.%s" % (cnt.name,), cnt.aggs, [(time.time(), 1)]))
publisher.publish_message(msg)
self._check_msg("vumi.test.", cnt, [1])
yield self._check_msg("vumi.test.", cnt, [1])

def test_publisher_provides_interface(self):
publisher = metrics.MetricPublisher()
Expand All @@ -73,12 +75,14 @@ def on_publish(self, mm):
d.callback(mm)

def wait_publish(self):
return self._next_publish
return self._next_publish.addCallback(self.worker_helper.broker.wait0)

@inlineCallbacks
def start_manager_as_publisher(self, manager):
client = WorkerHelper.get_fake_amqp_client(self.worker_helper.broker)
channel = yield client.get_channel()
client.startService()
channel = yield client.await_connected().addCallback(
lambda c: c.get_channel())
manager.start(channel)
self.add_cleanup(manager.stop)

Expand Down Expand Up @@ -180,7 +184,7 @@ def test_publish_metrics_poll(self):

cnt.inc()
mm.publish_metrics()
self._check_msg(mm, cnt, [1])
yield self._check_msg(mm, cnt, [1])

@inlineCallbacks
def test_publish_metrics_oneshot(self):
Expand All @@ -190,7 +194,7 @@ def test_publish_metrics_oneshot(self):

mm.oneshot(cnt, 1)
mm.publish_metrics()
self._check_msg(mm, cnt, [1])
yield self._check_msg(mm, cnt, [1])

@inlineCallbacks
def test_start(self):
Expand All @@ -199,16 +203,16 @@ def test_start(self):
yield self.start_manager_as_publisher(mm)

self.assertTrue(mm._task is not None)
self._check_msg(mm, cnt, None)
yield self._check_msg(mm, cnt, None)

cnt.inc()
yield self.wait_publish()
self._check_msg(mm, cnt, [1])
yield self._check_msg(mm, cnt, [1])

cnt.inc()
cnt.inc()
yield self.wait_publish()
self._check_msg(mm, cnt, [1, 1])
yield self._check_msg(mm, cnt, [1, 1])

@inlineCallbacks
def test_publish_metrics(self):
Expand All @@ -220,7 +224,7 @@ def test_publish_metrics(self):
self.assertEqual(len(mm._oneshot_msgs), 1)
mm.publish_metrics()
self.assertEqual(mm._oneshot_msgs, [])
self._check_msg(mm, cnt, [1])
yield self._check_msg(mm, cnt, [1])

def test_publish_metrics_not_started_no_publisher(self):
mm = metrics.MetricManager("vumi.test.")
Expand All @@ -241,12 +245,12 @@ def test_in_worker(self):
acc = mm.register(metrics.Metric("my.acc"))
try:
self.assertTrue(mm._task is not None)
self._check_msg(mm, acc, None)
yield self._check_msg(mm, acc, None)

acc.set(1.5)
acc.set(1.0)
yield self.wait_publish()
self._check_msg(mm, acc, [1.5, 1.0])
yield self._check_msg(mm, acc, [1.5, 1.0])
finally:
mm.stop()

Expand Down
Loading