Skip to content

Commit f1669f5

Browse files
committed
- rework the types to be internal, use built in types for reporting
1 parent e42a768 commit f1669f5

File tree

9 files changed

+156
-74
lines changed

9 files changed

+156
-74
lines changed

collectdconsole.conf

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,26 @@ LoadPlugin python
3535
</Module>
3636
</Plugin>
3737

38+
39+
LoadPlugin write_graphite
40+
41+
<Plugin write_graphite>
42+
<Node "local">
43+
Host localhost
44+
Port "2003"
45+
Protocol "tcp"
46+
ReconnectInterval 0
47+
LogSendErrors true
48+
Prefix "collectd."
49+
#Postfix "collectd"
50+
StoreRates true
51+
AlwaysAppendDS false
52+
EscapeCharacter "_"
53+
SeparateInstances false
54+
PreserveSeparator false
55+
DropDuplicateFields false
56+
</Node>
57+
</Plugin>
58+
59+
#LoadPlugin interface
60+
#LoadPlugin cpu

sqlalchemy_collectd/client/collector.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ def __init__(self, name):
3131
# doesn't include DBAPI implicit transactions
3232
self.transactions = set()
3333

34+
self.total_checkouts = 0
35+
self.total_invalidated = 0
36+
self.total_connects = 0
37+
self.total_disconnects = 0
38+
3439
@classmethod
3540
def collection_for_name(cls, name):
3641
cls.create_mutex.acquire()
@@ -107,11 +112,13 @@ def conn_ident(self, dbapi_connection):
107112
def _connect_evt(self, dbapi_conn, connection_rec):
108113
worker._check_threads_started()
109114
id_ = self.conn_ident(dbapi_conn)
115+
self.collection_target.total_connects += 1
110116
self.connections.add(id_)
111117
self.checkedin.add(id_)
112118

113119
def _checkout_evt(self, dbapi_conn, connection_rec, connection_proxy):
114120
id_ = self.conn_ident(dbapi_conn)
121+
self.collection_target.total_checkouts += 1
115122
self.checkedin.remove(id_)
116123

117124
def _checkin_evt(self, dbapi_conn, connection_rec):
@@ -120,6 +127,7 @@ def _checkin_evt(self, dbapi_conn, connection_rec):
120127

121128
def _invalidate_evt(self, dbapi_conn, connection_rec):
122129
id_ = self.conn_ident(dbapi_conn)
130+
self.collection_target.total_invalidated += 1
123131
self.invalidated.add(id_)
124132

125133
def _reset_evt(self, dbapi_conn, connection_rec):
@@ -135,6 +143,7 @@ def _close_evt(self, dbapi_conn, connection_rec):
135143

136144
try:
137145
self.connections.remove(id_)
146+
self.collection_target.total_disconnects += 1
138147
except KeyError:
139148
self._warn_missing_connection(dbapi_conn)
140149

@@ -165,6 +174,7 @@ def _close_detached_evt(self, dbapi_conn):
165174

166175
try:
167176
self.connections.remove(id_)
177+
self.collection_target.total_disconnects += 1
168178
except KeyError:
169179
self._warn_missing_connection(dbapi_conn)
170180

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from .. import protocol
2+
3+
pool = protocol.Type(
4+
"sqlalchemy_pool",
5+
("numpools", protocol.VALUE_GAUGE),
6+
("checkedout", protocol.VALUE_GAUGE),
7+
("checkedin", protocol.VALUE_GAUGE),
8+
("detached", protocol.VALUE_GAUGE),
9+
("invalidated", protocol.VALUE_GAUGE),
10+
("connections", protocol.VALUE_GAUGE),
11+
)
12+
13+
transactions = protocol.Type(
14+
"sqlalchemy_transactions",
15+
("commits", protocol.VALUE_DERIVE),
16+
("rollbacks", protocol.VALUE_DERIVE),
17+
("transactions", protocol.VALUE_DERIVE),
18+
)
19+
20+
totals = protocol.Type(
21+
"sqlalchemy_totals",
22+
("checkouts", protocol.VALUE_DERIVE),
23+
("invalidated", protocol.VALUE_DERIVE),
24+
("connects", protocol.VALUE_DERIVE),
25+
("disconnects", protocol.VALUE_DERIVE),
26+
)

