Skip to content

Commit 57d6a68

Browse files
committed
- dev
1 parent 1aa1236 commit 57d6a68

File tree

7 files changed

+281
-42
lines changed

7 files changed

+281
-42
lines changed

sqlalchemy.conf

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,11 +32,11 @@ LoadPlugin "network"
3232
#
3333
# <host>/<plugin>-<program-name>/<type>-<process pid>
3434
#
35-
# hostname/sqlalchemy-nova/checkouts-12345
36-
# hostname/sqlalchemy-nova/checkouts-5839
37-
# hostname/sqlalchemy-nova/checkouts-9905
38-
# hostname/sqlalchemy-neutron/checkouts-19385
39-
# hostname/sqlalchemy-neutron/checkouts-6991
35+
# hostname/sqlalchemy-nova/sqlalchemy_pool-12345
36+
# hostname/sqlalchemy-nova/sqlalchemy_pool-5839
37+
# hostname/sqlalchemy-nova/sqlalchemy_pool-9905
38+
# hostname/sqlalchemy-neutron/sqlalchemy_pool-19385
39+
# hostname/sqlalchemy-neutron/sqlalchemy_pool-6991
4040
#
4141
# to:
4242
#

sqlalchemy_collectd/collectd.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import socket
33
import logging
44
import threading
5+
import os
56

67
log = logging.getLogger(__name__)
78

