Skip to content

Commit eeaf30f

Browse files
committed
- add a data structure that can store aggregated data within
slots based on interval
1 parent 9a5d45b commit eeaf30f

File tree

3 files changed

+146
-1
lines changed

3 files changed

+146
-1
lines changed

sqlalchemy_collectd/server/aggregator.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,73 @@ def put(self, message):
1212
def outgoing(self):
    """Yield queued items until ``self.queue`` is empty.

    Each item is removed from the queue with ``self.queue.pop()`` just
    before it is yielded, so iterating this generator to completion
    drains the queue.
    """
    while self.queue:
        item = self.queue.pop()
        yield item
15+
16+
17+
class TimeBucket(object):
18+
"""Store the last N seconds of time-stamped data within
19+
interval-keyed buckets.
20+
21+
The idea is we can store and retrieve records that were
22+
within the last N seconds only, or in the previous
23+
2N-N seconds, or 3N-2N seconds, including that we can efficiently
24+
clean up old ranges in O(1) time.
25+
26+
E.g. assume four buckets and interval of 100::
27+
28+
bucket[0] -> timestamp 50000-50100
29+
bucket[1] -> timestamp 50101-50200
30+
bucket[2] -> timestamp 50201-50300
31+
bucket[3] -> timestamp 50301-50400
32+
33+
100 seconds later::
34+
35+
bucket[0] -> timestamp 50401-50500
36+
bucket[1] -> timestamp 50101-50200
37+
bucket[2] -> timestamp 50201-50300
38+
bucket[3] -> timestamp 50301-50400
39+
40+
100 seconds later::
41+
42+
bucket[0] -> timestamp 50401-50500
43+
bucket[1] -> timestamp 50501-50600
44+
bucket[2] -> timestamp 50201-50300
45+
bucket[3] -> timestamp 50301-50400
46+
47+
etc.
48+
49+
The object assumes if a new timestamp is coming in that is newer
50+
than the current bucket, we go to the next bucket. If the next bucket
51+
has data from the old range it had B buckets ago, we empty it out first.
52+
53+
"""
54+
__slots__ = 'num_buckets', 'buckets', 'interval'
55+
56+
def __init__(self, num_buckets, interval):
57+
self.num_buckets = num_buckets
58+
self.buckets = [
59+
{"slot": None, "data": {}} for i in range(num_buckets)
60+
]
61+
self.interval = interval
62+
63+
def _get_bucket(self, timestamp):
64+
slot = (timestamp // self.interval)
65+
bucket_num = slot % self.num_buckets
66+
bucket = self.buckets[bucket_num]
67+
bucket_slot = bucket["slot"]
68+
if bucket_slot is None:
69+
bucket["slot"] = slot
70+
elif bucket_slot < slot:
71+
bucket["data"].clear()
72+
bucket["slot"] = slot
73+
elif bucket_slot > slot:
74+
raise KeyError()
75+
return bucket
76+
77+
def put(self, timestamp, key, data):
78+
self._get_bucket(timestamp)['data'][key] = data
79+
80+
def get(self, current_time, key):
81+
return self._get_bucket(current_time)['data'].get(key)
82+
83+
def get_data(self, current_time):
84+
return self._get_bucket(current_time)['data']
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import mock
2+
import unittest
3+
import random
4+
5+
from .. import aggregator
6+
7+
8+
class TimeBucketTest(unittest.TestCase):
9+
def _generate(self, interval):
10+
current = random.randint(729404, 930285)
11+
12+
for i in range(200):
13+
yield current
14+
current += random.randint(0, interval // 2)
15+
16+
def test_put(self):
17+
agg = aggregator.TimeBucket(4, 10)
18+
agg.put(50530, "key", "value_50530")
19+
20+
self.assertEqual(agg.get(50532, "key"), "value_50530")
21+
22+
agg.put(50534, "key", "value_50534")
23+
self.assertEqual(agg.get(50535, "key"), "value_50534")
24+
25+
self.assertEqual(agg.get(50538, "key"), "value_50534")
26+
27+
# bucket has expired with this timestamp
28+
self.assertEqual(agg.get(50542, "key"), None)
29+
30+
# but we can still see it w/ recent timestamp
31+
self.assertEqual(agg.get(50539, "key"), "value_50534")
32+
33+
# current bucket
34+
agg.put(50545, "key", "value_50545")
35+
36+
# bump
37+
agg.put(50556, "key", "value_50556")
38+
39+
# still see old value
40+
self.assertEqual(agg.get(50539, "key"), "value_50534")
41+
42+
# bump
43+
agg.put(50562, "key", "value_50562")
44+
45+
# still see old value
46+
self.assertEqual(agg.get(50539, "key"), "value_50534")
47+
self.assertEqual(agg.get_data(50539)["key"], "value_50534")
48+
49+
# bump
50+
agg.put(50574, "key", "value_50574")
51+
52+
# now it's gone
53+
self.assertRaises(
54+
KeyError,
55+
agg.get_data, 50539
56+
)
57+
58+
def test_series(self):
59+
agg = aggregator.TimeBucket(4, 10)
60+
previous_time = None
61+
for round_, time in enumerate(self._generate(10)):
62+
if round_ % 4 == 1:
63+
value = agg.get(time, "key")
64+
if value is None:
65+
assert previous_time // 10 != time // 10
66+
else:
67+
assert previous_time // 10 == time // 10
68+
self.assertEqual(
69+
max([bucket["data"].get("key", '')
70+
for bucket in agg.buckets]),
71+
value
72+
)
73+
else:
74+
agg.put(time, "key", "value_%s" % time)
75+
previous_time = time

sqlalchemy_collectd/tests/test_protocol.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import mock
22
import unittest
33

4-
from sqlalchemy_collectd import protocol
4+
from .. import protocol
55

66

77
class CollectDProtocolTest(unittest.TestCase):

0 commit comments

Comments
 (0)