Skip to content

Commit d1fb78f

Browse files
AstroProfundisLinuxGit
authored and committed
metric: run multiple export/import processes at the same time (#39)
1 parent 2237eec commit d1fb78f

File tree

5 files changed

+80
-31
lines changed

5 files changed

+80
-31
lines changed

metric/importer/prometheus.py

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import random
1212
import string
1313

14+
import multiprocessing as mp
15+
1416
from utils import fileopt
1517
from utils import util
1618

@@ -24,6 +26,8 @@ def __init__(self, args):
2426
self.db_name = args.db if args.db else self.unique_dbname()
2527
self.user = args.user
2628
self.passwd = args.passwd
29+
self.proc_num = args.proc_num if args.proc_num else int(
30+
mp.cpu_count() + 1)
2731

2832
# unique_dbname() generates a unique database name for importing, to prevent
2933
# overwriting of previously imported data
@@ -55,6 +59,22 @@ def exec_importer(self, file=None, chunk_size=2000):
5559
logging.debug("Running cmd: %s" % ' '.join(cmd))
5660
return util.run_cmd(cmd)
5761

62+
def importer_worker(self, filename):
    """Import a single dumped metric file into the database.

    Only '.json' files from the 'prometheus' sub-directory are imported;
    anything else is silently skipped. When the server rejects the write
    with "Request Entity Too Large", the import is retried once with a
    much smaller chunk size.
    """
    # All dumped metric files live in the 'prometheus' sub-directory.
    if not filename or not filename.endswith('.json') or 'prometheus' not in filename:
        return
    stderr = self.exec_importer(filename)[1]
    if not stderr:
        return
    if "Request Entity Too Large" in stderr.decode('utf-8'):
        logging.info("Write to DB failed, retry for once...")
        # Retry once with a far smaller chunk so each request stays
        # under the server's payload limit.
        retry_stderr = self.exec_importer(filename, chunk_size=100)[1]
        if retry_stderr:
            logging.warning("Retry failed, stderr is: '%s'" %
                            retry_stderr)
        else:
            logging.info("Retry succeeded.")
    else:
        logging.warning(stderr)
77+
5878
def run_importing(self):
5979
logging.info("Metrics will be imported to database '%s'." %
6080
self.db_name)
@@ -68,18 +88,14 @@ def file_list(dir=None):
6888
f_list.append(file)
6989
return f_list
7090

71-
for file in file_list(self.datadir):
72-
# all dumped files are in 'prometheus' sub-directory
73-
if not file or not file.endswith('.json') or 'prometheus' not in file:
74-
continue
75-
stderr = self.exec_importer(file)[1]
76-
if stderr and "Request Entity Too Large" in stderr.decode('utf-8'):
77-
logging.info("Write to DB failed, retry for once...")
78-
retry_stderr = self.exec_importer(file, chunk_size=100)[1]
79-
if not retry_stderr:
80-
logging.info("Retry succeeded.")
81-
else:
82-
logging.warning("Retry failed, stderr is: '%s'" %
83-
retry_stderr)
84-
elif stderr:
85-
logging.warning(stderr)
91+
pool = mp.Pool(self.proc_num)
92+
files = file_list(self.datadir)
93+
pool.map_async(unwrap_self_f, zip([self] * len(files), files))
94+
pool.close()
95+
pool.join()
96+
97+
98+
# a trick to use multiprocessing.Pool inside a class
# see http://www.rueckstiess.net/research/snippets/show/ca1d7d90 for details
def unwrap_self_f(arg, **kwarg):
    """Module-level trampoline: unpack (instance, filename) from `arg`
    and dispatch to the bound importer_worker() method."""
    return PromDump.importer_worker(arg[0], *arg[1:], **kwarg)

metric/prometheus.py

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import logging
88
import os
99

10+
import multiprocessing as mp
11+
1012
from metric.base import MetricBase
1113
from utils import fileopt
1214
from utils import util
@@ -19,6 +21,9 @@ def __init__(self, args, basedir=None, subdir=None):
1921

2022
self.host = args.host if args.host else 'localhost'
2123
self.port = args.port if args.port else 9090
24+
self.proc_num = args.proc_num if args.proc_num else int(
25+
mp.cpu_count() / 2 + 1)
26+
2227
self.api_uri = '/api/v1'
2328
self.url_base = 'http://%s:%s%s' % (self.host, self.port, self.api_uri)
2429

@@ -33,20 +38,33 @@ def get_label_names(self):
3338
logging.debug("Found %s available metric keys..." % len(result))
3439
return result
3540

41+
def query_worker(self, metric):
    """Run one query_range request for `metric` and dump the raw
    response to a JSON file in the output directory."""
    query_url = '%s/query_range?query=%s&start=%s&end=%s&step=%s' % (
        self.url_base, metric, self.start_time, self.end_time, self.resolution)
    response = util.read_url(query_url)[0]
    # Cheap success probe: the "status" field sits at the head of the
    # JSON payload, so peeking at the first bytes avoids parsing the
    # (possibly huge) body.
    if 'success' not in response[:20].decode('utf-8'):
        logging.error("Error querying for key '%s'." % metric)
        logging.debug("Output is:\n%s" % response)
        return
    outfile = '%s_%s_to_%s_%ss.json' % (
        metric, self.start_time, self.end_time, self.resolution)
    fileopt.write_file(os.path.join(
        self.outdir, outfile), response)
    logging.debug("Saved data for key '%s'." % metric)
54+
3655
def run_collecting(self):
    """Collect all metrics from Prometheus in parallel.

    Spawns a pool of `self.proc_num` worker processes; each worker
    queries one metric over the configured time range and writes the
    response to disk (via query_worker through unwrap_self_f).
    """
    if self.resolution < 15.0:
        logging.warning(
            "Sampling resolution < 15s don't increase accuracy but data size.")
    metric_names = self.get_label_names()
    pool = mp.Pool(self.proc_num)
    try:
        # Use blocking map() instead of discarding the map_async()
        # result: a dropped AsyncResult silently swallows any unexpected
        # exception raised in a worker, while map() re-raises it here.
        pool.map(unwrap_self_f, zip(
            [self] * len(metric_names), metric_names))
    finally:
        pool.close()
        pool.join()
65+
66+
67+
# a trick to use multiprocessing.Pool inside a class
# see http://www.rueckstiess.net/research/snippets/show/ca1d7d90 for details
def unwrap_self_f(arg, **kwarg):
    """Module-level trampoline: unpack (instance, metric) from `arg`
    and dispatch to the bound query_worker() method."""
    return PromMetrics.query_worker(arg[0], *arg[1:], **kwarg)

tools/prom2influx.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,16 @@ type options struct {
2424
Chunk int
2525
}
2626

27+
// promResult mirrors the "data" object of a Prometheus query_range
// response: the result type string plus the matrix of sample streams.
type promResult struct {
	ResultType string
	Result model.Matrix
}

// promDump is the top-level JSON envelope of a dumped query_range
// response, carrying the request "status" and the query "data".
type promDump struct {
	Status string
	Data promResult
}
36+
2737
func parseOpts() options {
2838
influxHost := flag.String("host", "localhost", "The host of influxdb.")
2939
influxPort := flag.String("port", "8086", "The port of influxdb.")
@@ -114,8 +124,8 @@ func buildPoints(series *model.SampleStream, client influx.Client,
114124
return ptList, nil
115125
}
116126

117-
func writeBatchPoints(data model.Matrix, opts options) error {
118-
for _, series := range data {
127+
func writeBatchPoints(data promDump, opts options) error {
128+
for _, series := range data.Data.Result {
119129
client := newClient(opts)
120130
ptList, err := buildPoints(series, client, opts)
121131
if err != nil {
@@ -153,7 +163,7 @@ func main() {
153163
}
154164

155165
// decode JSON
156-
var data model.Matrix
166+
var data promDump
157167
if err = json.Unmarshal(input, &data); err != nil {
158168
log.Fatal(err)
159169
}

utils/fileopt.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def read_file(filename, mode='r'):
2121
# write data to file
2222
def write_file(filename, data, mode='w'):
2323
with open(filename, mode) as f:
24+
logging.debug("Writting %s of data to %s" % (len(data), filename))
2425
try:
2526
f.write(str(data, 'utf-8'))
2627
except TypeError:

utils/util.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,6 +201,8 @@ def parse_insight_opts():
201201
help="End time point of time range, format: '%%Y-%%m-%%d %%H:%%M:%%S' (local time).")
202202
parser_prom.add_argument("--resolution", type=float, default=None,
203203
help="Query resolution step width of Prometheus in seconds, 15.0 by default.")
204+
parser_prom.add_argument("--proc-num", type=int, action="store", default=None,
205+
help="Number of parallel queries to run, 'CPU count / 2 + 1' by default.")
204206

205207
parser_load = subparser_metric.add_parser(
206208
"load", help="Load dumped metrics to local influxdb.")
@@ -216,6 +218,8 @@ def parse_insight_opts():
216218
help="The user with priviledge to create database, empty (no authentication needed) by default.")
217219
parser_load.add_argument("--passwd", action="store", default=None,
218220
help="The password of user, empty (no authentication needed) by default.")
221+
parser_load.add_argument("--proc-num", type=int, action="store", default=None,
222+
help="Number of parallel importer processes to run, 'CPU count + 1' by default.")
219223
####
220224

221225
return parser.parse_args()

0 commit comments

Comments
 (0)