# for further analysis.
66
77import datetime
8- import influxdb
98import json
109import logging
1110import os
1211import random
1312import string
14- import zlib
1513
1614from utils import fileopt
1715from utils import util
@@ -41,7 +39,26 @@ def unique_dbname(self):
4139
4240 return '_' .join (dbname )
4341
44- def load_dump (self ):
42+ def exec_importer (self , file = None , chunk_size = 2000 ):
43+ if not file :
44+ logging .fatal ("No file specified." )
45+ return (None , "No metric dump file specified to load." )
46+ base_dir = os .path .join (util .pwd (), "../" )
47+ importer = os .path .join (base_dir , "bin/prom2influx" )
48+ cmd = [importer ,
49+ "-db" , self .db_name ,
50+ "-host" , self .host ,
51+ "-port" , "%s" % self .port ,
52+ "-chunk" , "%s" % chunk_size , # chunk size of one write request
53+ "-file" , file
54+ ]
55+ logging .debug ("Running cmd: %s" % ' ' .join (cmd ))
56+ return util .run_cmd (cmd )
57+
58+ def run_importing (self ):
59+ logging .info ("Metrics will be imported to database '%s'." %
60+ self .db_name )
61+
4562 def file_list (dir = None ):
4663 f_list = []
4764 for file in fileopt .list_dir (dir ):
@@ -52,58 +69,17 @@ def file_list(dir=None):
5269 return f_list
5370
5471 for file in file_list (self .datadir ):
55- if file .endswith ('.json' ):
56- raw = fileopt .read_file (file )
57- elif file .endswith ('.dat' ):
58- raw = zlib .decompress (fileopt .read_file (file , 'rb' ))
59- else :
60- logging .debug ("Skipped unrecorgnized file '%s'" % file )
72+ # all dumped files are in 'prometheus' sub-directory
73+ if not file or not file .endswith ('.json' ) or 'prometheus' not in file :
6174 continue
62- yield json .loads (raw )
63-
64- def build_series (self ):
65- def format_prom_metric (key = None ):
66- points = []
67- point = {'fields' : {}}
68- # build point header
69- for metric in key :
70- point ['measurement' ] = metric ['metric' ]['__name__' ]
71- point ['tags' ] = {
72- 'cluster' : self .db_name ,
73- 'monitor' : 'prometheus' ,
74- }
75- for k , v in metric ['metric' ].items ():
76- point ['tags' ][k ] = v
77- # build point values
78- for value in metric ['values' ]:
79- point ['time' ] = datetime .datetime .utcfromtimestamp (
80- value [0 ]).strftime ('%Y-%m-%dT%H:%M:%SZ' )
81- try :
82- point ['fields' ]['value' ] = float (value [1 ])
83- except ValueError :
84- point ['fields' ]['value' ] = value [1 ]
85- points .append (point .copy ())
86- return points
87-
88- for key in self .load_dump ():
89- yield format_prom_metric (key )
90-
91- def write2influxdb (self ):
92- client = influxdb .InfluxDBClient (
93- host = self .host , port = self .port , username = self .user , password = self .passwd ,
94- database = self .db_name , timeout = 30 )
95- # create_database has no effect if the database already exist
96- client .create_database (self .db_name )
97- logging .info ("Metrics will be imported to database '%s'." %
98- self .db_name )
99-
100- for series in self .build_series ():
101- try :
102- client .write_points (series , batch_size = 2000 )
103- except influxdb .exceptions .InfluxDBClientError as e :
104- logging .warn (
105- "Write error for key '%s', data may be empty." % series [0 ]['measurement' ])
106- logging .debug (e )
107-
108- def run_importing (self ):
109- self .write2influxdb ()
75+ stderr = self .exec_importer (file )[1 ]
76+ if stderr and "Request Entity Too Large" in stderr .decode ('utf-8' ):
77+ logging .info ("Write to DB failed, retry for once..." )
78+ retry_stderr = self .exec_importer (file , chunk_size = 100 )[1 ]
79+ if not retry_stderr :
80+ logging .info ("Retry succeeded." )
81+ else :
82+ logging .warning ("Retry failed, stderr is: '%s'" %
83+ retry_stderr )
84+ elif stderr :
85+ logging .warning (stderr )
0 commit comments