1
- import collections
2
1
import struct
3
2
import socket
4
3
import logging
5
4
import threading
6
- import time
7
5
8
6
log = logging .getLogger (__name__ )
9
7
10
8
DEFAULT_INTERVAL = 10
11
9
MAX_PACKET_SIZE = 1024
12
10
13
- VALUE_COUNTER = 0
14
- VALUE_GAUGE = 1
15
- VALUE_DERIVE = 2
11
+ VALUE_COUNTER = 0
12
+ VALUE_GAUGE = 1
13
+ VALUE_DERIVE = 2
16
14
VALUE_ABSOLUTE = 3
17
15
18
16
# https://git.octo.it/?p=collectd.git;a=blob;hb=master;f=src/network.h
25
23
TYPE_VALUES = 0x0006
26
24
TYPE_INTERVAL = 0x0007
27
25
26
+ _value_formats = {
27
+ VALUE_COUNTER : "!Q" ,
28
+ VALUE_GAUGE : "<d" ,
29
+ VALUE_DERIVE : "!q" ,
30
+ VALUE_ABSOLUTE : "!Q"
31
+ }
32
+
33
+
34
+ class Type (object ):
35
+ """Represents a collectd type and its data template.
36
+
37
+ Here, we are encoding what we need to know about a collectd type that we'd
38
+ be targeting to send to a collectd server.A type includes a name and a set
39
+ of values and types that go along with it. These names and value
40
+ definitions are a fixed thing on the collectd server side, which are
41
+ listed in a file called types.db. Additional custom types can be added
42
+ by specifying additional type files in the collectd configuration.
43
+
44
+ .. seealso::
45
+
46
+ collectd's default types.db:
47
+ https://github.com/collectd/collectd/blob/master/src/types.db
48
+
49
+ """
50
+
51
+ __slots__ = 'name' , '_value_types' , '_value_formats' , '_message_template'
52
+
53
+ def __init__ (self , name , * db_template ):
54
+ """Contruct a new Type.
55
+
56
+ E.g. to represent the "load" type in collectd's types.db::
57
+
58
+ load = Type(
59
+ "load",
60
+ ("shortterm", VALUE_GAUGE),
61
+ ("midterm", VALUE_GAUGE),
62
+ ("longterm", VALUE_GAUGE)
63
+ )
64
+ """
65
+ self .name = name
66
+ self ._value_types = [value_type for dsname , value_type in db_template ]
67
+ self ._value_formats = [
68
+ _value_formats [value_type ] for value_type in self ._value_types ]
69
+ self ._message_template = struct .pack (
70
+ "!HHH" , TYPE_VALUES , 6 + (9 * len (db_template )),
71
+ len (db_template ))
72
+ for value_type in self ._value_types :
73
+ self ._message_template += struct .pack ("B" , value_type )
74
+
75
+ def encode_values (self , * values ):
76
+ """Encode a series of values according to the type template."""
77
+
78
+ msg = self ._message_template
79
+ for format_ , dsvalue in zip (self ._value_formats , values ):
80
+ msg += struct .pack (format_ , dsvalue )
81
+
82
+ return msg
83
+
28
84
29
85
class MessageSender (object ):
86
+ """Represents all the fields necessary to send a message."""
87
+
88
+ __slots__ = (
89
+ 'type' , 'host' , 'plugin' , 'plugin_instance' , 'type_instance' ,
90
+ 'interval' , '_host_message_part' , '_remainder_message_parts'
91
+ )
92
+
30
93
def __init__ (
31
- self , type , host , plugin = "sqlalchemy" , plugin_instance = None ,
94
+ self , type , host , plugin , plugin_instance = None ,
32
95
type_instance = None , interval = DEFAULT_INTERVAL ):
33
96
34
- # TODO: send template just like in types.db and fix to that
35
-
36
97
self .type = type
37
98
self .host = host
38
99
self .plugin = plugin
39
100
self .plugin_instance = plugin_instance
40
101
self .type_instance = type_instance
41
102
self .interval = interval
42
- self ._queue = collections .deque ()
43
-
44
- def _header (self , timestamp ):
45
- buf = b''
46
- buf += struct .pack ("!HH" , TYPE_HOST , 5 + len (self .host )) + self .host + b"\0 "
47
- buf += struct .pack ("!HHq" , TYPE_TIME , 12 , timestamp )
48
- buf += struct .pack ("!HH" , TYPE_PLUGIN , 5 + len (self .plugin )) + self .plugin + b"\0 "
49
- buf += struct .pack ("!HH" , TYPE_PLUGIN_INSTANCE , 5 + len (self .plugin_instance )) + self .plugin_instance + b"\0 "
50
- buf += struct .pack ("!HH" , TYPE_TYPE , 5 + len (self .type )) + self .type + b"\0 "
51
- buf += struct .pack ("!HHq" , TYPE_INTERVAL , 12 , self .interval )
52
- buf += struct .pack ("!HH" , TYPE_TYPE_INSTANCE , 5 + len (self .type_instance )) + self .type_instance + b'\0 '
53
-
54
- return buf
55
-
56
- def _gauge (self , dsname , dsvalue ):
57
- buf = b''
58
- buf += struct .pack ("!HHH" , TYPE_VALUES , 15 , 1 )
59
- buf += struct .pack ("<Bd" , VALUE_GAUGE , dsvalue )
60
-
61
- return buf
62
-
63
- def _derive (self , dsname , dsvalue ):
64
- buf = b''
65
- buf += struct .pack ("!HHH" , TYPE_VALUES , 15 , 1 )
66
- buf += struct .pack ("!Bq" , VALUE_DERIVE , dsvalue )
67
-
68
- return buf
69
-
70
- def queue_stat (self , * values ):
71
- # TODO TODO
72
- pass
73
103
74
- def queue_gauge (self , name , value ):
75
- self ._queue .append ((VALUE_GAUGE , time .time (), name , value ))
104
+ self ._host_message_part = self ._pack_string (TYPE_HOST , self .host )
105
+ self ._remainder_message_parts = (
106
+ self ._pack_string (TYPE_PLUGIN , self .plugin ) +
107
+ self ._pack_string (TYPE_PLUGIN_INSTANCE , self .plugin_instance ) +
108
+ self ._pack_string (TYPE_TYPE , self .type .name ) +
109
+ struct .pack ("!HHq" , TYPE_INTERVAL , 12 , self .interval ) +
110
+ self ._pack_string (TYPE_TYPE_INSTANCE , self .type_instance )
111
+ )
76
112
77
- def queue_derive (self , name , value ):
78
- self . _queue . append (( VALUE_DERIVE , time . time (), name , value ))
113
+ def _pack_string (self , typecode , value ):
114
+ return struct . pack ( "!HH" , typecode , 5 + len ( value )) + value + b" \0 "
79
115
80
- def flush (self , connection ):
81
- now = time .time ()
82
- too_old = now - self .interval
83
- header = self ._header (now )
116
+ def send (self , connection , timestamp , * values ):
117
+ """Send a message on a connection."""
84
118
85
- while self ._queue :
86
- type_ , timestamp , name , value = self ._queue .popleft ()
87
- if timestamp < too_old :
88
- continue
119
+ header = self ._host_message_part + \
120
+ struct .pack ("!HHq" , TYPE_TIME , 12 , timestamp ) + \
121
+ self ._remainder_message_parts
89
122
90
- if type_ == VALUE_GAUGE :
91
- element = self ._gauge (name , value )
92
- else :
93
- element = self ._derive (name , value )
123
+ payload = self .type .encode_values (* values )
94
124
95
- connection .send (header + element )
125
+ connection .send (header + payload )
96
126
97
127
98
128
class Connection (object ):
@@ -105,7 +135,7 @@ def __init__(self, host="localhost", port=25826):
105
135
def send (self , message ):
106
136
self ._mutex .acquire ()
107
137
try :
108
- log .debug ("sending: %s " , message )
138
+ log .debug ("sending: %r " , message )
109
139
self .socket .sendto (message , (self .host , self .port ))
110
140
except IOError :
111
141
log .error ("Error in socket.sendto" , exc_info = True )
0 commit comments