Skip to content

Commit d04c717

Browse files
committed
- aggregation up for pool stats
1 parent eeaf30f commit d04c717

File tree

4 files changed

+128
-40
lines changed

4 files changed

+128
-40
lines changed

sqlalchemy_collectd/server/aggregator.py

Lines changed: 41 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,47 @@
1-
import collections
1+
import itertools
22

33

4-
class Aggregator(object):
5-
def __init__(self):
6-
# TOOD: configurable size
7-
self.queue = collections.deque(maxlen=100000)
4+
def avg(values):
    """Return the arithmetic mean of *values* (a non-empty sequence)."""
    total = sum(values)
    return total / len(values)
86

9-
def put(self, message):
10-
self.queue.appendleft(message)
117

12-
def outgoing(self):
13-
while self.queue:
14-
yield self.queue.pop()
8+
class Aggregator(object):
    """Collects per-process pool statistics and produces on-demand
    rollups grouped by (hostname, progname) or by hostname alone.

    Samples are stored in a TimeBucket keyed on the sample timestamp,
    so only recent data within the bucket window is aggregated.
    """

    def __init__(self, interval=10):
        # interval: expected seconds between samples; also sizes the
        # TimeBucket slots (4 buckets of `interval` seconds each).
        self.interval = interval
        self.pool_stats = TimeBucket(4, interval)

    def set_pool_stats(
            self, hostname, progname, pid, timestamp, numpools,
            checkedout, checkedin, detached, invalidated, total):
        """Record one pool-stats sample for a single process.

        The sample overwrites any previous sample for the same
        (hostname, progname, pid) within the current time bucket.
        """
        bucket_data = self.pool_stats.get_data(timestamp)
        bucket_data[(hostname, progname, pid)] = (
            numpools, checkedout, checkedin, detached, invalidated, total
        )

    def get_pool_stats_by_progname(self, timestamp, agg_func):
        """Yield (hostname, progname, aggregated_stats) tuples."""
        return self._get_stats_by_progname(
            self.pool_stats, timestamp, agg_func)

    def get_pool_stats_by_hostname(self, timestamp, agg_func):
        """Yield (hostname, aggregated_stats) tuples."""
        return self._get_stats_by_hostname(
            self.pool_stats, timestamp, agg_func)

    def _get_stats_by_progname(self, bucket, timestamp, agg_func):
        samples = bucket.get_data(timestamp)
        # groupby requires its input sorted on the grouping key; the
        # keys are (hostname, progname, pid) tuples, so a plain sort
        # orders them correctly for a (hostname, progname) grouping.
        grouped = itertools.groupby(
            sorted(samples), key=lambda key: key[:2])
        for (hostname, progname), members in grouped:
            columns = zip(*(samples[key] for key in members))
            yield hostname, progname, [agg_func(col) for col in columns]

    def _get_stats_by_hostname(self, bucket, timestamp, agg_func):
        samples = bucket.get_data(timestamp)
        grouped = itertools.groupby(
            sorted(samples), key=lambda key: key[0])
        for hostname, members in grouped:
            # transpose the member rows so agg_func sees one stat
            # column (e.g. all "checkedout" values) at a time
            columns = zip(*(samples[key] for key in members))
            yield hostname, [agg_func(col) for col in columns]
1545

1646

1747
class TimeBucket(object):
@@ -61,6 +91,7 @@ def __init__(self, num_buckets, interval):
6191
self.interval = interval
6292

