Skip to content

Commit 9a5d45b

Browse files
committed
- work up client/server round trip
1 parent d53964e commit 9a5d45b

File tree

7 files changed

+190
-18
lines changed

7 files changed

+190
-18
lines changed

collectdconsole.conf

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# A config file that lets you test collectd locally.
2+
# Run just like this:
3+
# collectd -f -C collectdconsole.conf
4+
# received messages will be dumped as JSON to stdout.
5+
6+
TypesDB "/usr/share/collectd/types.db"
7+
TypesDB "/home/classic/dev/sqlalchemy-collectd/types.db"
8+
9+
LoadPlugin logfile
10+
<Plugin logfile>
11+
LogLevel info
12+
File STDOUT
13+
Timestamp true
14+
PrintSeverity false
15+
</Plugin>
16+
17+
LoadPlugin write_log
18+
<Plugin write_log>
19+
Format JSON
20+
</Plugin>
21+
22+
LoadPlugin python
23+
<Plugin python>
24+
ModulePath "/home/classic/dev/sqlalchemy-collectd"
25+
LogTraces true
26+
27+
</Plugin>
28+
29+
<Plugin python>
30+
Import "sqlalchemy_collectd.server.plugin"
31+
32+
<Module "sqlalchemy_collectd.server.plugin">
33+
listen "localhost" 25827
34+
35+
</Module>
36+
</Plugin>
37+

