Skip to content

Commit db7b0ea

Browse files
committed
- aggreagation can be generic for now
1 parent d04c717 commit db7b0ea

File tree

5 files changed

+68
-100
lines changed

5 files changed

+68
-100
lines changed

sqlalchemy_collectd/client/sender.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@ def decorate(fn):
1414

1515

1616
class Sender(object):
17-
def __init__(self, hostname, stats_name):
17+
def __init__(self, hostname, stats_name, plugin="sqlalchemy"):
1818
self.hostname = hostname
1919
self.stats_name = stats_name
20+
self.plugin = plugin
2021

2122
def send(self, connection, collection_target, timestamp, interval, pid):
2223
for protocol_type, sender in senders:
2324
message_sender = protocol.MessageSender(
24-
protocol_type, self.hostname, "sqlalchemy",
25+
protocol_type, self.hostname, self.plugin,
2526
plugin_instance=self.stats_name, type_instance=str(pid),
2627
interval=interval
2728
)

sqlalchemy_collectd/server/aggregator.py

Lines changed: 10 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,36 +6,31 @@ def avg(values):
66

77

88
class Aggregator(object):
9-
def __init__(self, interval=10):
9+
def __init__(self, bucket_names, interval=10):
1010
self.interval = interval
11-
self.pool_stats = TimeBucket(4, interval)
11+
self.buckets = {name: TimeBucket(4, interval) for name in bucket_names}
1212