6393
def _get_bucket(self, timestamp):
94+
timestamp = int(timestamp)
6495
slot = (timestamp // self.interval)
6596
bucket_num = slot % self.num_buckets
6697
bucket = self.buckets[bucket_num]

sqlalchemy_collectd/server/plugin.py

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,16 @@
55
from . import receiver
66
from . import aggregator
77
from .. import protocol
8+
import time
9+
import logging
10+
11+
log = logging.getLogger(__name__)
812

913
aggregator_ = None
14+
receiver_ = None
1015

1116

17+
# TODO: merge collectd.notice w/ python logging?
1218
def _notice(msg):
    """Emit a collectd notice tagged with this plugin's name."""
    tagged = "[sqlalchemy-collectd] %s" % msg
    collectd.notice(tagged)
1420

@@ -23,6 +29,7 @@ def get_config(config):
2329

2430
def start_plugin(config):
2531
global aggregator_
32+
global receiver_
2633

2734
config_dict = {elem.key: tuple(elem.values) for elem in config.children}
2835
host, port = config_dict.get("listen", ("localhost", 25827))
@@ -34,29 +41,9 @@ def start_plugin(config):
3441
listener.listen(connection, aggregator_, receiver_)
3542

3643

37-
def _read_raw_struct_to_values(message):
38-
# dispatch(
39-
# [type][, values][, plugin_instance]
40-
# [, type_instance][, plugin][, host][, time][, interval]) -> None.
41-
42-
return collectd.Values(
43-
type=message[protocol.TYPE_TYPE],
44-
values=message[protocol.TYPE_VALUES],
45-
plugin=message[protocol.TYPE_PLUGIN],
46-
host=message[protocol.TYPE_HOST],
47-
time=message[protocol.TYPE_TIME],
48-
type_instance=message[protocol.TYPE_TYPE_INSTANCE],
49-
plugin_instance=message[protocol.TYPE_PLUGIN_INSTANCE],
50-
interval=message[protocol.TYPE_INTERVAL]
51-
)
52-
53-
_read_struct_to_values = _read_raw_struct_to_values
54-
55-
5644
def read(data=None):
    """collectd read callback: push the current aggregated stats out.

    The `data` parameter is required by collectd's callback signature
    but unused here.
    """
    receiver_.summarize(aggregator_, time.time())
6047

6148

6249
collectd.register_config(get_config)

sqlalchemy_collectd/server/receiver.py

Lines changed: 78 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,27 @@
1+
import collectd
2+
13
from .. import protocol
24
from .. import types
5+
from . import aggregator
36

7+
import logging
8+
log = logging.getLogger(__name__)
49

5-
receivers = []
10+
receivers = {}
11+
summarizers = []
612

713

814
def receives(protocol_type):
    """Decorator: register *fn* as the handler for messages whose
    type name matches ``protocol_type.name``.
    """
    def register(fn):
        receivers[protocol_type.name] = fn
        return fn
    return register
20+
21+
22+
def summarizes(protocol_type):
    """Decorator: register *fn* as a summarizer run on each read cycle.

    NOTE(review): ``protocol_type`` is currently unused — all
    summarizers are appended to one flat list regardless of type;
    the argument is kept for symmetry with ``receives``.
    """
    def register(fn):
        summarizers.append(fn)
        return fn
    return register
@@ -24,11 +38,67 @@ def __init__(self):
2438
types.transactions
2539
)
2640

27-
def receive(self, connection, aggregator_):
    """Receive one datagram from *connection*, decode it, and dispatch
    it to the type-specific handler registered via @receives.

    Silently returns when the decoder yields no complete message;
    logs and skips messages whose type has no registered handler.
    """
    data, host = connection.receive()
    message = self.message_receiver.receive(data)
    if message is None:
        # the previous implementation guarded against a None message
        # (e.g. an incomplete or undecodable packet); restore that
        # guard so we don't crash subscripting None below
        return
    type_ = message[protocol.TYPE_TYPE]
    timestamp = message[protocol.TYPE_TIME]
    # prefer the hostname embedded in the message over the socket peer
    host = message[protocol.TYPE_HOST]
    progname = message[protocol.TYPE_PLUGIN_INSTANCE]
    values = message[protocol.TYPE_VALUES]
    pid = message[protocol.TYPE_TYPE_INSTANCE]
    try:
        receiver = receivers[type_]
    except KeyError:
        # log.warn is a deprecated alias of log.warning
        log.warning("Don't understand message type %s, skipping" % type_)
    else:
        receiver(
            message, timestamp, host, progname, pid, values, aggregator_)
57+
58+
def summarize(self, aggregator_, timestamp):
    """Run every registered summarizer against *aggregator_*."""
    for fn in summarizers:
        fn(aggregator_, timestamp)
61+
62+
63+
@receives(types.pool)
def _receive_pool(
        message, timestamp, host, progname, pid, values, aggregator_):
    """Store one raw pool-stats sample into the aggregator.

    *values* is expected to unpack as (numpools, checkedout,
    checkedin, detached, invalidated, total) — the positional tail
    of Aggregator.set_pool_stats.
    """
    aggregator_.set_pool_stats(host, progname, pid, timestamp, *values)
67+
68+
69+
@summarizes(types.pool)
def _summarize_pool(aggregator_, timestamp):
    """Dispatch aggregated pool stats to collectd.

    For each aggregation function ("sum" then "avg"), emits one value
    set per (host, progname) pair and one per host overall (with
    plugin_instance "all") — same dispatch order as the four separate
    loops this replaces.
    """
    values = collectd.Values(
        type=types.pool.name,
        plugin="sqlalchemy",
        time=timestamp,
        interval=aggregator_.interval
    )
    # the four original near-identical loops collapsed into one
    # data-driven pass; ordering (sum before avg, progname before
    # hostname) is preserved
    for type_instance, agg_func in (("sum", sum), ("avg", aggregator.avg)):
        for hostname, progname, stats in \
                aggregator_.get_pool_stats_by_progname(timestamp, agg_func):
            values.dispatch(
                type_instance=type_instance, host=hostname,
                plugin_instance=progname, values=stats
            )
        for hostname, stats in aggregator_.get_pool_stats_by_hostname(
                timestamp, agg_func):
            values.dispatch(
                type_instance=type_instance, host=hostname,
                plugin_instance="all", values=stats
            )
104+

sqlalchemy_collectd/server/tests/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)