sqlalchemy_collectd/client/sender.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from .. import protocol
2-
from .. import types
2+
from . import internal_types
33

44

55
senders = []
@@ -29,7 +29,7 @@ def send(self, connection, collection_target, timestamp, interval, pid):
2929
sender(message_sender, connection, collection_target, timestamp)
3030

3131

32-
@sends(types.pool)
32+
@sends(internal_types.pool)
3333
def _send_pool(message_sender, connection, collection_target, timestamp):
3434
message_sender.send(
3535
connection, timestamp,
@@ -40,3 +40,15 @@ def _send_pool(message_sender, connection, collection_target, timestamp):
4040
collection_target.num_invalidated,
4141
collection_target.num_connections
4242
)
43+
44+
45+
@sends(internal_types.totals)
46+
def _send_connection_totals(
47+
message_sender, connection, collection_target, timestamp):
48+
message_sender.send(
49+
connection, timestamp,
50+
collection_target.total_checkouts,
51+
collection_target.total_invalidated,
52+
collection_target.total_connects,
53+
collection_target.total_disconnects
54+
)

sqlalchemy_collectd/protocol.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ def __init__(self, name, *db_template):
9393
for value_type in self._value_types:
9494
self._message_template += char.pack(value_type)
9595

96+
@property
97+
def names(self):
98+
return self._field_names
99+
96100
def _encode_values(self, *values):
97101
"""Encode a series of values according to the type template."""
98102

sqlalchemy_collectd/server/aggregator.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ def get_stats_by_progname(self, bucket_name, timestamp, agg_func):
2424
sorted(records), key=lambda rec: (rec[0], rec[1])
2525
):
2626
recs = [records[key] for key in keys]
27-
yield hostname, progname, [agg_func(coll) for coll in zip(*recs)]
27+
yield (
28+
hostname, progname, len(recs),
29+
[agg_func(coll) for coll in zip(*recs)]
30+
)
2831

2932
def get_stats_by_hostname(self, bucket_name, timestamp, agg_func):
3033
bucket = self.buckets[bucket_name]
@@ -33,7 +36,7 @@ def get_stats_by_hostname(self, bucket_name, timestamp, agg_func):
3336
sorted(records), key=lambda rec: rec[0]
3437
):
3538
recs = [records[key] for key in keys]
36-
yield hostname, [agg_func(coll) for coll in zip(*recs)]
39+
yield hostname, len(recs), [agg_func(coll) for coll in zip(*recs)]
3740

3841

3942
class TimeBucket(object):
Lines changed: 74 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,35 @@
11
import collectd
22

33
from .. import protocol
4-
from .. import types
54
from . import aggregator
5+
from ..client import internal_types
66

77
import logging
88
log = logging.getLogger(__name__)
99

1010

11+
summarizers = {}
12+
13+
14+
def summarizes(protocol_type):
15+
def decorate(fn):
16+
summarizers[protocol_type] = fn
17+
return fn
18+
19+
return decorate
20+
21+
1122
class Receiver(object):
1223
def __init__(self, plugin="sqlalchemy"):
1324
self.plugin = plugin
14-
self.types = types_ = [
15-
types.pool,
16-
types.checkouts,
17-
types.commits,
18-
types.rollbacks,
19-
types.invalidated,
20-
types.transactions
25+
self.internal_types = [
26+
internal_types.pool,
27+
internal_types.totals
2128
]
22-
self.message_receiver = protocol.MessageReceiver(*types_)
29+
self.message_receiver = protocol.MessageReceiver(*self.internal_types)
2330

2431
self.aggregator = aggregator.Aggregator(
25-
[type_.name for type_ in types_]
32+
[type_.name for type_ in self.internal_types]
2633
)
2734

2835
def receive(self, connection):
@@ -39,42 +46,72 @@ def receive(self, connection):
3946
)
4047