13-
def set_pool_stats(
14-
self, hostname, progname, pid, timestamp, numpools,
13+
def set_stats(
14+
self, bucket_name, hostname, progname, pid, timestamp, numpools,
1515
checkedout, checkedin, detached, invalidated, total):
1616

17-
records = self.pool_stats.get_data(timestamp)
17+
bucket = self.buckets[bucket_name]
18+
records = bucket.get_data(timestamp)
1819
records[(hostname, progname, pid)] = (
1920
numpools, checkedout, checkedin, detached, invalidated, total
2021
)
2122

22-
def get_pool_stats_by_progname(self, timestamp, agg_func):
23-
return self._get_stats_by_progname(
24-
self.pool_stats, timestamp, agg_func)
25-
26-
def get_pool_stats_by_hostname(self, timestamp, agg_func):
27-
return self._get_stats_by_hostname(
28-
self.pool_stats, timestamp, agg_func)
29-
30-
def _get_stats_by_progname(self, bucket, timestamp, agg_func):
23+
def get_stats_by_progname(self, bucket_name, timestamp, agg_func):
24+
bucket = self.buckets[bucket_name]
3125
records = bucket.get_data(timestamp)
3226
for (hostname, progname), keys in itertools.groupby(
3327
sorted(records), key=lambda rec: (rec[0], rec[1])
3428
):
3529
recs = [records[key] for key in keys]
3630
yield hostname, progname, [agg_func(coll) for coll in zip(*recs)]
3731

38-
def _get_stats_by_hostname(self, bucket, timestamp, agg_func):
32+
def get_stats_by_hostname(self, bucket_name, timestamp, agg_func):
33+
bucket = self.buckets[bucket_name]
3934
records = bucket.get_data(timestamp)
4035
for hostname, keys in itertools.groupby(
4136
sorted(records), key=lambda rec: rec[0]

sqlalchemy_collectd/server/listener.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import threading
22

33

4-
def _receive(connection, aggregator, receiver):
4+
def _receive(connection, receiver):
55
while True:
6-
receiver.receive(connection, aggregator)
6+
receiver.receive(connection)
77

88

9-
def listen(connection, aggregator, receiver):
9+
def listen(connection, receiver):
1010
listen_thread = threading.Thread(
11-
target=_receive, args=(connection, aggregator, receiver))
11+
target=_receive, args=(connection, receiver))
1212
listen_thread.daemon = True
1313
listen_thread.start()
1414

sqlalchemy_collectd/server/plugin.py

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,12 @@
33
import collectd
44
from . import listener
55
from . import receiver
6-
from . import aggregator
76
from .. import protocol
87
import time
98
import logging
109

1110
log = logging.getLogger(__name__)
1211

13-
aggregator_ = None
1412
receiver_ = None
1513

1614

@@ -28,22 +26,20 @@ def get_config(config):
2826

2927

3028
def start_plugin(config):
31-
global aggregator_
3229
global receiver_
3330

3431
config_dict = {elem.key: tuple(elem.values) for elem in config.children}
3532
host, port = config_dict.get("listen", ("localhost", 25827))
3633

37-
aggregator_ = aggregator.Aggregator()
3834
receiver_ = receiver.Receiver()
3935
connection = protocol.ServerConnection(host, int(port))
4036

41-
listener.listen(connection, aggregator_, receiver_)
37+
listener.listen(connection, receiver_)
4238

4339

4440
def read(data=None):
4541
now = time.time()
46-
receiver_.summarize(aggregator_, now)
42+
receiver_.summarize(now)
4743

4844

4945
collectd.register_config(get_config)

sqlalchemy_collectd/server/receiver.py

Lines changed: 49 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -7,98 +7,74 @@
77
import logging
88
log = logging.getLogger(__name__)
99

10-
receivers = {}
11-
summarizers = []
12-
13-
14-
def receives(protocol_type):
15-
def decorate(fn):
16-
receivers[protocol_type.name] = fn
17-
return fn
18-
19-
return decorate
20-
21-
22-
def summarizes(protocol_type):
23-
def decorate(fn):
24-
summarizers.append(fn)
25-
return fn
26-
27-
return decorate
28-
2910

3011
class Receiver(object):
31-
def __init__(self):
32-
self.message_receiver = protocol.MessageReceiver(
12+
def __init__(self, plugin="sqlalchemy"):
13+
self.plugin = plugin
14+
self.types = types_ = [
3315
types.pool,
3416
types.checkouts,
3517
types.commits,
3618
types.rollbacks,
3719
types.invalidated,
3820
types.transactions
21+
]
22+
self.message_receiver = protocol.MessageReceiver(*types_)
23+
24+
self.aggregator = aggregator.Aggregator(
25+
[type_.name for type_ in types_]
3926
)
4027

41-
def receive(self, connection, aggregator_):
28+
def receive(self, connection):
4229
data, host = connection.receive()
4330
message = self.message_receiver.receive(data)
44-
type_ = message[protocol.TYPE_TYPE]
31+
type_name = message[protocol.TYPE_TYPE]
4532
timestamp = message[protocol.TYPE_TIME]
4633
host = message[protocol.TYPE_HOST]
4734
progname = message[protocol.TYPE_PLUGIN_INSTANCE]
4835
values = message[protocol.TYPE_VALUES]
4936
pid = message[protocol.TYPE_TYPE_INSTANCE]
50-
try:
51-
receiver = receivers[type_]
52-
except KeyError:
53-
log.warn("Don't understand message type %s, skipping" % type_)
54-
else:
55-
receiver(
56-
message, timestamp, host, progname, pid, values, aggregator_)
57-
58-
def summarize(self, aggregator_, timestamp):
59-
for summarizer in summarizers:
60-
summarizer(aggregator_, timestamp)
61-
62-
63-
@receives(types.pool)
64-
def _receive_pool(
65-
message, timestamp, host, progname, pid, values, aggregator_):
66-
aggregator_.set_pool_stats(host, progname, pid, timestamp, *values)
67-
68-
69-
@summarizes(types.pool)
70-
def _summarize_pool(aggregator_, timestamp):
71-
values = collectd.Values(
72-
type=types.pool.name,
73-
plugin="sqlalchemy",
74-
time=timestamp,
75-
interval=aggregator_.interval
76-
)
77-
for hostname, progname, stats in \
78-
aggregator_.get_pool_stats_by_progname(timestamp, sum):
79-
values.dispatch(
80-
type_instance="sum", host=hostname, plugin_instance=progname,
81-
values=stats
82-
)
83-
84-
for hostname, stats in aggregator_.get_pool_stats_by_hostname(
85-
timestamp, sum):
86-
values.dispatch(
87-
type_instance="sum", host=hostname, plugin_instance="all",
88-
values=stats
37+
self.aggregator.set_stats(
38+
type_name, host, progname, pid, timestamp, *values
8939
)
9040

91-
for hostname, progname, stats in aggregator_.get_pool_stats_by_progname(
92-
timestamp, aggregator.avg):
93-
values.dispatch(
94-
type_instance="avg", host=hostname, plugin_instance=progname,
95-
values=stats
96-
)
41+
def summarize(self, timestamp):
42+
for type_ in self.types:
43+
self._summarize_for_type(type_, timestamp)
9744

98-
for hostname, stats in aggregator_.get_pool_stats_by_hostname(
99-
timestamp, aggregator.avg):
100-
values.dispatch(
101-
type_instance="avg", host=hostname, plugin_instance="all",
102-
values=stats
45+
def _summarize_for_type(self, type_, timestamp):
46+
values = collectd.Values(
47+
type=type_.name,
48+
plugin=self.plugin,
49+
time=timestamp,
50+
interval=self.aggregator.interval
10351
)
52+
for hostname, progname, stats in \
53+
self.aggregator.get_stats_by_progname(
54+
type_.name, timestamp, sum):
55+
values.dispatch(
56+
type_instance="sum", host=hostname, plugin_instance=progname,
57+
values=stats
58+
)
59+
60+
for hostname, stats in self.aggregator.get_stats_by_hostname(
61+
type_.name, timestamp, sum):
62+
values.dispatch(
63+
type_instance="sum", host=hostname, plugin_instance="all",
64+
values=stats
65+
)
66+
67+
for hostname, progname, stats in self.aggregator.get_stats_by_progname(
68+
type_.name, timestamp, aggregator.avg):
69+
values.dispatch(
70+
type_instance="avg", host=hostname, plugin_instance=progname,
71+
values=stats
72+
)
73+
74+
for hostname, stats in self.aggregator.get_stats_by_hostname(
75+
type_.name, timestamp, aggregator.avg):
76+
values.dispatch(
77+
type_instance="avg", host=hostname, plugin_instance="all",
78+
values=stats
79+
)
10480

0 commit comments

Comments
 (0)