Skip to content

Commit d70a2e5

Browse files
committed
Added UWSGIFlush storage.
1 parent 7404386 commit d70a2e5

File tree

9 files changed

+274
-143
lines changed

9 files changed

+274
-143
lines changed

README.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ Features
1414
- Four types of metric are supported: Counter, Gauge, Summary(without quantiles) and Histogram.
1515
- InMemoryStorage (do not use it for multiprocessing apps)
1616
- UWSGI storage - share metrics between processes
17+
- UWAGI flush storage - sync metrics with uwsgi sharedarea by flush call
1718
- time decorator
1819
- time context manager
1920

conftest.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,16 @@ def project_root():
2222
def run_around_tests():
2323
m = uwsgi.sharedarea_memoryview(0)
2424
for x in xrange(len(m)):
25-
m[x] = '\x00'
25+
m[x] = "\x00"
2626

2727
yield
2828

29+
2930
@pytest.fixture
3031
def measure_time():
3132
return measure_time_manager
33+
34+
35+
@pytest.fixture()
36+
def iterations():
37+
return 500

pyprometheus/contrib/uwsgi_features.py

Lines changed: 112 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
import os
1616
import struct
1717
import uuid
18+
import copy
1819
from contextlib import contextmanager
19-
20+
from logging import getLogger
2021
from pyprometheus.const import TYPES
2122
from pyprometheus.metrics import Gauge
22-
from pyprometheus.storage import BaseStorage
23+
from pyprometheus.storage import BaseStorage, LocalMemoryStorage
2324

2425

2526
try:
@@ -36,13 +37,15 @@
3637
class InvalidUWSGISharedareaPagesize(Exception):
3738
pass
3839

40+
logger = getLogger('pyprometheus.uwsgi_features')
41+
3942

4043
class UWSGICollector(object):
4144
"""Grap UWSGI stats and export to prometheus
4245
"""
4346
def __init__(self, namespace, labels={}):
4447
self._namespace = namespace
45-
self._labels =tuple(sorted(labels.items(), key=lambda x: x[0]))
48+
self._labels = tuple(sorted(labels.items(), key=lambda x: x[0]))
4649
self._collectors = self.declare_metrics()
4750

