Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 50 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,30 +24,57 @@ which will tell all the servers to create a client and make 1000 calls on the RP
To use a different driver you can either alter the transport url or use other configuration options, e.g.

ombt.py --conf rpc_backend rabbit --conf rabbit_host 127.0.0.1:5672

--------------------------------------------------------------------------------------------------------------------------

Setting up qpidd to work with the AMQP 1.0 driver requires qpid 0.26 or later, with 1.0 support enabled and appropriate queue and topic patterns specified, e.g.
Qpid C++ broker
---------------

Setting up qpidd to work with the AMQP 1.0 driver requires qpid 0.26
or later, with 1.0 support enabled and appropriate queue and topic
patterns specified, e.g.

./src/qpidd --load-module=./src/amqp.so --auth no --queue-patterns exclusive --queue-patterns unicast --topic-patterns broadcast

Setting up Qpid Dispatch Router to work with the same driver requires 0.2[1] or later and adding the following address definitions to your config file(s):

fixed-address {
prefix: /unicast
fanout: single
bias: closest
}

fixed-address {
prefix: /exclusive
fanout: single
bias: closest
}

fixed-address {
prefix: /broadcast
fanout: multiple
}

[1] You will also probably want the patch attached to https://issues.apache.org/jira/browse/DISPATCH-51 if using 0.2

Qpid Dispatch Router
--------------------

Setting up Qpid Dispatch Router to work with the AMQP 1.0 driver
requires version 0.6.1 or later of the router. To configure the
router you must add the following address definitions to your
__qdrouterd__ configuration file (located by default in
/etc/qpid-dispatch/qdrouterd.conf):


address {
prefix: openstack.org/om/rpc/multicast
distribution: multicast
}

address {
prefix: openstack.org/om/rpc/unicast
distribution: closest
}

address {
prefix: openstack.org/om/rpc/anycast
distribution: balanced
}

address {
prefix: openstack.org/om/notify/multicast
distribution: multicast
}

address {
prefix: openstack.org/om/notify/unicast
distribution: closest
}

address {
prefix: openstack.org/om/notify/anycast
distribution: balanced
}



64 changes: 38 additions & 26 deletions ombt.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
#!/usr/bin/env python

# must be first import!
import eventlet
eventlet.monkey_patch()

import logging
import math
import optparse
Expand All @@ -10,8 +14,8 @@
import threading
import time

from oslo.config import cfg
from oslo import messaging
from oslo_config import cfg
import oslo_messaging as messaging


class Stats(object):
Expand Down Expand Up @@ -44,7 +48,8 @@ def _update(self, value, min_value=None, max_value=None, count=1, squared=None):
self._sum_of_squares += squared
n = float(self.count)
self.average = self.total / n
self.std_deviation = math.sqrt((self._sum_of_squares / n) - (self.average ** 2))
self.std_deviation = math.sqrt((self._sum_of_squares / n)
- (self.average ** 2))

def __str__(self):
return "min=%i, max=%i, avg=%f, std-dev=%f" % (self.min, self.max, self.average, self.std_deviation)
Expand Down Expand Up @@ -76,7 +81,9 @@ def __init__(self, transport, target, name):
self._data = None
self._controller = None

