Skip to content

Commit b0bb008

Browse files
MichaHoffmannjnyi
authored andcommitted
Stores: respect replica labels in LabelValues and LabelNames (thanos-io#7310)
* Proxy: acceptance test for proxy store with replica labels Signed-off-by: Michael Hoffmann <[email protected]> * Stores: handle replica labels in label_value and label_names grpcs Signed-off-by: Michael Hoffmann <[email protected]> --------- Signed-off-by: Michael Hoffmann <[email protected]>
1 parent e4371bf commit b0bb008

File tree

8 files changed

+320
-127
lines changed

8 files changed

+320
-127
lines changed

pkg/query/querier.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -472,14 +472,19 @@ func (q *querier) LabelValues(ctx context.Context, name string, matchers ...*lab
472472
if err != nil {
473473
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
474474
}
475-
476-
resp, err := q.proxy.LabelValues(ctx, &storepb.LabelValuesRequest{
475+
req := &storepb.LabelValuesRequest{
477476
Label: name,
478477
PartialResponseStrategy: q.partialResponseStrategy,
479478
Start: q.mint,
480479
End: q.maxt,
481480
Matchers: pbMatchers,
482-
})
481+
}
482+
483+
if q.isDedupEnabled() {
484+
req.WithoutReplicaLabels = q.replicaLabels
485+
}
486+
487+
resp, err := q.proxy.LabelValues(ctx, req)
483488
if err != nil {
484489
return nil, nil, errors.Wrap(err, "proxy LabelValues()")
485490
}
@@ -506,12 +511,18 @@ func (q *querier) LabelNames(ctx context.Context, matchers ...*labels.Matcher) (
506511
return nil, nil, errors.Wrap(err, "converting prom matchers to storepb matchers")
507512
}
508513

509-
resp, err := q.proxy.LabelNames(ctx, &storepb.LabelNamesRequest{
514+
req := &storepb.LabelNamesRequest{
510515
PartialResponseStrategy: q.partialResponseStrategy,
511516
Start: q.mint,
512517
End: q.maxt,
513518
Matchers: pbMatchers,
514-
})
519+
}
520+
521+
if q.isDedupEnabled() {
522+
req.WithoutReplicaLabels = q.replicaLabels
523+
}
524+
525+
resp, err := q.proxy.LabelNames(ctx, req)
515526
if err != nil {
516527
return nil, nil, errors.Wrap(err, "proxy LabelNames()")
517528
}

pkg/store/acceptance_test.go

Lines changed: 64 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
109109
},
110110
labelValuesCalls: []labelValuesCallCase{
111111
{start: timestamp.FromTime(minTime), end: timestamp.FromTime(maxTime), label: "foo", expectedValues: []string{"foovalue1"}},
112+
{start: timestamp.FromTime(minTime), end: timestamp.FromTime(maxTime), label: "replica"},
112113
},
113114
},
114115
{
@@ -722,9 +723,10 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
722723
for _, c := range tc.labelNameCalls {
723724
t.Run("label_names", func(t *testing.T) {
724725
resp, err := store.LabelNames(context.Background(), &storepb.LabelNamesRequest{
725-
Start: c.start,
726-
End: c.end,
727-
Matchers: c.matchers,
726+
Start: c.start,
727+
End: c.end,
728+
Matchers: c.matchers,
729+
WithoutReplicaLabels: []string{"replica"},
728730
})
729731
if c.expectErr != nil {
730732
testutil.NotOk(t, err)
@@ -740,12 +742,13 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
740742
})
741743
}
742744
for _, c := range tc.labelValuesCalls {
743-
t.Run("label_name_values", func(t *testing.T) {
745+
t.Run("label_values", func(t *testing.T) {
744746
resp, err := store.LabelValues(context.Background(), &storepb.LabelValuesRequest{
745-
Start: c.start,
746-
End: c.end,
747-
Label: c.label,
748-
Matchers: c.matchers,
747+
Start: c.start,
748+
End: c.end,
749+
Label: c.label,
750+
Matchers: c.matchers,
751+
WithoutReplicaLabels: []string{"replica"},
749752
})
750753
if c.expectErr != nil {
751754
testutil.NotOk(t, err)
@@ -764,10 +767,11 @@ func testStoreAPIsAcceptance(t *testing.T, startStore startStoreFn) {
764767
t.Run("series", func(t *testing.T) {
765768
srv := newStoreSeriesServer(context.Background())
766769
err := store.Series(&storepb.SeriesRequest{
767-
MinTime: c.start,
768-
MaxTime: c.end,
769-
Matchers: c.matchers,
770-
SkipChunks: c.skipChunks,
770+
MinTime: c.start,
771+
MaxTime: c.end,
772+
Matchers: c.matchers,
773+
SkipChunks: c.skipChunks,
774+
WithoutReplicaLabels: []string{"replica"},
771775
}, srv)
772776
if c.expectErr != nil {
773777
testutil.NotOk(t, err)
@@ -882,23 +886,24 @@ func TestBucketStore_Acceptance(t *testing.T) {
882886
tt.Skip("Bucket Store cannot handle empty HEAD")
883887
}
884888

885-
id := createBlockFromHead(tt, auxDir, h)
889+
for _, replica := range []string{"r1", "r2"} {
890+
id := createBlockFromHead(tt, auxDir, h)
886891

887-
auxBlockDir := filepath.Join(auxDir, id.String())
888-
meta, err := metadata.ReadFromDir(auxBlockDir)
889-
testutil.Ok(t, err)
890-
stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
891-
testutil.Ok(t, err)
892-
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
893-
Labels: extLset.Map(),
894-
Downsample: metadata.ThanosDownsample{Resolution: 0},
895-
Source: metadata.TestSource,
896-
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
897-
}, nil)
898-
testutil.Ok(tt, err)
899-
900-
testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
901-
testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
892+
auxBlockDir := filepath.Join(auxDir, id.String())
893+
meta, err := metadata.ReadFromDir(auxBlockDir)
894+
testutil.Ok(t, err)
895+
stats, err := block.GatherIndexHealthStats(ctx, logger, filepath.Join(auxBlockDir, block.IndexFilename), meta.MinTime, meta.MaxTime)
896+
testutil.Ok(t, err)
897+
_, err = metadata.InjectThanos(log.NewNopLogger(), auxBlockDir, metadata.Thanos{
898+
Labels: labels.NewBuilder(extLset).Set("replica", replica).Labels().Map(),
899+
Downsample: metadata.ThanosDownsample{Resolution: 0},
900+
Source: metadata.TestSource,
901+
IndexStats: metadata.IndexStats{SeriesMaxSize: stats.SeriesMaxSize, ChunkMaxSize: stats.ChunkMaxSize},
902+
}, nil)
903+
testutil.Ok(tt, err)
904+
905+
testutil.Ok(tt, block.Upload(ctx, logger, bkt, auxBlockDir, metadata.NoneFunc))
906+
}
902907

903908
chunkPool, err := NewDefaultChunkBytesPool(2e5)
904909
testutil.Ok(tt, err)
@@ -1026,3 +1031,34 @@ func TestProxyStoreWithTSDBSelector_Acceptance(t *testing.T) {
10261031

10271032
testStoreAPIsAcceptance(t, startStore)
10281033
}
1034+
1035+
func TestProxyStoreWithReplicas_Acceptance(t *testing.T) {
1036+
t.Cleanup(func() { custom.TolerantVerifyLeak(t) })
1037+
1038+
startStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
1039+
startNestedStore := func(tt *testing.T, extLset labels.Labels, appendFn func(app storage.Appender)) storepb.StoreServer {
1040+
db, err := e2eutil.NewTSDB()
1041+
testutil.Ok(tt, err)
1042+
tt.Cleanup(func() { testutil.Ok(tt, db.Close()) })
1043+
appendFn(db.Appender(context.Background()))
1044+
1045+
return NewTSDBStore(nil, db, component.Rule, extLset)
1046+
1047+
}
1048+
1049+
extLset1 := labels.NewBuilder(extLset).Set("replica", "r1").Labels()
1050+
extLset2 := labels.NewBuilder(extLset).Set("replica", "r2").Labels()
1051+
1052+
p1 := startNestedStore(tt, extLset1, appendFn)
1053+
p2 := startNestedStore(tt, extLset2, appendFn)
1054+
1055+
clients := []Client{
1056+
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p1), ExtLset: []labels.Labels{extLset1}},
1057+
storetestutil.TestClient{StoreClient: storepb.ServerAsClient(p2), ExtLset: []labels.Labels{extLset2}},
1058+
}
1059+
1060+
return NewProxyStore(nil, nil, func() []Client { return clients }, component.Query, labels.EmptyLabels(), 0*time.Second, RetrievalStrategy(EagerRetrieval))
1061+
}
1062+
1063+
testStoreAPIsAcceptance(t, startStore)
1064+
}

pkg/store/bucket.go

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1759,6 +1759,12 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
17591759
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request hints labels matchers").Error())
17601760
}
17611761
}
1762+
extLsetToRemove := make(map[string]struct{})
1763+
if len(req.WithoutReplicaLabels) > 0 {
1764+
for _, l := range req.WithoutReplicaLabels {
1765+
extLsetToRemove[l] = struct{}{}
1766+
}
1767+
}
17621768

17631769
g, gctx := errgroup.WithContext(ctx)
17641770

@@ -1815,15 +1821,18 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
18151821
// b.extLset is already sorted by label name, no need to sort it again.
18161822
extRes := make([]string, 0, b.extLset.Len())
18171823
b.extLset.Range(func(l labels.Label) {
1818-
extRes = append(extRes, l.Name)
1824+
if _, ok := extLsetToRemove[l.Name]; !ok {
1825+
extRes = append(extRes, l.Name)
1826+
}
18191827
})
18201828

18211829
result = strutil.MergeSlices(res, extRes)
18221830
} else {
18231831
seriesReq := &storepb.SeriesRequest{
1824-
MinTime: req.Start,
1825-
MaxTime: req.End,
1826-
SkipChunks: true,
1832+
MinTime: req.Start,
1833+
MaxTime: req.End,
1834+
SkipChunks: true,
1835+
WithoutReplicaLabels: req.WithoutReplicaLabels,
18271836
}
18281837
blockClient := newBlockSeriesClient(
18291838
newCtx,
@@ -1840,7 +1849,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
18401849
s.metrics.seriesFetchDurationSum,
18411850
nil,
18421851
nil,
1843-
nil,
1852+
extLsetToRemove,
18441853
s.enabledLazyExpandedPostings,
18451854
s.metrics.lazyExpandedPostingsCount,
18461855
s.metrics.lazyExpandedPostingSizeBytes,
@@ -1942,6 +1951,11 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
19421951
if err != nil {
19431952
return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "translate request labels matchers").Error())
19441953
}
1954+
for i := range req.WithoutReplicaLabels {
1955+
if req.Label == req.WithoutReplicaLabels[i] {
1956+
return &storepb.LabelValuesResponse{}, nil
1957+
}
1958+
}
19451959

19461960
tenant, _ := tenancy.GetTenantFromGRPCMetadata(ctx)
19471961

@@ -2026,9 +2040,10 @@ func (s *BucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesR
20262040
result = res
20272041
} else {
20282042
seriesReq := &storepb.SeriesRequest{
2029-
MinTime: req.Start,
2030-
MaxTime: req.End,
2031-
SkipChunks: true,
2043+
MinTime: req.Start,
2044+
MaxTime: req.End,
2045+
SkipChunks: true,
2046+
WithoutReplicaLabels: req.WithoutReplicaLabels,
20322047
}
20332048
blockClient := newBlockSeriesClient(
20342049
newCtx,

pkg/store/prometheus.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -606,9 +606,16 @@ func (p *PrometheusStore) LabelNames(ctx context.Context, r *storepb.LabelNamesR
606606
}
607607
}
608608

609+
extLsetToRemove := map[string]struct{}{}
610+
for _, lbl := range r.WithoutReplicaLabels {
611+
extLsetToRemove[lbl] = struct{}{}
612+
}
613+
609614
if len(lbls) > 0 {
610615
extLset.Range(func(l labels.Label) {
611-
lbls = append(lbls, l.Name)
616+
if _, ok := extLsetToRemove[l.Name]; !ok {
617+
lbls = append(lbls, l.Name)
618+
}
612619
})
613620
sort.Strings(lbls)
614621
}
@@ -621,6 +628,11 @@ func (p *PrometheusStore) LabelValues(ctx context.Context, r *storepb.LabelValue
621628
if r.Label == "" {
622629
return nil, status.Error(codes.InvalidArgument, "label name parameter cannot be empty")
623630
}
631+
for i := range r.WithoutReplicaLabels {
632+
if r.Label == r.WithoutReplicaLabels[i] {
633+
return &storepb.LabelValuesResponse{}, nil
634+
}
635+
}
624636

625637
extLset := p.externalLabelsFn()
626638

pkg/store/proxy.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -600,6 +600,7 @@ func (s *ProxyStore) LabelNames(ctx context.Context, r *storepb.LabelNamesReques
600600
Start: r.Start,
601601
End: r.End,
602602
Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...),
603+
WithoutReplicaLabels: r.WithoutReplicaLabels,
603604
})
604605
if err != nil {
605606
err = errors.Wrapf(err, "fetch label names from store %s", st)
@@ -702,6 +703,7 @@ func (s *ProxyStore) LabelValues(ctx context.Context, r *storepb.LabelValuesRequ
702703
Start: r.Start,
703704
End: r.End,
704705
Matchers: append(r.Matchers, MatchersForLabelSets(extraMatchers)...),
706+
WithoutReplicaLabels: r.WithoutReplicaLabels,
705707
})
706708
if err != nil {
707709
msg := "fetch label values from store %s"

0 commit comments

Comments
 (0)