4851
@property
@@ -64,40 +67,40 @@ def metric_name(self, name):
6467
6568
:param name:
6669
"""
67-
return ':'.join([self._namespace, name])
70+
return ":".join([self._namespace, name])
6871

6972
def declare_metrics(self):
7073
return {
71-
'memory': Gauge(self.metric_name("uwsgi_memory_bytes"), "UWSGI memory usage in bytes", ('type',) + self._labels),
72-
'processes': Gauge(self.metric_name("processes_total"), "Number of UWSGI processes", self._labels),
73-
'worker_status': Gauge(self.metric_name("worker_status_totla"), "Current workers status", self._labels),
74-
'total_requests': Gauge(self.metric_name("requests_total"), "Total processed request", self._labels),
75-
'buffer_size': Gauge(self.metric_name("buffer_size_bytes"), "UWSGI buffer size in bytes", self._labels),
76-
'started_on': Gauge(self.metric_name("started_on"), "UWSGI started on timestamp", self._labels),
77-
'cores': Gauge(self.metric_name("cores"), "system cores", self._labels),
78-
79-
80-
'process:respawn_count': Gauge(self.metric_name("process:respawn_count"), "Process respawn count", ('id', ) + self._labels),
81-
'process:last_spawn': Gauge(self.metric_name("process:last_spawn"), "Process last spawn", ('id', ) + self._labels),
82-
'process:signals': Gauge(self.metric_name("process:signals"), "Process signals total", ('id', ) + self._labels),
83-
'process:avg_rt': Gauge(self.metric_name("process:avg_rt"), "Process average response time", ('id', ) + self._labels),
84-
'process:tx': Gauge(self.metric_name("process:tx"), "Process transmitted data", ('id',) + self._labels),
85-
86-
'process:status': Gauge(self.metric_name("process:status"), "Process status", ('id', 'status') + self._labels),
87-
'process:running_time': Gauge(self.metric_name("process:running_time"), "Process running time", ('id', ) + self._labels),
88-
'process:exceptions': Gauge(self.metric_name("process:exceptions"), "Process exceptions", ('id', ) + self._labels),
89-
'process:requests': Gauge(self.metric_name("process:requests"), "Process requests", ('id', ) + self._labels),
90-
'process:delta_requests': Gauge(self.metric_name("process:delta_requests"), "Process delta_requests", ('id', ) + self._labels),
91-
'process:rss': Gauge(self.metric_name("process:rss"), "Process rss memory", ('id', ) + self._labels),
92-
'process:vsz': Gauge(self.metric_name("process:vzs"), "Process vsz address space", ('id', ) + self._labels),
74+
"memory": Gauge(self.metric_name("uwsgi_memory_bytes"), "UWSGI memory usage in bytes", ("type",) + self._labels),
75+
"processes": Gauge(self.metric_name("processes_total"), "Number of UWSGI processes", self._labels),
76+
"worker_status": Gauge(self.metric_name("worker_status_totla"), "Current workers status", self._labels),
77+
"total_requests": Gauge(self.metric_name("requests_total"), "Total processed request", self._labels),
78+
"buffer_size": Gauge(self.metric_name("buffer_size_bytes"), "UWSGI buffer size in bytes", self._labels),
79+
"started_on": Gauge(self.metric_name("started_on"), "UWSGI started on timestamp", self._labels),
80+
"cores": Gauge(self.metric_name("cores"), "system cores", self._labels),
81+
82+
83+
"process:respawn_count": Gauge(self.metric_name("process:respawn_count"), "Process respawn count", ("id", ) + self._labels),
84+
"process:last_spawn": Gauge(self.metric_name("process:last_spawn"), "Process last spawn", ("id", ) + self._labels),
85+
"process:signals": Gauge(self.metric_name("process:signals"), "Process signals total", ("id", ) + self._labels),
86+
"process:avg_rt": Gauge(self.metric_name("process:avg_rt"), "Process average response time", ("id", ) + self._labels),
87+
"process:tx": Gauge(self.metric_name("process:tx"), "Process transmitted data", ("id",) + self._labels),
88+
89+
"process:status": Gauge(self.metric_name("process:status"), "Process status", ("id", "status") + self._labels),
90+
"process:running_time": Gauge(self.metric_name("process:running_time"), "Process running time", ("id", ) + self._labels),
91+
"process:exceptions": Gauge(self.metric_name("process:exceptions"), "Process exceptions", ("id", ) + self._labels),
92+
"process:requests": Gauge(self.metric_name("process:requests"), "Process requests", ("id", ) + self._labels),
93+
"process:delta_requests": Gauge(self.metric_name("process:delta_requests"), "Process delta_requests", ("id", ) + self._labels),
94+
"process:rss": Gauge(self.metric_name("process:rss"), "Process rss memory", ("id", ) + self._labels),
95+
"process:vsz": Gauge(self.metric_name("process:vzs"), "Process vsz address space", ("id", ) + self._labels),
9396
}
9497

9598
def collect(self):
96-
for name, value in [('processes', uwsgi.numproc),
97-
('total_requests', uwsgi.total_requests()),
98-
('buffer_size', uwsgi.buffer_size),
99-
('started_on', uwsgi.started_on),
100-
('cores', uwsgi.cores)]:
99+
for name, value in [("processes", uwsgi.numproc),
100+
("total_requests", uwsgi.total_requests()),
101+
("buffer_size", uwsgi.buffer_size),
102+
("started_on", uwsgi.started_on),
103+
("cores", uwsgi.cores)]:
101104
yield self.get_sample(name, value)
102105

103106
yield self.get_memory_samples()
@@ -111,23 +114,23 @@ def get_workers_samples(self, workers):
111114
112115
:param worker: worker stats
113116
"""
114-
for name in ['requests', 'respawn_count', 'running_time',
115-
'exceptions', 'delta_requests',
116-
'rss', 'vsz', 'last_spawn', 'tx', 'avg_rt', 'signals']:
117+
for name in ["requests", "respawn_count", "running_time",
118+
"exceptions", "delta_requests",
119+
"rss", "vsz", "last_spawn", "tx", "avg_rt", "signals"]:
117120
metric = self._collectors["process:" + name]
118121

119122
for worker in workers:
120-
labels = self._labels + (('id', worker['id']),)
123+
labels = self._labels + (("id", worker["id"]),)
121124
metric.add_sample(labels, metric.build_sample(labels,
122-
( (TYPES.GAUGE, metric.name, '', self._labels + (('id', worker['id']),), worker[name]), )))
125+
( (TYPES.GAUGE, metric.name, "", self._labels + (("id", worker["id"]),), worker[name]), )))
123126

124127
yield metric
125128

126129
metric = self._collectors["process:status"]
127130
for worker in workers:
128-
labels = self._labels + (('id', worker['id']), ('status', worker['status']))
131+
labels = self._labels + (("id", worker["id"]), ("status", worker["status"]))
129132
metric.add_sample(labels, metric.build_sample(labels,
130-
( (TYPES.GAUGE, metric.name, '', self._labels + (('id', worker['id']), ('status', worker['status'])), 1), )))
133+
( (TYPES.GAUGE, metric.name, "", self._labels + (("id", worker["id"]), ("status", worker["status"])), 1), )))
131134

132135
yield metric
133136

