33from  __future__ import  unicode_literals 
44
55import  copy 
6+ import  json 
67import  math 
8+ import  mmap 
9+ import  os 
710import  re 
11+ import  struct 
812import  types 
913from  timeit  import  default_timer 
1014
2327_RESERVED_METRIC_LABEL_NAME_RE  =  re .compile (r'^__.*$' )
2428_INF  =  float ("inf" )
2529_MINUS_INF  =  float ("-inf" )
30+ _INITIAL_MMAP_SIZE  =  1024 * 1024 
2631
2732
2833class  CollectorRegistry (object ):
@@ -220,7 +225,9 @@ def add_metric(self, labels, buckets, sum_value):
220225class  _MutexValue (object ):
221226    '''A float protected by a mutex.''' 
222227
223-     def  __init__ (self , name , labelnames , labelvalues ):
228+     _multiprocess  =  False 
229+ 
230+     def  __init__ (self , typ , metric_name , name , labelnames , labelvalues , ** kwargs ):
224231      self ._value  =  0.0 
225232      self ._lock  =  Lock ()
226233
@@ -236,7 +243,139 @@ def get(self):
236243      with  self ._lock :
237244          return  self ._value 
238245
239- _ValueClass  =  _MutexValue 
246+ class  _MmapedDict (object ):
247+     """A dict of doubles, backed by an mmapped file. 
248+ 
249+     The file starts with a 4 byte int, indicating how much of it is used. 
250+     Then 4 bytes of padding. 
251+     There's then a number of entries, consisting of a 4 byte int which is the 
252+     side of the next field, a utf-8 encoded string key, padding to a 8 byte 
253+     alignment, and then a 8 byte float which is the value. 
254+     """ 
255+     def  __init__ (self , filename ):
256+         self ._lock  =  Lock ()
257+         self ._f  =  open (filename , 'a+b' )
258+         if  os .fstat (self ._f .fileno ()).st_size  ==  0 :
259+             self ._f .truncate (_INITIAL_MMAP_SIZE )
260+         self ._capacity  =  os .fstat (self ._f .fileno ()).st_size 
261+         self ._m  =  mmap .mmap (self ._f .fileno (), self ._capacity )
262+ 
263+         self ._positions  =  {}
264+         self ._used  =  struct .unpack_from (b'i' , self ._m , 0 )[0 ]
265+         if  self ._used  ==  0 :
266+             self ._used  =  8 
267+             struct .pack_into (b'i' , self ._m , 0 , self ._used )
268+         else :
269+             for  key , _ , pos  in  self ._read_all_values ():
270+                 self ._positions [key ] =  pos 
271+ 
272+     def  _init_value (self , key ):
273+         """Initilize a value. Lock must be held by caller.""" 
274+         encoded  =  key .encode ('utf-8' )
275+         # Pad to be 8-byte aligned. 
276+         padded  =  encoded  +  (b' '  *  (8  -  (len (encoded ) +  4 ) %  8 ))
277+         value  =  struct .pack ('i{0}sd' .format (len (padded )).encode (), len (encoded ), padded , 0.0 )
278+         while  self ._used  +  len (value ) >  self ._capacity :
279+             self ._capacity  *=  2 
280+             self ._f .truncate (self ._capacity  *  2 )
281+             self ._m  =  mmap .mmap (self ._f .fileno (), self ._capacity )
282+         self ._m [self ._used :self ._used  +  len (value )] =  value 
283+ 
284+         # Update how much space we've used. 
285+         self ._used  +=  len (value )
286+         struct .pack_into (b'i' , self ._m , 0 , self ._used )
287+         self ._positions [key ] =  self ._used  -  8 
288+ 
289+     def  _read_all_values (self ):
290+         """Yield (key, value, pos). No locking is performed.""" 
291+         pos  =  8 
292+         while  pos  <  self ._used :
293+             encoded_len  =  struct .unpack_from (b'i' , self ._m , pos )[0 ]
294+             pos  +=  4 
295+             encoded  =  struct .unpack_from ('{0}s' .format (encoded_len ).encode (), self ._m , pos )[0 ]
296+             padded_len  =  encoded_len  +  (8  -  (encoded_len  +  4 ) %  8 )
297+             pos  +=  padded_len 
298+             value  =  struct .unpack_from (b'd' , self ._m , pos )[0 ]
299+             yield  encoded .decode ('utf-8' ), value , pos 
300+             pos  +=  8 
301+ 
302+     def  read_all_values (self ):
303+         """Yield (key, value, pos). No locking is performed.""" 
304+         for  k , v , _  in  self ._read_all_values ():
305+             yield  k , v 
306+ 
307+     def  read_value (self , key ):
308+         with  self ._lock :
309+             if  key  not  in self ._positions :
310+                 self ._init_value (key )
311+         pos  =  self ._positions [key ]
312+         # We assume that reading from an 8 byte aligned value is atomic 
313+         return  struct .unpack_from (b'd' , self ._m , pos )[0 ]
314+ 
315+     def  write_value (self , key , value ):
316+         with  self ._lock :
317+             if  key  not  in self ._positions :
318+                 self ._init_value (key )
319+         pos  =  self ._positions [key ]
320+         # We assume that writing to an 8 byte aligned value is atomic 
321+         struct .pack_into (b'd' , self ._m , pos , value )
322+ 
323+     def  close (self ):
324+         if  self ._f :
325+             self ._f .close ()
326+             self ._f  =  None 
327+ 
328+ 
329+ def  _MultiProcessValue (__pid = os .getpid ()):
330+     pid  =  __pid 
331+     files  =  {}
332+     files_lock  =  Lock ()
333+ 
334+     class  _MmapedValue (object ):
335+         '''A float protected by a mutex backed by a per-process mmaped file.''' 
336+ 
337+         _multiprocess  =  True 
338+ 
339+         def  __init__ (self , typ , metric_name , name , labelnames , labelvalues , multiprocess_mode = '' , ** kwargs ):
340+             if  typ  ==  'gauge' :
341+                 file_prefix  =  typ  +  '_'  +   multiprocess_mode 
342+             else :
343+                 file_prefix  =  typ 
344+             with  files_lock :
345+                 if  file_prefix  not  in files :
346+                     filename  =  os .path .join (
347+                             os .environ ['prometheus_multiproc_dir' ], '{0}_{1}.db' .format (file_prefix , pid ))
348+                     files [file_prefix ] =  _MmapedDict (filename )
349+             self ._file  =  files [file_prefix ]
350+             self ._key  =  json .dumps ((metric_name , name , labelnames , labelvalues ))
351+             self ._value  =  self ._file .read_value (self ._key )
352+             self ._lock  =  Lock ()
353+ 
354+         def  inc (self , amount ):
355+             with  self ._lock :
356+                 self ._value  +=  amount 
357+                 self ._file .write_value (self ._key , self ._value )
358+ 
359+         def  set (self , value ):
360+             with  self ._lock :
361+                 self ._value  =  value 
362+                 self ._file .write_value (self ._key , self ._value )
363+ 
364+         def  get (self ):
365+             with  self ._lock :
366+                 return  self ._value 
367+ 
368+     return  _MmapedValue 
369+ 
370+ 
371+ # Should we enable multi-process mode? 
372+ # This needs to be chosen before the first metric is constructed, 
373+ # and as that may be in some arbitrary library the user/admin has 
374+ # no control over we use an enviroment variable. 
375+ if  'prometheus_multiproc_dir'  in  os .environ :
376+     _ValueClass  =  _MultiProcessValue ()
377+ else :
378+     _ValueClass  =  _MutexValue 
240379
241380
242381class  _LabelWrapper (object ):
@@ -387,7 +526,7 @@ def f():
387526    _reserved_labelnames  =  []
388527
389528    def  __init__ (self , name , labelnames , labelvalues ):
390-         self ._value  =  _ValueClass (name , labelnames , labelvalues )
529+         self ._value  =  _ValueClass (self . _type ,  name ,  name , labelnames , labelvalues )
391530
392531    def  inc (self , amount = 1 ):
393532        '''Increment counter by the given amount.''' 
@@ -449,8 +588,12 @@ def f():
449588    _type  =  'gauge' 
450589    _reserved_labelnames  =  []
451590
452-     def  __init__ (self , name , labelnames , labelvalues ):
453-         self ._value  =  _ValueClass (name , labelnames , labelvalues )
591+     def  __init__ (self , name , labelnames , labelvalues , multiprocess_mode = 'all' ):
592+         if  (_ValueClass ._multiprocess 
593+                 and  multiprocess_mode  not  in 'min' , 'max' , 'livesum' , 'liveall' , 'all' ]):
594+             raise  ValueError ('Invalid multiprocess mode: '  +  multiprocess_mode )
595+         self ._value  =  _ValueClass (self ._type , name , name , labelnames ,
596+                 labelvalues , multiprocess_mode = multiprocess_mode )
454597
455598    def  inc (self , amount = 1 ):
456599        '''Increment gauge by the given amount.''' 
@@ -533,8 +676,8 @@ def create_response(request):
533676    _reserved_labelnames  =  ['quantile' ]
534677
535678    def  __init__ (self , name , labelnames , labelvalues ):
536-         self ._count  =  _ValueClass (name  +  '_count' , labelnames , labelvalues )
537-         self ._sum  =  _ValueClass (name  +  '_sum' , labelnames , labelvalues )
679+         self ._count  =  _ValueClass (self . _type ,  name ,  name  +  '_count' , labelnames , labelvalues )
680+         self ._sum  =  _ValueClass (self . _type ,  name ,  name  +  '_sum' , labelnames , labelvalues )
538681
539682    def  observe (self , amount ):
540683        '''Observe the given amount.''' 
@@ -607,7 +750,7 @@ def create_response(request):
607750    _reserved_labelnames  =  ['histogram' ]
608751
609752    def  __init__ (self , name , labelnames , labelvalues , buckets = (.005 , .01 , .025 , .05 , .075 , .1 , .25 , .5 , .75 , 1.0 , 2.5 , 5.0 , 7.5 , 10.0 , _INF )):
610-         self ._sum  =  _ValueClass (name  +  '_sum' , labelnames , labelvalues )
753+         self ._sum  =  _ValueClass (self . _type ,  name ,  name  +  '_sum' , labelnames , labelvalues )
611754        buckets  =  [float (b ) for  b  in  buckets ]
612755        if  buckets  !=  sorted (buckets ):
613756            # This is probably an error on the part of the user, 
@@ -621,7 +764,7 @@ def __init__(self, name, labelnames, labelvalues, buckets=(.005, .01, .025, .05,
621764        self ._buckets  =  []
622765        bucket_labelnames  =  labelnames  +  ('le' ,)
623766        for  b  in  buckets :
624-           self ._buckets .append (_ValueClass (name  +  '_bucket' , bucket_labelnames , labelvalues  +  (_floatToGoString (b ),)))
767+           self ._buckets .append (_ValueClass (self . _type ,  name ,  name  +  '_bucket' , bucket_labelnames , labelvalues  +  (_floatToGoString (b ),)))
625768
626769    def  observe (self , amount ):
627770        '''Observe the given amount.''' 
0 commit comments