sqlalchemy_collectd/client/plugin.py

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -26,26 +26,27 @@ def engine_created(self, engine):
2626
registering engine or connection pool events.
2727
2828
"""
29+
start_plugin(engine)
2930

30-
# TODO: all this configurable
31-
hostname = socket.gethostname()
32-
progname = sys.argv[0]
3331

34-
collectd_hostname = "localhost"
35-
collectd_port = 25826
32+
def start_plugin(engine):
33+
# TODO: all this configurable
34+
hostname = socket.gethostname()
35+
progname = sys.argv[0]
3636

37-
sender_ = sender.Sender(hostname, progname)
38-
collection_target = collector.CollectionTarget.collection_for_name(
39-
progname)
40-
collector.EngineCollector(collection_target, engine)
37+
collectd_hostname = "localhost"
38+
collectd_port = 25827
4139

42-
connection = protocol.ClientConnection.for_host_port(
43-
collectd_hostname, collectd_port)
44-
45-
worker.add_target(
46-
connection,
47-
collection_target,
48-
sender_)
40+
sender_ = sender.Sender(hostname, progname)
41+
collection_target = collector.CollectionTarget.collection_for_name(
42+
progname)
43+
collector.EngineCollector(collection_target, engine)
4944

45+
connection = protocol.ClientConnection.for_host_port(
46+
collectd_hostname, collectd_port)
5047

48+
worker.add_target(
49+
connection,
50+
collection_target,
51+
sender_)
5152

sqlalchemy_collectd/protocol.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ def receive(self, buf):
171171
type_ = self._types[type_name]
172172
except KeyError:
173173
log.warn("Type %s not known, skipping", type_name)
174+
return None
174175
else:
175176
result["type"] = type_
176177
result["values"] = {
@@ -202,7 +203,7 @@ def _unpack_long(self, type_, length, buf):
202203
return long_.unpack_from(buf, header.size)[0]
203204

204205
def _unpack_string(self, type_, length, buf):
205-
return buf[header.size:length - 1]
206+
return buf[header.size:length - 1].decode('ascii')
206207

207208
def _unpack_values(self, type_, length, buf):
208209
num = short.unpack_from(buf, header.size)[0]
@@ -261,4 +262,11 @@ def send(self, message):
261262

262263

263264
class ServerConnection(object):
264-
pass
265+
def __init__(self, host, port):
266+
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
267+
self.sock.bind((host, port))
268+
269+
def receive(self):
270+
data, addr = self.sock.recvfrom(1024)
271+
return data, addr
272+
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import collections
2+
3+
4+
class Aggregator(object):
5+
def __init__(self):
6+
# TOOD: configurable size
7+
self.queue = collections.deque(maxlen=100000)
8+
9+
def put(self, message):
10+
self.queue.appendleft(message)
11+
12+
def outgoing(self):
13+
while self.queue:
14+
yield self.queue.pop()
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
import threading
2+
3+
4+
def _receive(connection, aggregator, receiver):
5+
while True:
6+
receiver.receive(connection, aggregator)
7+
8+
9+
def listen(connection, aggregator, receiver):
10+
listen_thread = threading.Thread(
11+
target=_receive, args=(connection, aggregator, receiver))
12+
listen_thread.daemon = True
13+
listen_thread.start()
14+
15+

sqlalchemy_collectd/server/plugin.py

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
from .. import __version__
2+
import sys
3+
import collectd
4+
from . import listener
5+
from . import receiver
6+
from . import aggregator
7+
from .. import protocol
8+
9+
aggregator_ = None
10+
11+
12+
def _notice(msg):
13+
collectd.notice("[sqlalchemy-collectd] %s" % msg)
14+
15+
16+
def get_config(config):
17+
global aggregator_
18+
19+
_notice("sqlalchemy_collectd plugin version %s" % __version__)
20+
_notice("Python version: %s" % sys.version)
21+
start_plugin(config)
22+
23+
24+
def start_plugin(config):
25+
global aggregator_
26+
27+
config_dict = {elem.key: tuple(elem.values) for elem in config.children}
28+
host, port = config_dict.get("listen", ("localhost", 25827))
29+
30+
aggregator_ = aggregator.Aggregator()
31+
receiver_ = receiver.Receiver()
32+
connection = protocol.ServerConnection(host, int(port))
33+
34+
listener.listen(connection, aggregator_, receiver_)
35+
36+
37+
def _read_raw_struct_to_values(message):
38+
# dispatch(
39+
# [type][, values][, plugin_instance]
40+
# [, type_instance][, plugin][, host][, time][, interval]) -> None.
41+
42+
return collectd.Values(
43+
type=message[protocol.TYPE_TYPE],
44+
values=message[protocol.TYPE_VALUES],
45+
plugin=message[protocol.TYPE_PLUGIN],
46+
host=message[protocol.TYPE_HOST],
47+
time=message[protocol.TYPE_TIME],
48+
type_instance=message[protocol.TYPE_TYPE_INSTANCE],
49+
plugin_instance=message[protocol.TYPE_PLUGIN_INSTANCE],
50+
interval=message[protocol.TYPE_INTERVAL]
51+
)
52+
53+
_read_struct_to_values = _read_raw_struct_to_values
54+
55+
56+
def read(data=None):
57+
for message in aggregator_.outgoing():
58+
values = _read_struct_to_values(message)
59+
values.dispatch()
60+
61+
62+
collectd.register_config(get_config)
63+
collectd.register_read(read)
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
from .. import protocol
2+
from .. import types
3+
4+
5+
receivers = []
6+
7+
8+
def receives(protocol_type):
9+
def decorate(fn):
10+
receivers.append((protocol_type, fn))
11+
return fn
12+
13+
return decorate
14+
15+
16+
class Receiver(object):
17+
def __init__(self):
18+
self.message_receiver = protocol.MessageReceiver(
19+
types.pool,
20+
types.checkouts,
21+
types.commits,
22+
types.rollbacks,
23+
types.invalidated,
24+
types.transactions
25+
)
26+
27+
def receive(self, connection, aggregator):
28+
data, host = connection.receive()
29+
message = self.message_receiver.receive(data)
30+
if message is not None:
31+
message['host'] = host
32+
# TODO: look up type-specific handler
33+
# feed to aggregator per-type
34+
aggregator.put(message)

0 commit comments

Comments
 (0)