4148
def summarize(self, timestamp):
42-
for type_ in self.types:
43-
self._summarize_for_type(type_, timestamp)
44-
45-
def _summarize_for_type(self, type_, timestamp):
46-
values = collectd.Values(
47-
type=type_.name,
48-
plugin=self.plugin,
49-
time=timestamp,
50-
interval=self.aggregator.interval
51-
)
52-
for hostname, progname, stats in \
53-
self.aggregator.get_stats_by_progname(
54-
type_.name, timestamp, sum):
49+
for type_ in self.internal_types:
50+
summarizer = summarizers.get(type_, None)
51+
if summarizer:
52+
summarizer(self, type_, timestamp)
53+
54+
55+
@summarizes(internal_types.pool)
56+
def _summarize_pool_stats(receiver, type_, timestamp):
57+
values = collectd.Values(
58+
type="count",
59+
plugin=receiver.plugin,
60+
time=timestamp,
61+
interval=receiver.aggregator.interval
62+
)
63+
for hostname, progname, numrecs, stats in \
64+
receiver.aggregator.get_stats_by_progname(
65+
type_.name, timestamp, sum):
66+
for name, value in zip(type_.names, stats):
5567
values.dispatch(
56-
type_instance="sum", host=hostname, plugin_instance=progname,
57-
values=stats
68+
host=hostname, plugin_instance=progname,
69+
type_instance=name,
70+
values=[value]
5871
)
5972

60-
for hostname, stats in self.aggregator.get_stats_by_hostname(
61-
type_.name, timestamp, sum):
73+
values.dispatch(
74+
host=hostname, plugin_instance=progname,
75+
type_instance="numprocs", values=[numrecs])
76+
77+
for hostname, numrecs, stats in receiver.aggregator.get_stats_by_hostname(
78+
type_.name, timestamp, sum):
79+
for name, value in zip(type_.names, stats):
6280
values.dispatch(
63-
type_instance="sum", host=hostname, plugin_instance="all",
64-
values=stats
81+
host=hostname, plugin_instance="host",
82+
type_instance=name,
83+
values=[value]
6584
)
85+
values.dispatch(
86+
host=hostname, plugin_instance="host",
87+
type_instance="numprocs", values=[numrecs])
6688

67-
for hostname, progname, stats in self.aggregator.get_stats_by_progname(
68-
type_.name, timestamp, aggregator.avg):
89+
90+
@summarizes(internal_types.totals)
91+
def _summarize_totals(receiver, type_, timestamp):
92+
values = collectd.Values(
93+
type="derive",
94+
plugin=receiver.plugin,
95+
time=timestamp,
96+
interval=receiver.aggregator.interval
97+
)
98+
99+
for hostname, progname, numrecs, stats in \
100+
receiver.aggregator.get_stats_by_progname(
101+
type_.name, timestamp, sum):
102+
for name, value in zip(type_.names, stats):
69103
values.dispatch(
70-
type_instance="avg", host=hostname, plugin_instance=progname,
71-
values=stats
104+
host=hostname, plugin_instance=progname,
105+
type_instance=name,
106+
values=[value]
72107
)
73108

74-
for hostname, stats in self.aggregator.get_stats_by_hostname(
75-
type_.name, timestamp, aggregator.avg):
109+
for hostname, numrecs, stats in receiver.aggregator.get_stats_by_hostname(
110+
type_.name, timestamp, sum):
111+
for name, value in zip(type_.names, stats):
76112
values.dispatch(
77-
type_instance="avg", host=hostname, plugin_instance="all",
78-
values=stats
113+
host=hostname, plugin_instance="host",
114+
type_instance=name,
115+
values=[value]
79116
)
80117

sqlalchemy_collectd/types.py

Lines changed: 0 additions & 27 deletions
This file was deleted.

types.db

Lines changed: 0 additions & 6 deletions
This file was deleted.

0 commit comments

Comments
 (0)