Skip to content

Commit 1aeb3f8

Browse files
authored
Thanos Streamer supports dropping a replica label (#153)
2 parents 8cf768f + 54616f7 commit 1aeb3f8

File tree

3 files changed

+18
-10
lines changed

3 files changed

+18
-10
lines changed

cmd/thanos/streamer.go

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type StreamerConfig struct {
3838
socketPath string
3939
storeAddrPort string
4040
streamTimeoutSeconds int
41+
replicaLabel string
4142
}
4243

4344
func registerStreamer(app *extkingpin.App) {
@@ -46,6 +47,7 @@ func registerStreamer(app *extkingpin.App) {
4647
cmd.Flag("socket.path", "Path to the Unix socket").Default("/tmp/thanos-streamer.sock").StringVar(&config.socketPath)
4748
cmd.Flag("store", "Thanos Store API gRPC endpoint").Default("localhost:10901").StringVar(&config.storeAddrPort)
4849
cmd.Flag("stream.timeout_seconds", "One stream's overall timeout in seconds ").Default("36000").IntVar(&config.streamTimeoutSeconds)
50+
cmd.Flag("stream.replica_label", "Drop this replica label from all returns time series and dedup them.").Default("").StringVar(&config.replicaLabel)
4951

5052
hc := &httpConfig{}
5153
hc = hc.registerFlag(cmd)
@@ -129,14 +131,17 @@ func (c aggrChunkByTimestamp) Less(i, j int) bool {
129131
(c[i].MinTime == c[j].MinTime && c[i].MaxTime > c[j].MaxTime)
130132
}
131133

132-
func convertToSeriesReq(streamerReq *streamer.StreamerRequest) *storepb.SeriesRequest {
134+
func convertToSeriesReq(config *StreamerConfig, streamerReq *streamer.StreamerRequest) *storepb.SeriesRequest {
133135
req := &storepb.SeriesRequest{
134136
Aggregates: []storepb.Aggr{storepb.Aggr_RAW},
135137
Matchers: make([]storepb.LabelMatcher, 0),
136138
MinTime: streamerReq.StartTimestampMs,
137139
MaxTime: streamerReq.EndTimestampMs,
138140
SkipChunks: streamerReq.SkipChunks,
139141
}
142+
if config.replicaLabel != "" {
143+
req.WithoutReplicaLabels = []string{config.replicaLabel}
144+
}
140145
for _, labelMatcher := range streamerReq.LabelMatchers {
141146
req.Matchers = append(req.Matchers, storepb.LabelMatcher{
142147
Type: storepb.LabelMatcher_Type(labelMatcher.Type),
@@ -176,7 +181,8 @@ func (s *Streamer) streamOneRequest(request *streamer.StreamerRequest, writer io
176181
outOfTimeRangeSampleTotal := 0
177182
outOfOrderSampleTotal := 0
178183
duplicateSampleTotal := 0
179-
storeReq := convertToSeriesReq(request)
184+
duplicateChunks := 0
185+
storeReq := convertToSeriesReq(&s.config, request)
180186
responseSeq := 0
181187

182188
// Write on a socket connection can be blocked until the overall stream timeout (config.streamTimeoutSeconds)
@@ -234,6 +240,7 @@ func (s *Streamer) streamOneRequest(request *streamer.StreamerRequest, writer io
234240
"out_of_time_range_samples", outOfTimeRangeSampleTotal,
235241
"out_of_order_samples", outOfOrderSampleTotal,
236242
"duplicate_samples", duplicateSampleTotal,
243+
"dupliate_chunks", duplicateChunks,
237244
)
238245
}
239246
}()
@@ -309,6 +316,8 @@ func (s *Streamer) streamOneRequest(request *streamer.StreamerRequest, writer io
309316
"is_duplicate_chunk", prevChunkMinTime == chunk.MinTime,
310317
)
311318
if prevChunkMinTime == chunk.MinTime {
319+
// Chunks are sorted by MinTime.
320+
duplicateChunks++
312321
continue
313322
}
314323
prevChunkMinTime = chunk.MinTime

cmd/thanos/streamer_test.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,15 @@ func TestConvertToSeriesReq(t *testing.T) {
4545
MaxTime: streamerReq.EndTimestampMs,
4646
SkipChunks: streamerReq.SkipChunks,
4747
}
48+
config := &StreamerConfig{}
4849

49-
actualReq := convertToSeriesReq(streamerReq)
50+
actualReq := convertToSeriesReq(config, streamerReq)
5051
assert.Equal(t, expectedReq, actualReq, "Expected SeriesRequest to match")
52+
53+
config.replicaLabel = "replica"
54+
expectedReq.WithoutReplicaLabels = []string{"replica"}
55+
actualReq = convertToSeriesReq(config, streamerReq)
56+
assert.Equal(t, expectedReq, actualReq, "Expected SeriesRequest to match with replica label removed")
5157
}
5258

5359
type grpcServer struct {

cmd/thanos/tools_streamer.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,13 +71,6 @@ func runStreamerTool(conf *streamerToolConfig, logger log.Logger) error {
7171
EndTimestampMs: startMs + 4*3600*1000,
7272
SkipChunks: conf.skipChunks,
7373
Metric: conf.metric,
74-
LabelMatchers: []streamer_pkg.LabelMatcher{
75-
{
76-
Name: "__replica__",
77-
Value: "",
78-
Type: streamer_pkg.LabelMatcher_EQ,
79-
},
80-
},
8174
}
8275

8376
level.Info(logger).Log(

0 commit comments

Comments
 (0)