@@ -138,23 +141,21 @@ def get_sample(self, name, value):
138141
:param value:
139142
"""
140143
metric = self._collectors[name]
141-
return metric.build_samples([(self._labels, ( (TYPES.GAUGE, metric.name, '', self._labels, float(value)), ))])
144+
return metric.build_samples([(self._labels, ( (TYPES.GAUGE, metric.name, "", self._labels, float(value)), ))])
142145

143146
def get_memory_samples(self):
144147
"""Get memory usage samples
145148
"""
146-
metric = self._collectors['memory']
149+
metric = self._collectors["memory"]
147150
return metric.build_samples(
148-
[(self._labels + (('type', 'rss'),), ( (TYPES.GAUGE, metric.name, '', self._labels + (('type', 'rss'),), uwsgi.mem()[0]), )),
149-
(self._labels + (('type', 'vsz'),), ( (TYPES.GAUGE, metric.name, '', self._labels + (('type', 'vsz'),), uwsgi.mem()[1]), ))])
150-
151+
[(self._labels + (("type", "rss"),), ( (TYPES.GAUGE, metric.name, "", self._labels + (("type", "rss"),), uwsgi.mem()[0]), )),
152+
(self._labels + (("type", "vsz"),), ( (TYPES.GAUGE, metric.name, "", self._labels + (("type", "vsz"),), uwsgi.mem()[1]), ))])
151153

152154

153155
class UWSGIStorage(BaseStorage):
154-
"""A dict of doubles, backend by uwsgi sharedarea
155-
"""
156+
"""A dict of doubles, backend by uwsgi sharedarea"""
156157

157-
SHAREDAREA_ID = int(os.environ.get('PROMETHEUS_UWSGI_SHAREDAREA', 0))
158+
SHAREDAREA_ID = int(os.environ.get("PROMETHEUS_UWSGI_SHAREDAREA", 0))
158159
KEY_SIZE_SIZE = 4
159160
KEY_VALUE_SIZE = 8
160161
SIGN_SIZE = 10
@@ -216,14 +217,14 @@ def get_slice(self, start, size):
216217
def get_area_size(self):
217218
"""Read area size from uwsgi
218219
"""
219-
return struct.unpack(b'i', self.m[self.get_slice(self.AREA_SIZE_POSITION, self.AREA_SIZE_SIZE)])[0]
220+
return struct.unpack(b"i", self.m[self.get_slice(self.AREA_SIZE_POSITION, self.AREA_SIZE_SIZE)])[0]
220221

221222
def init_area_size(self):
222223
return self.update_area_size(self.AREA_SIZE_SIZE)
223224

224225
def update_area_size(self, size):
225226
self._used = size
226-
self.m[self.get_slice(self.AREA_SIZE_POSITION, self.AREA_SIZE_SIZE)] = struct.pack(b'i', size)
227+
self.m[self.get_slice(self.AREA_SIZE_POSITION, self.AREA_SIZE_SIZE)] = struct.pack(b"i", size)
227228
return True
228229

229230
def update_area_sign(self):
@@ -298,7 +299,7 @@ def get_key_size(self, key):
298299

299300

300301
def get_binary_string(self, key, value):
301-
item_template = '=i{0}sd'.format(len(key)).encode()
302+
item_template = "=i{0}sd".format(len(key)).encode()
302303

303304
return struct.pack(item_template, len(key), key, value)
304305

@@ -327,30 +328,30 @@ def read_key_string(self, position, size):
327328
:param size: int key size in bytes to read
328329
"""
329330
key_string_bytes = self.m[self.get_slice(position, size)]
330-
return struct.unpack(b'{0}s'.format(size), key_string_bytes)[0]
331+
return struct.unpack(b"{0}s".format(size), key_string_bytes)[0]
331332

332333
def read_key_value(self, position):
333334
"""Read float value of position
334335
335336
:param position: int offset for key value float
336337
"""
337338
key_value_bytes = self.m[self.get_slice(position, self.KEY_VALUE_SIZE)]
338-
return struct.unpack(b'd', key_value_bytes)[0]
339+
return struct.unpack(b"d", key_value_bytes)[0]
339340

340341
def read_key_size(self, position):
341342
"""Read key size from position
342343
343344
:param position: int offset for 4-byte key size
344345
"""
345346
key_size_bytes = self.m[self.get_slice(position, self.KEY_SIZE_SIZE)]
346-
return struct.unpack(b'i', key_size_bytes)[0]
347+
return struct.unpack(b"i", key_size_bytes)[0]
347348

348349
def write_key_value(self, position, value):
349350
"""Write float value to position
350351
351352
:param position: int offset for 8-byte float value
352353
"""
353-
self.m[self.get_slice(position, self.KEY_VALUE_SIZE)] = struct.pack(b'd', value)
354+
self.m[self.get_slice(position, self.KEY_VALUE_SIZE)] = struct.pack(b"d", value)
354355
return value
355356

