Skip to content

Commit d53964e

Browse files
committed
- enterprise it!
1 parent 78f2adb commit d53964e

File tree

10 files changed

+201
-99
lines changed

10 files changed

+201
-99
lines changed

sqlalchemy_collectd/client/__init__.py

Whitespace-only changes.

sqlalchemy_collectd/plugin.py renamed to sqlalchemy_collectd/client/plugin.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
import os
21
import socket
32
import sys
43

54
from sqlalchemy.engine import CreateEnginePlugin
65

7-
from . import collectd
6+
from .. import protocol
87
from . import sender
98
from . import worker
109
from . import collector
@@ -40,7 +39,7 @@ def engine_created(self, engine):
4039
progname)
4140
collector.EngineCollector(collection_target, engine)
4241

43-
connection = collectd.ClientConnection.for_host_port(
42+
connection = protocol.ClientConnection.for_host_port(
4443
collectd_hostname, collectd_port)
4544

4645
worker.add_target(

sqlalchemy_collectd/sender.py renamed to sqlalchemy_collectd/client/sender.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
1-
from . import collectd
2-
from . import types
1+
from .. import protocol
2+
from .. import types
33

44

55
senders = []
66

77

8-
def sends(collectd_type):
8+
def sends(protocol_type):
99
def decorate(fn):
10-
senders.append((collectd_type, fn))
10+
senders.append((protocol_type, fn))
1111
return fn
1212

1313
return decorate
@@ -19,9 +19,9 @@ def __init__(self, hostname, stats_name):
1919
self.stats_name = stats_name
2020

2121
def send(self, connection, collection_target, timestamp, interval, pid):
22-
for collectd_type, sender in senders:
23-
message_sender = collectd.MessageSender(
24-
collectd_type, self.hostname, "sqlalchemy",
22+
for protocol_type, sender in senders:
23+
message_sender = protocol.MessageSender(
24+
protocol_type, self.hostname, "sqlalchemy",
2525
plugin_instance=self.stats_name, type_instance=str(pid),
2626
interval=interval
2727
)
File renamed without changes.

sqlalchemy_collectd/collectd.py renamed to sqlalchemy_collectd/protocol.py

Lines changed: 88 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
"""Using the collectd protocol because we originally were going to
2+
connect straight to the network plugin.
3+
4+
"""
15
import struct
26
import socket
37
import logging
@@ -24,18 +28,16 @@
2428
TYPE_VALUES = 0x0006
2529
TYPE_INTERVAL = 0x0007
2630

27-
header = struct.Struct("!2H")
28-
number = struct.Struct("!Q")
31+
header = struct.Struct("!HH")
2932
short = struct.Struct("!H")
30-
double = struct.Struct("<d")
3133
char = struct.Struct("B")
3234
long_ = struct.Struct("!q")
3335

3436
_value_formats = {
35-
VALUE_COUNTER: number,
36-
VALUE_GAUGE: double,
37+
VALUE_COUNTER: struct.Struct("!Q"),
38+
VALUE_GAUGE: struct.Struct("<d"),
3739
VALUE_DERIVE: long_,
38-
VALUE_ABSOLUTE: number
40+
VALUE_ABSOLUTE: struct.Struct("!Q")
3941
}
4042

4143

@@ -56,7 +58,10 @@ class Type(object):
5658
5759
"""
5860

59-
__slots__ = 'name', '_value_types', '_value_formats', '_message_template'
61+
__slots__ = (
62+
'name', '_value_types', '_value_formats',
63+
'_message_template', '_field_names'
64+
)
6065

6166
def __init__(self, name, *db_template):
6267
"""Contruct a new Type.
@@ -77,6 +82,7 @@ def __init__(self, name, *db_template):
7782
7883
"""
7984
self.name = name
85+
self._field_names = [dsname for dsname, value_type in db_template]
8086
self._value_types = [value_type for dsname, value_type in db_template]
8187
self._value_formats = [
8288
_value_formats[value_type] for value_type in self._value_types]
@@ -87,7 +93,7 @@ def __init__(self, name, *db_template):
8793
for value_type in self._value_types:
8894
self._message_template += char.pack(value_type)
8995

90-
def encode_values(self, *values):
96+
def _encode_values(self, *values):
9197
"""Encode a series of values according to the type template."""
9298

9399
msg = self._message_template
@@ -137,11 +143,80 @@ def send(self, connection, timestamp, *values):
137143
long_.pack(int(timestamp)) + \
138144
self._remainder_message_parts
139145

140-
payload = self.type.encode_values(*values)
146+
payload = self.type._encode_values(*values)
141147

142148
connection.send(header_ + payload)
143149

144150

151+
class MessageReceiver(object):
152+
def __init__(self, *types):
153+
self._receivers = {
154+
TYPE_HOST: self._unpack_string,
155+
TYPE_TIME: self._unpack_long,
156+
TYPE_PLUGIN: self._unpack_string,
157+
TYPE_PLUGIN_INSTANCE: self._unpack_string,
158+
TYPE_TYPE: self._unpack_string,
159+
TYPE_TYPE_INSTANCE: self._unpack_string,
160+
TYPE_VALUES: self._unpack_values,
161+
TYPE_INTERVAL: self._unpack_long,
162+
}
163+
self._types = {
164+
type_.name: type_ for type_ in types
165+
}
166+
167+
def receive(self, buf):
168+
result = self._unpack_packet(buf)
169+
type_name = result[TYPE_TYPE]
170+
try:
171+
type_ = self._types[type_name]
172+
except KeyError:
173+
log.warn("Type %s not known, skipping", type_name)
174+
else:
175+
result["type"] = type_
176+
result["values"] = {
177+
name: value
178+
for name, value in zip(type_._field_names, result[TYPE_VALUES])
179+
}
180+
return result
181+
182+
def _unpack_packet(self, buf):
183+
pos = 0
184+
length = len(buf)
185+
result = {}
186+
while pos < length:
187+
type_, len_ = header.unpack_from(buf, pos)
188+
189+
try:
190+
fn = self._receivers[type_]
191+
except KeyError:
192+
log.warn("Message %s not known, skipping", type_)
193+
else:
194+
value = fn(type_, len_, buf[pos:])
195+
196+
result[type_] = value
197+
198+
pos += len_
199+
return result
200+
201+
def _unpack_long(self, type_, length, buf):
202+
return long_.unpack_from(buf, header.size)[0]
203+
204+
def _unpack_string(self, type_, length, buf):
205+
return buf[header.size:length - 1]
206+
207+
def _unpack_values(self, type_, length, buf):
208+
num = short.unpack_from(buf, header.size)[0]
209+
types_start = header.size + short.size
210+
values_pos = types_start + num * char.size
211+
result = []
212+
for pos in range(0, num * char.size, char.size):
213+
value_type = char.unpack_from(buf, types_start + pos)[0]
214+
struct_ = _value_formats[value_type]
215+
result.append(struct_.unpack_from(buf, values_pos)[0])
216+
values_pos += struct_.size
217+
return result
218+
219+
145220
class ClientConnection(object):
146221
connections = {}
147222
create_mutex = threading.Lock()
@@ -183,3 +258,7 @@ def send(self, message):
183258
log.error("Error in socket.sendto", exc_info=True)
184259
finally:
185260
self._mutex.release()
261+
262+
263+
class ServerConnection(object):
264+
pass

sqlalchemy_collectd/server/__init__.py

Whitespace-only changes.

sqlalchemy_collectd/tests/test_collectd.py

Lines changed: 0 additions & 62 deletions
This file was deleted.
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import mock
2+
import unittest
3+
4+
from sqlalchemy_collectd import protocol
5+
6+
7+
class CollectDProtocolTest(unittest.TestCase):
8+
9+
def test_encode_type_values(self):
10+
type_ = protocol.Type(
11+
"my_type",
12+
("some_val", protocol.VALUE_GAUGE),
13+
("some_other_val", protocol.VALUE_DERIVE)
14+
)
15+
16+
self.assertEqual(
17+
self.value_block,
18+
type_._encode_values(25.809, 450)
19+
)
20+
21+
value_block = (
22+
b'\x00\x06' # TYPE_VALUES
23+
b'\x00\x18' # part length
24+
b'\x00\x02' # number of values
25+
b'\x01\x02' # dstype codes GAUGE, DERIVE
26+
b'\xc9v\xbe\x9f\x1a\xcf9@' # 8 bytes for 25.809
27+
b'\x00\x00\x00\x00\x00\x00\x01\xc2' # 8 bytes for 450
28+
)
29+
30+
message = (
31+
b'\x00\x00\x00\rsomehost\x00' # TYPE_HOST
32+
b'\x00\x01\x00\x0c\x00\x00\x00\x00Zt\xd8\x82' # TYPE_TIME
33+
b'\x00\x02\x00\x0fsomeplugin\x00' # TYPE_PLUGIN
34+
# TYPE_PLUGIN_INSTANCE
35+
b'\x00\x03\x00\x17someplugininstance\x00'
36+
b'\x00\x04\x00\x0cmy_type\x00' # TYPE_TYPE
37+
# TYPE_TIMESTAMP
38+
b'\x00\x07\x00\x0c\x00\x00\x00\x00\x00\x00\x00\n'
39+
b'\x00\x05\x00\x15sometypeinstance\x00' # TYPE_TYPE_INSTANCE
40+
) + value_block
41+
42+
def test_message_construct(self):
43+
type_ = protocol.Type(
44+
"my_type",
45+
("some_val", protocol.VALUE_GAUGE),
46+
("some_other_val", protocol.VALUE_DERIVE)
47+
)
48+
49+
sender = protocol.MessageSender(
50+
type_, "somehost", "someplugin", "someplugininstance",
51+
"sometypeinstance"
52+
)
53+
54+
connection = mock.Mock()
55+
56+
sender.send(connection, 1517607042.95968, 25.809, 450)
57+
58+
self.assertEqual(
59+
[mock.call(self.message)],
60+
connection.send.mock_calls
61+
)
62+
63+
def test_decode_packet(self):
64+
type_ = protocol.Type(
65+
"my_type",
66+
("some_val", protocol.VALUE_GAUGE),
67+
("some_other_val", protocol.VALUE_DERIVE)
68+
)
69+
70+
message_receiver = protocol.MessageReceiver(type_)
71+
result = message_receiver.receive(self.message)
72+
self.assertEqual(
73+
{
74+
protocol.TYPE_HOST: 'somehost',
75+
protocol.TYPE_TIME: 1517607042,
76+
protocol.TYPE_PLUGIN: 'someplugin',
77+
protocol.TYPE_PLUGIN_INSTANCE: 'someplugininstance',
78+
protocol.TYPE_TYPE: 'my_type',
79+
protocol.TYPE_TYPE_INSTANCE: 'sometypeinstance',
80+
protocol.TYPE_VALUES: [25.809, 450],
81+
protocol.TYPE_INTERVAL: 10,
82+
"type": type_,
83+
"values": {'some_other_val': 450, 'some_val': 25.809}
84+
},
85+
result
86+
)

sqlalchemy_collectd/types.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,27 @@
1-
from . import collectd
1+
from . import protocol
22

3-
pool = collectd.Type(
3+
pool = protocol.Type(
44
"sqlalchemy_pool",
5-
("numpools", collectd.VALUE_GAUGE),
6-
("checkedout", collectd.VALUE_GAUGE),
7-
("checkedin", collectd.VALUE_GAUGE),
8-
("detached", collectd.VALUE_GAUGE),
9-
("invalidated", collectd.VALUE_GAUGE),
10-
("total", collectd.VALUE_GAUGE),
5+
("numpools", protocol.VALUE_GAUGE),
6+
("checkedout", protocol.VALUE_GAUGE),
7+
("checkedin", protocol.VALUE_GAUGE),
8+
("detached", protocol.VALUE_GAUGE),
9+
("invalidated", protocol.VALUE_GAUGE),
10+
("total", protocol.VALUE_GAUGE),
1111
)
1212

13-
checkouts = collectd.Type(
14-
"sqlalchemy_checkouts", ("count", collectd.VALUE_DERIVE))
13+
checkouts = protocol.Type(
14+
"sqlalchemy_checkouts", ("count", protocol.VALUE_DERIVE))
1515

16-
commits = collectd.Type(
17-
"sqlalchemy_commits", ("count", collectd.VALUE_DERIVE))
16+
commits = protocol.Type(
17+
"sqlalchemy_commits", ("count", protocol.VALUE_DERIVE))
1818

19-
rollbacks = collectd.Type(
20-
"sqlalchemy_rollbacks", ("count", collectd.VALUE_DERIVE))
19+
rollbacks = protocol.Type(
20+
"sqlalchemy_rollbacks", ("count", protocol.VALUE_DERIVE))
2121

22-
invalidated = collectd.Type(
23-
"sqlalchemy_invalidated", ("count", collectd.VALUE_DERIVE))
22+
invalidated = protocol.Type(
23+
"sqlalchemy_invalidated", ("count", protocol.VALUE_DERIVE))
2424

25-
transactions = collectd.Type(
26-
"sqlalchemy_transactions", ("count", collectd.VALUE_DERIVE))
25+
transactions = protocol.Type(
26+
"sqlalchemy_transactions", ("count", protocol.VALUE_DERIVE))
2727

0 commit comments

Comments
 (0)