def start(self, ctx, controller, count=None, fanout=False, server=None, timeout=2, data="abcdefghijklmnopqrstuvwxyz"):
def start(self, ctx, controller, count=None, fanout=False, server=None,
timeout=2, data="abcdefghijklmnopqrstuvwxyz"):
# Endpoint method - start the RPC call test
if controller == self.name:
return
self._controller = controller
Expand All @@ -92,10 +99,10 @@ def start(self, ctx, controller, count=None, fanout=False, server=None, timeout=

stub = messaging.RPCClient(self._transport, target, timeout=timeout)
self._client = Client(stub)
self._thread = threading.Thread(target=self.run)
self._thread = threading.Thread(target=self._run)
self._thread.start()

def run(self):
def _run(self):
target = messaging.Target(exchange=self._target.exchange,
topic=self._target.topic,
server = self._controller)
Expand All @@ -105,8 +112,8 @@ def run(self):
stats = self._client.run(self._data, self._count)
ctrlr.cast({}, 'submit', server=self.name, results=stats)


def stop(self, ctx):
# Endpoint method - stop the test
if self._client:
self._client.stop()
self._client = None
Expand Down Expand Up @@ -136,16 +143,17 @@ def submit(self, ctxt, server, results):
l = Stats()
l.__dict__.update(results['latency'])
self.latency.merge(l)
print " result %i of %i submitted by %s; Throughput: %i, Latency:%s" % (self.count, self.expected(), server, t, l)
print(" result %i of %i submitted by %s; Throughput: %i, Latency:%s" % (self.count, self.expected(), server, t, l))

def report(self):
print
print "Latency (millisecs): %s" % self.latency
print "Throughput (calls/sec): %s" % self.throughput
print("\n")
print("Latency (millisecs): %s" % self.latency)
print("Throughput (calls/sec): %s" % self.throughput)

def is_complete(self):
return self.expected() and self.expected() <= self.count


class Client(object):
def __init__(self, client):
self._stopped = False
Expand All @@ -160,7 +168,7 @@ def run(self, value, count=None, verbose=False):
value = self._client.call({}, 'reverse', value=value)
self.calls += 1
if verbose and count and self.calls % (max(10, count)/10) == 0:
print "Call %i of %i completed" % (self.calls, count)
print("Call %i of %i completed" % (self.calls, count))
self.latency.update((time.time() - t) * 1000)
if count and self.calls >= count:
self._stopped = True
Expand All @@ -176,7 +184,7 @@ def get_stats(self):


class Server(object):
def __init__(self, transport, name, controller, workers):
def __init__(self, transport, name, controller, workers, executor):
target = messaging.Target(exchange="test-exchange",
topic="test-topic",
server=name)
Expand All @@ -189,28 +197,26 @@ def __init__(self, transport, name, controller, workers):
endpoints.append(self.collector)
else:
self.collector = None
self._server = messaging.get_rpc_server(transport, target, endpoints)
self._server = messaging.get_rpc_server(transport, target, endpoints,
executor)
self._ctrl = messaging.RPCClient(transport, target, timeout=2)

def start(self):
self._thread = threading.Thread(target=self._server.start)
self._thread.start()

def stop(self, wait=False):
self._server.stop()
def stop(self):
# tell minions to stop RPC call test
self._ctrl.cast({}, 'stop')
if wait:
self._server.wait()
self.wait()
self._server.stop()

def wait(self):
if self.collector:
while not self.collector.is_complete():
time.sleep(0.5)
self.stop()
while self._thread.is_alive():
time.sleep(0.5)
#self._thread.join()
self._server.wait()
self._thread.join()


def handle_config_option(option, opt_string, opt_value, parser):
Expand All @@ -229,18 +235,21 @@ def main(argv=None):
parser.add_option("--timeout", action="store", type=int, default=2)
parser.add_option("--config", action="callback",
callback=handle_config_option, nargs=2, type="string")
parser.add_option("--executor", action="store", default="eventlet",
help="Executor to use when servicing RPCs")

opts, extra = parser.parse_args(args=argv)

logging.basicConfig(level=logging.INFO) #make this an option
logging.basicConfig(level=logging.WARNING) #make this an option

if opts.url:
cfg.CONF.transport_url=opts.url

transport = messaging.get_transport(cfg.CONF)

server_name = opts.id or "%s_%s" % (socket.gethostname(), os.getpid())
server = Server(transport, server_name, opts.controller, opts.workers)
server = Server(transport, server_name, opts.controller, opts.workers,
opts.executor)
def signal_handler(s, f):
server.stop()
signal.signal(signal.SIGINT, signal_handler)
Expand All @@ -251,18 +260,21 @@ def signal_handler(s, f):
target = messaging.Target(exchange="test-exchange",
topic="test-topic",fanout=True)
stub = messaging.RPCClient(transport, target, timeout=2)
stub.cast({}, 'start', controller=server_name, count=opts.calls,timeout=opts.timeout)
# signal all minions to start calling
stub.cast({}, 'start', controller=server_name, count=opts.calls,
timeout=opts.timeout)
elif opts.calls:
target = messaging.Target(exchange="test-exchange",
topic="test-topic")
stub = messaging.RPCClient(transport, target, timeout=2)
client = Client(stub)
stats = client.run("abcdefghijklmnopqrstuvwxyz", count=opts.calls, verbose=True)
print stats
print("%s" % str(stats))
server.stop()
server.wait()
if opts.controller:
server.collector.report()
transport.cleanup()
return 0

if __name__ == "__main__":
Expand Down