@@ -61,6 +62,12 @@ def __init__(self, name, *db_template):
6162
("midterm", VALUE_GAUGE),
6263
("longterm", VALUE_GAUGE)
6364
)
65+
66+
note: for all the great effort here in working up types,
67+
collectd aggregation plugin doesn't support more than one dsvalue
68+
at a time in a record so all this flexibility is all a waste of
69+
time :(
70+
6471
"""
6572
self.name = name
6673
self._value_types = [value_type for dsname, value_type in db_template]
@@ -127,15 +134,39 @@ def send(self, connection, timestamp, *values):
127134

128135

129136
class Connection(object):
137+
connections = {}
138+
create_mutex = threading.Lock()
139+
130140
def __init__(self, host="localhost", port=25826):
131141
self.host = host
132142
self.port = port
133-
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
134143
self._mutex = threading.Lock()
144+
self.socket = None
145+
self.pid = None
146+
147+
def _check_connect(self):
148+
if self.socket is None or self.pid != os.getpid():
149+
self.pid = os.getpid()
150+
self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
151+
152+
@classmethod
153+
def for_host_port(cls, host, port):
154+
cls.create_mutex.acquire()
155+
try:
156+
key = (host, port)
157+
if key not in cls.connections:
158+
cls.connections[key] = connection = Connection(host, port)
159+
return connection
160+
else:
161+
return cls.connections[key]
162+
163+
finally:
164+
cls.create_mutex.release()
135165

136166
def send(self, message):
137167
self._mutex.acquire()
138168
try:
169+
self._check_connect()
139170
log.debug("sending: %r", message)
140171
self.socket.sendto(message, (self.host, self.port))
141172
except IOError:

sqlalchemy_collectd/collector.py

Lines changed: 95 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,111 @@
11
from sqlalchemy import event
22
import threading
3+
import weakref
4+
import logging
5+
from . import worker
36

47

5-
class Collector(object):
6-
collectors = {}
7-
create_mutex = threading.Mutex()
8+
class CollectionTarget(object):
9+
targets = {}
10+
create_mutex = threading.Lock()
811

912
def __init__(self, name):
1013
self.name = name
1114

15+
self.collectors = weakref.WeakSet()
16+
1217
# all identifiers for known DBAPI connections
1318
self.connections = set()
1419

1520
# identifers for connections that have not been checked out
1621
# or were checked in
1722
self.checkedin = set()
1823

19-
# identifiers for connections where we've seen begin().
20-
# doesn't include DBAPI implicit transactions
21-
self.transactions = set()
22-
2324
# note these are prior to being closed and/or discarded
2425
self.invalidated = set()
2526

2627
# detached connections.
2728
self.detached = set()
2829

30+
# identifiers for connections where we've seen begin().
31+
# doesn't include DBAPI implicit transactions
32+
self.transactions = set()
33+
2934
@classmethod
30-
def collector_for_name(cls, name):
35+
def collection_for_name(cls, name):
3136
cls.create_mutex.acquire()
3237
try:
33-
if name not in cls.collectors:
34-
cls.collectors[name] = collector = Collector(name)
35-
return collector
38+
if name not in cls.targets:
39+
cls.targets[name] = collection = CollectionTarget(name)
40+
return collection
3641
else:
37-
return cls.collectors[name]
42+
return cls.targets[name]
3843
finally:
3944
cls.create_mutex.release()
4045

46+
@property
47+
def num_pools(self):
48+
return len(self.collectors)
49+
50+
@property
51+
def num_checkedout(self):
52+
checkedout = self.connections.\
53+
difference(self.detached).\
54+
difference(self.invalidated).\
55+
difference(self.checkedin)
56+
return len(checkedout)
57+
58+
@property
59+
def num_checkedin(self):
60+
return len(self.checkedin)
61+
62+
@property
63+
def num_detached(self):
64+
return len(self.detached)
65+
66+
@property
67+
def num_invalidated(self):
68+
return len(self.invalidated)
69+
70+
@property
71+
def num_connections(self):
72+
return len(self.connections)
73+
74+
@property
75+
def num_transactions(self):
76+
return len(self.transactions)
77+
78+
79+
class EngineCollector(object):
80+
81+
def __init__(self, collection_target, engine):
82+
self.collection_target = collection_target
83+
self.engine = engine
84+
collection_target.collectors.add(self)
85+
86+
eng = engine
87+
event.listen(eng, "connect", self._connect_evt)
88+
event.listen(eng, "checkout", self._checkout_evt)
89+
event.listen(eng, "checkin", self._checkin_evt)
90+
event.listen(eng, "invalidate", self._invalidate_evt)
91+
event.listen(eng, "soft_invalidate", self._invalidate_evt)
92+
event.listen(eng, "reset", self._reset_evt)
93+
event.listen(eng, "close", self._close_evt)
94+
event.listen(eng, "detach", self._detach_evt)
95+
event.listen(eng, "close_detached", self._close_detached_evt)
96+
97+
self.connections = collection_target.connections
98+
self.checkedin = collection_target.checkedin
99+
self.transactions = collection_target.transactions
100+
self.invalidated = collection_target.invalidated
101+
self.detached = collection_target.detached
102+
self.logger = logging.getLogger("%s.%s" % (__name__, eng.logging_name))
103+
41104
def conn_ident(self, dbapi_connection):
42105
return id(dbapi_connection)
43106

44107
def _connect_evt(self, dbapi_conn, connection_rec):
108+
worker._check_threads_started()
45109
id_ = self.conn_ident(dbapi_conn)
46110
self.connections.add(id_)
47111
self.checkedin.add(id_)
@@ -69,12 +133,23 @@ def _close_evt(self, dbapi_conn, connection_rec):
69133
self.invalidated.discard(id_)
70134
self.checkedin.discard(id_)
71135

72-
if not self.connections.discard(id_):
136+
try:
137+
self.connections.remove(id_)
138+
except KeyError:
73139
self._warn_missing_connection(dbapi_conn)
74140

75141
# this shouldn't be there
76-
if self.detached.discard(id_):
142+
if id_ in self.detached:
77143
self._warn("shouldn't have detached")
144+
self.detached.discard(id_)
145+
146+
def _warn_missing_connection(self, dbapi_conn):
147+
self._warn(
148+
"connection %s was closed but not part of "
149+
"total connections" % dbapi_conn)
150+
151+
def _warn(self, msg):
152+
self.logger.warn(msg)
78153

79154
def _detach_evt(self, dbapi_conn, connection_rec):
80155
id_ = self.conn_ident(dbapi_conn)
@@ -83,23 +158,15 @@ def _detach_evt(self, dbapi_conn, connection_rec):
83158
def _close_detached_evt(self, dbapi_conn):
84159
id_ = self.conn_ident(dbapi_conn)
85160

86-
if not self.connections.discard(id_):
87-
self._warn_missing_connection(dbapi_conn)
88-
89161
self.transactions.discard(id_)
90162
self.invalidated.discard(id_)
91163
self.checkedin.discard(id_)
92164
self.detached.discard(id_)
93165

94-
def add_engine(self, sqlalchemy_engine):
95-
eng = sqlalchemy_engine
96-
event.listen(eng, "connect", self._connect_evt)
97-
event.listen(eng, "checkout", self._checkout_evt)
98-
event.listen(eng, "checkin", self._checkin_evt)
99-
event.listen(eng, "invalidate", self._invalidate_evt)
100-
event.listen(eng, "soft_invalidate", self._invalidate_evt)
101-
event.listen(eng, "reset", self._reset_evt)
102-
event.listen(eng, "close", self._close_evt)
103-
event.listen(eng, "detach", self._detach_evt)
104-
event.listen(eng, "close_detached", self._close_detached_evt)
166+
try:
167+
self.connections.remove(id_)
168+
except KeyError:
169+
self._warn_missing_connection(dbapi_conn)
170+
171+
105172

sqlalchemy_collectd/plugin.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,14 @@
1+
import os
2+
import socket
3+
import sys
4+
15
from sqlalchemy.engine import CreateEnginePlugin
26

7+
from . import collectd
8+
from . import sender
9+
from . import worker
10+
from . import collector
11+
312

413
class Plugin(CreateEnginePlugin):
514
def __init__(self, url, kwargs):
@@ -19,4 +28,25 @@ def engine_created(self, engine):
1928
2029
"""
2130

31+
# TODO: all this configurable
32+
hostname = socket.gethostname()
33+
progname = sys.argv[0]
34+
35+
collectd_hostname = "localhost"
36+
collectd_port = 25826
37+
38+
sender_ = sender.Sender(hostname, progname)
39+
collection_target = collector.CollectionTarget.collection_for_name(
40+
progname)
41+
collector.EngineCollector(collection_target, engine)
42+
43+
connection = collectd.Connection.for_host_port(
44+
collectd_hostname, collectd_port)
45+
46+
worker.add_target(
47+
connection,
48+
collection_target,
49+
sender_)
50+
51+
2252

sqlalchemy_collectd/sender.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
from . import collectd
2+
3+
4+
senders = []
5+
6+
7+
def sends(name, type_):
8+
collectd_type = collectd.Type(
9+
name, ("value", type_)
10+
)
11+
12+
def decorate(fn):
13+
senders.append((collectd_type, fn))
14+
return fn
15+
16+
return decorate
17+
18+
19+
class Sender(object):
20+
def __init__(self, hostname, stats_name):
21+
self.hostname = hostname
22+
self.stats_name = stats_name
23+
24+
def send(self, connection, collection_target, timestamp, pid):
25+
for collectd_type, sender in senders:
26+
message_sender = collectd.MessageSender(
27+
collectd_type, self.hostname, "sqlalchemy",
28+
plugin_instance=self.stats_name, type_instance=str(pid)
29+
)
30+
value = sender(collection_target)
31+
message_sender.send(connection, timestamp, value)
32+
33+
34+
@sends("sqlalchemy_numpools", collectd.VALUE_GAUGE)
35+
def _numpools(collection_target):
36+
return collection_target.num_pools
37+
38+
39+
@sends("sqlalchemy_checkedout", collectd.VALUE_GAUGE)
40+
def _checkedout(collection_target):
41+
return collection_target.num_checkedout
42+
43+
44+
@sends("sqlalchemy_checkedin", collectd.VALUE_GAUGE)
45+
def _checkedin(collection_target):
46+
return collection_target.num_checkedin
47+
48+
49+
@sends("sqlalchemy_detached", collectd.VALUE_GAUGE)
50+
def _detached(collection_target):
51+
return collection_target.num_detached
52+
53+
54+
@sends("sqlalchemy_invalidated", collectd.VALUE_GAUGE)
55+
def _invalidated(collection_target):
56+
return collection_target.num_invalidated
57+
58+
59+
@sends("sqlalchemy_connections", collectd.VALUE_GAUGE)
60+
def _connections(collection_target):
61+
return collection_target.num_connections
62+

0 commit comments

Comments
 (0)