Skip to content

Commit d08ffe6

Browse files
authored
Add a tool to stream a metric (#125)
2 parents 85c58c8 + d0fcb4a commit d08ffe6

File tree

2 files changed

+148
-0
lines changed

2 files changed

+148
-0
lines changed

cmd/thanos/tools.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ func registerTools(app *extkingpin.App) {
2828

2929
registerBucket(cmd)
3030
registerCheckRules(cmd)
31+
registerStreamMetric(cmd)
3132
}
3233

3334
func (tc *checkRulesConfig) registerFlag(cmd extkingpin.FlagClause) *checkRulesConfig {

cmd/thanos/tools_metric.go

Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
// Copyright (c) The Thanos Authors.
2+
// Licensed under the Apache License 2.0.
3+
4+
package main
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"io"
10+
"strings"
11+
"time"
12+
13+
"github.com/go-kit/log"
14+
"github.com/go-kit/log/level"
15+
"github.com/oklog/run"
16+
"github.com/opentracing/opentracing-go"
17+
"github.com/prometheus/client_golang/prometheus"
18+
"github.com/prometheus/prometheus/model/labels"
19+
"github.com/thanos-io/thanos/pkg/extkingpin"
20+
"github.com/thanos-io/thanos/pkg/store/storepb"
21+
22+
"github.com/prometheus/prometheus/tsdb/chunkenc"
23+
"google.golang.org/grpc"
24+
"google.golang.org/grpc/credentials/insecure"
25+
)
26+
27+
type rawMetricConfig struct {
28+
storeAddr string
29+
metric string
30+
hoursAgo int
31+
skipChunks bool
32+
}
33+
34+
func registerFlags(cmd extkingpin.FlagClause) *rawMetricConfig {
35+
conf := &rawMetricConfig{}
36+
cmd.Flag("store", "Thanos Store API gRPC endpoint").Default("localhost:10901").StringVar(&conf.storeAddr)
37+
cmd.Flag("metric", "The metric name to stream time series for this metric").Default("node_cpu_seconds_total").StringVar(&conf.metric)
38+
cmd.Flag("hours_ago", "Stream the metric from this number of hours ago").Default("16").IntVar(&conf.hoursAgo)
39+
cmd.Flag("skip_chunks", "Skip chunks in the response").Default("false").BoolVar(&conf.skipChunks)
40+
return conf
41+
}
42+
43+
func registerStreamMetric(app extkingpin.AppClause) {
44+
cmd := app.Command("metric", "Stream time series for a metric")
45+
46+
conf := registerFlags(cmd)
47+
cmd.Setup(func(
48+
g *run.Group,
49+
logger log.Logger,
50+
_ *prometheus.Registry,
51+
_ opentracing.Tracer,
52+
_ <-chan struct{},
53+
_ bool) error {
54+
// Dummy actor to immediately kill the group after the run function returns.
55+
g.Add(func() error { return nil }, func(error) {})
56+
return streamMetric(conf, logger)
57+
})
58+
}
59+
60+
func streamMetric(conf *rawMetricConfig, logger log.Logger) error {
61+
nowMs := time.Now().Unix() * 1000
62+
startMs := nowMs - int64(conf.hoursAgo)*3600*1000
63+
conn, err := grpc.Dial(
64+
conf.storeAddr,
65+
grpc.WithTransportCredentials(insecure.NewCredentials()),
66+
)
67+
if err != nil {
68+
level.Error(logger).Log("msg", "Failed to create gRPC client", "err", err, "store", conf.storeAddr)
69+
return err
70+
}
71+
storeClient := storepb.NewStoreClient(conn)
72+
labelMatchers := []storepb.LabelMatcher{
73+
{Type: storepb.LabelMatcher_EQ, Name: "__name__", Value: conf.metric},
74+
}
75+
storeReq := &storepb.SeriesRequest{
76+
Aggregates: []storepb.Aggr{storepb.Aggr_RAW},
77+
Matchers: labelMatchers,
78+
MinTime: startMs,
79+
MaxTime: 4*3600*1000 + startMs,
80+
SkipChunks: conf.skipChunks,
81+
}
82+
83+
level.Info(logger).Log(
84+
"msg", "sending a gRPC call to store",
85+
"num_label_matchers", len(storeReq.Matchers),
86+
"label_matchers", fmt.Sprintf("%+v", storeReq.Matchers),
87+
"req", storeReq.String(),
88+
)
89+
90+
storeRes, err := storeClient.Series(context.Background(), storeReq)
91+
if err != nil {
92+
return err
93+
}
94+
seq := 0
95+
for ; true; seq++ {
96+
resPtr, err := storeRes.Recv()
97+
if err == io.EOF {
98+
level.Info(logger).Log("msg", "Got EOF from store")
99+
break
100+
}
101+
if err != nil {
102+
return err
103+
}
104+
series := resPtr.GetSeries()
105+
if series == nil {
106+
return fmt.Errorf("Got a nil series")
107+
}
108+
if 0 == (seq % 1000) {
109+
level.Info(logger).Log("msg", "streaming time series", "seq", seq)
110+
}
111+
metric := ""
112+
fmt.Printf("{")
113+
for _, label := range series.Labels {
114+
name := strings.Clone(label.Name)
115+
value := strings.Clone(label.Value)
116+
fmt.Printf("%s=%s,", label.Name, label.Value)
117+
if name == labels.MetricName {
118+
metric = value
119+
}
120+
}
121+
fmt.Printf("}")
122+
if metric != conf.metric {
123+
return fmt.Errorf("%d-th time series from the response has a different metric name:\n Actual: %+v\n Expected: %+v",
124+
seq, []byte(metric), []byte(conf.metric))
125+
}
126+
order := 0
127+
for _, chunk := range series.Chunks {
128+
raw, err := chunkenc.FromData(chunkenc.EncXOR, chunk.Raw.Data)
129+
if err != nil {
130+
level.Error(logger).Log("err", err, "msg", "error in decoding chunk")
131+
continue
132+
}
133+
134+
iter := raw.Iterator(nil)
135+
for iter.Next() != chunkenc.ValNone {
136+
ts, value := iter.At()
137+
if order < 5 {
138+
fmt.Printf(" %f @%d,", value, ts)
139+
}
140+
order++
141+
}
142+
}
143+
fmt.Printf("\n")
144+
}
145+
level.Info(logger).Log("msg", "successfully streamed all time series for the metric", "metric", conf.metric, "num_series", seq)
146+
return nil
147+
}

0 commit comments

Comments
 (0)