44
55import copy
66import math
7+ import json
8+ import os
79import re
10+ import shelve
811import types
912from timeit import default_timer
1013
@@ -220,7 +223,9 @@ def add_metric(self, labels, buckets, sum_value):
220223class _MutexValue (object ):
221224 '''A float protected by a mutex.'''
222225
223- def __init__ (self , name , labelnames , labelvalues ):
226+ _multiprocess = False
227+
228+ def __init__ (self , typ , metric_name , name , labelnames , labelvalues , ** kwargs ):
224229 self ._value = 0.0
225230 self ._lock = Lock ()
226231
@@ -236,7 +241,60 @@ def get(self):
236241 with self ._lock :
237242 return self ._value
238243
239- _ValueClass = _MutexValue
244+
245+ def _MultiProcessValue (__pid = os .getpid ()):
246+ pid = __pid
247+ samples = {}
248+ samples_lock = Lock ()
249+
250+ class _ShelveValue (object ):
251+ '''A float protected by a mutex backed by a per-process shelve.'''
252+
253+ _multiprocess = True
254+
255+ def __init__ (self , typ , metric_name , name , labelnames , labelvalues , multiprocess_mode = '' , ** kwargs ):
256+ with samples_lock :
257+ if typ == 'gauge' :
258+ file_prefix = typ + '_' + multiprocess_mode
259+ else :
260+ file_prefix = typ
261+ if file_prefix not in samples :
262+ filename = os .path .join (os .environ ['prometheus_multiproc_dir' ], '{0}_{1}.db' .format (file_prefix , pid ))
263+ samples [file_prefix ] = shelve .open (filename )
264+ self ._samples = samples [file_prefix ]
265+ self ._key = json .dumps ((metric_name , name , labelnames , labelvalues ))
266+ self ._value = self ._samples .get (self ._key , 0.0 )
267+ self ._samples [self ._key ] = self ._value
268+ self ._samples .sync ()
269+ self ._lock = Lock ()
270+
271+ def inc (self , amount ):
272+ with self ._lock :
273+ self ._value += amount
274+ self ._samples [self ._key ] = self ._value
275+ self ._samples .sync ()
276+
277+ def set (self , value ):
278+ with self ._lock :
279+ self ._value = value
280+ self ._samples [self ._key ] = self ._value
281+ self ._samples .sync ()
282+
283+ def get (self ):
284+ with self ._lock :
285+ return self ._value
286+
287+ return _ShelveValue
288+
289+
290+ # Should we enable multi-process mode?
291+ # This needs to be chosen before the first metric is constructed,
292+ # and as that may be in some arbitrary library the user/admin has
293+ # no control over we use an enviroment variable.
294+ if 'prometheus_multiproc_dir' in os .environ :
295+ _ValueClass = _MultiProcessValue ()
296+ else :
297+ _ValueClass = _MutexValue
240298
241299
242300class _LabelWrapper (object ):
@@ -387,7 +445,7 @@ def f():
387445 _reserved_labelnames = []
388446
389447 def __init__ (self , name , labelnames , labelvalues ):
390- self ._value = _ValueClass (name , labelnames , labelvalues )
448+ self ._value = _ValueClass (self . _type , name , name , labelnames , labelvalues )
391449
392450 def inc (self , amount = 1 ):
393451 '''Increment counter by the given amount.'''
@@ -449,8 +507,12 @@ def f():
449507 _type = 'gauge'
450508 _reserved_labelnames = []
451509
452- def __init__ (self , name , labelnames , labelvalues ):
453- self ._value = _ValueClass (name , labelnames , labelvalues )
510+ def __init__ (self , name , labelnames , labelvalues , multiprocess_mode = 'all' ):
511+ if (_ValueClass ._multiprocess
512+ and multiprocess_mode not in ['min' , 'max' , 'livesum' , 'liveall' , 'all' ]):
513+ raise ValueError ('Invalid multiprocess mode: ' + multiprocess_mode )
514+ self ._value = _ValueClass (self ._type , name , name , labelnames ,
515+ labelvalues , multiprocess_mode = multiprocess_mode )
454516
455517 def inc (self , amount = 1 ):
456518 '''Increment gauge by the given amount.'''
@@ -533,8 +595,8 @@ def create_response(request):
533595 _reserved_labelnames = ['quantile' ]
534596
535597 def __init__ (self , name , labelnames , labelvalues ):
536- self ._count = _ValueClass (name + '_count' , labelnames , labelvalues )
537- self ._sum = _ValueClass (name + '_sum' , labelnames , labelvalues )
598+ self ._count = _ValueClass (self . _type , name , name + '_count' , labelnames , labelvalues )
599+ self ._sum = _ValueClass (self . _type , name , name + '_sum' , labelnames , labelvalues )
538600
539601 def observe (self , amount ):
540602 '''Observe the given amount.'''
@@ -607,7 +669,7 @@ def create_response(request):
607669 _reserved_labelnames = ['histogram' ]
608670
609671 def __init__ (self , name , labelnames , labelvalues , buckets = (.005 , .01 , .025 , .05 , .075 , .1 , .25 , .5 , .75 , 1.0 , 2.5 , 5.0 , 7.5 , 10.0 , _INF )):
610- self ._sum = _ValueClass (name + '_sum' , labelnames , labelvalues )
672+ self ._sum = _ValueClass (self . _type , name , name + '_sum' , labelnames , labelvalues )
611673 buckets = [float (b ) for b in buckets ]
612674 if buckets != sorted (buckets ):
613675 # This is probably an error on the part of the user,
@@ -621,7 +683,7 @@ def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05,
621683 self ._buckets = []
622684 bucket_labelnames = labelnames + ('le' ,)
623685 for b in buckets :
624- self ._buckets .append (_ValueClass (name + '_bucket' , bucket_labelnames , labelvalues + (_floatToGoString (b ),)))
686+ self ._buckets .append (_ValueClass (self . _type , name , name + '_bucket' , bucket_labelnames , labelvalues + (_floatToGoString (b ),)))
625687
626688 def observe (self , amount ):
627689 '''Observe the given amount.'''
0 commit comments