356357
def read_item(self, position):
@@ -367,7 +368,7 @@ def read_item(self, position):
367368

368369
key = self.read_key_string(key_string_position, key_size)
369370

370-
key_value_position = key_string_position + key_size# + self.get_string_padding(key)
371+
key_value_position = key_string_position + key_size # + self.get_string_padding(key)
371372

372373
key_value = self.read_key_value(key_value_position)
373374
return (key_size,
@@ -470,19 +471,73 @@ def unlock(self):
470471
self._wlocked, self._rlocked = False, False
471472
uwsgi.sharedarea_unlock(self._sharedarea_id)
472473

473-
474474
def __len__(self):
475475
return len(self._positions)
476476

477477
def clear(self):
478478
for x in xrange(self.AREA_SIZE_SIZE + self.AREA_SIZE_SIZE):
479-
self.m[x] = '\x00'
479+
self.m[x] = "\x00"
480480

481481
self._positions.clear()
482482

483-
484483
def get_items(self):
485484
self.validate_actuality()
486485

487486
for key, position in self._positions.items():
488487
yield self.unserialize_key(key), self.read_key_value(position[2])
488+
489+
def inc_items(self, items):
490+
self.validate_actuality()
491+
with self.lock():
492+
for key, value in items:
493+
try:
494+
positions, created = self.get_key_position(self.serialize_key(key), value)
495+
if created:
496+
continue
497+
self.write_key_value(positions[2], self.read_key_value(positions[2]) + value)
498+
except InvalidUWSGISharedareaPagesize:
499+
logger.error("Invalid sharedarea pagesize {0} bytes".format(len(self._m)))
500+
501+
def write_items(self, items):
502+
self.validate_actuality()
503+
with self.lock():
504+
for key, value in items:
505+
try:
506+
positions, created = self.get_key_position(self.serialize_key(key), value)
507+
if created:
508+
continue
509+
self.write_key_value(positions[2], value)
510+
except InvalidUWSGISharedareaPagesize:
511+
logger.error("Invalid sharedarea pagesize {0} bytes".format(len(self._m)))
512+
513+
514+
class UWSGIFlushStorage(LocalMemoryStorage):
515+
"""Storage wrapper for UWSGI storage that update couters inmemory and flush into uwsgi sharedarea
516+
"""
517+
SHAREDAREA_ID = int(os.environ.get("PROMETHEUS_UWSGI_SHAREDAREA", 0))
518+
519+
def __init__(self, sharedarea_id=UWSGIStorage.SHAREDAREA_ID):
520+
self._uwsgi_storage = UWSGIStorage(sharedarea_id)
521+
self._flush = 0
522+
self._get_items = 0
523+
self._clear = 0
524+
super(UWSGIFlushStorage, self).__init__()
525+
526+
@property
527+
def persistent_storage(self):
528+
return self._uwsgi_storage
529+
530+
def flush(self):
531+
items = list(super(UWSGIFlushStorage, self).get_items())
532+
self._uwsgi_storage.inc_items(items)
533+
super(UWSGIFlushStorage, self).clear()
534+
535+
def get_items(self):
536+
return self._uwsgi_storage.get_items()
537+
538+
def __len__(self):
539+
return super(UWSGIFlushStorage, self).__len__()
540+
541+
def clear(self):
542+
self._uwsgi_storage.clear()
543+
super(UWSGIFlushStorage, self).clear()

pyprometheus/metrics.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,10 @@
1111
:github: http://github.com/Lispython/pyprometheus
1212
"""
1313

14-
15-
1614
from pyprometheus.const import TYPES
1715
from pyprometheus.values import (MetricValue, GaugeValue,
1816
CounterValue, SummaryValue,
1917
HistogramValue)
20-
2118
class BaseMetric(object):
2219

2320
value_class = MetricValue
@@ -132,10 +129,6 @@ def __getattr__(self, name):
132129
# return super(BaseMetric, self).__getattr__(name)
133130

134131

135-
136-
137-
138-
139132
class Gauge(BaseMetric):
140133

141134
TYPE = "gauge"
@@ -146,7 +139,6 @@ class Gauge(BaseMetric):
146139
'set_to_current_time', 'time', 'value'))
147140

148141

149-
150142
class Counter(BaseMetric):
151143
TYPE = "counter"
152144

pyprometheus/storage.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,9 @@ def label_group(self, value):
8383
class LocalMemoryStorage(BaseStorage):
8484

8585
def __init__(self):
86-
self._storage = defaultdict(lambda: defaultdict(lambda: defaultdict(float)))
8786
self._storage = defaultdict(float)
8887
self._lock = Lock()
8988

90-
9189
def inc_value(self, key, value):
9290
with self._lock:
9391
self._storage[key] += value

0 commit comments

Comments
 (0)