Skip to content

Commit 7485f94

Browse files
committed
Add inflight request limit not checked test
Signed-off-by: SungJin1212 <[email protected]>
1 parent b025cc0 commit 7485f94

File tree

2 files changed

+94
-24
lines changed

2 files changed

+94
-24
lines changed

pkg/storegateway/bucket_stores_test.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"time"
1818

1919
"github.com/go-kit/log"
20+
"github.com/gogo/protobuf/types"
2021
"github.com/gogo/status"
2122
"github.com/oklog/ulid/v2"
2223
"github.com/prometheus/client_golang/prometheus"
@@ -32,6 +33,7 @@ import (
3233
"github.com/thanos-io/thanos/pkg/block"
3334
thanos_metadata "github.com/thanos-io/thanos/pkg/block/metadata"
3435
"github.com/thanos-io/thanos/pkg/store"
36+
"github.com/thanos-io/thanos/pkg/store/hintspb"
3537
"github.com/thanos-io/thanos/pkg/store/labelpb"
3638
"github.com/thanos-io/thanos/pkg/store/storepb"
3739
"github.com/weaveworks/common/logging"
@@ -726,7 +728,27 @@ func generateStorageBlock(t *testing.T, storageDir, userID string, metricName st
726728
require.NoError(t, db.Snapshot(userDir, true))
727729
}
728730

729-
func querySeries(stores BucketStores, userID, metricName string, minT, maxT int64) ([]*storepb.Series, annotations.Annotations, error) {
731+
func querySeries(stores BucketStores, userID, metricName string, minT, maxT int64, blockIDs ...string) ([]*storepb.Series, annotations.Annotations, error) {
732+
var (
733+
anyHints *types.Any
734+
err error
735+
)
736+
if len(blockIDs) > 0 {
737+
hints := &hintspb.SeriesRequestHints{
738+
BlockMatchers: []storepb.LabelMatcher{
739+
{
740+
Type: storepb.LabelMatcher_RE,
741+
Name: block.BlockIDLabel,
742+
Value: strings.Join(blockIDs, "|"),
743+
},
744+
},
745+
}
746+
anyHints, err = types.MarshalAny(hints)
747+
if err != nil {
748+
return nil, nil, err
749+
}
750+
}
751+
730752
req := &storepb.SeriesRequest{
731753
MinTime: minT,
732754
MaxTime: maxT,
@@ -736,11 +758,12 @@ func querySeries(stores BucketStores, userID, metricName string, minT, maxT int6
736758
Value: metricName,
737759
}},
738760
PartialResponseStrategy: storepb.PartialResponseStrategy_ABORT,
761+
Hints: anyHints,
739762
}
740763

741764
ctx := setUserIDToGRPCContext(context.Background(), userID)
742765
srv := newBucketStoreSeriesServer(ctx)
743-
err := stores.Series(req, srv)
766+
err = stores.Series(req, srv)
744767

745768
return srv.SeriesSet, srv.Warnings, err
746769
}

pkg/storegateway/parquet_bucket_stores_test.go

Lines changed: 69 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,17 @@ package storegateway
33
import (
44
"context"
55
"errors"
6+
"fmt"
7+
"os"
8+
"path/filepath"
69
"testing"
710

811
"github.com/go-kit/log"
12+
"github.com/oklog/ulid"
13+
"github.com/prometheus-community/parquet-common/convert"
914
"github.com/prometheus/client_golang/prometheus"
1015
"github.com/prometheus/prometheus/model/labels"
16+
"github.com/prometheus/prometheus/tsdb"
1117
"github.com/prometheus/prometheus/tsdb/chunkenc"
1218
"github.com/stretchr/testify/assert"
1319
"github.com/stretchr/testify/require"
@@ -302,25 +308,66 @@ func TestParquetBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReach
302308
assert.Empty(t, warnings)
303309
}
304310

305-
//func TestParquetBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled(t *testing.T) {
306-
// cfg := prepareStorageConfig(t)
307-
// cfg.BucketStore.BucketStoreType = string(ParquetBucketStore)
308-
// reg := prometheus.NewPedanticRegistry()
309-
// storageDir := t.TempDir()
310-
// generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15)
311-
// bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
312-
// require.NoError(t, err)
313-
//
314-
// stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
315-
// require.NoError(t, err)
316-
// require.NoError(t, stores.InitialSync(context.Background()))
317-
//
318-
// parquetStores := stores.(*ParquetBucketStores)
319-
// // Set inflight requests to the limit (max_inflight_request is set to 0 by default = disabled)
320-
// for i := 0; i < 10; i++ {
321-
// parquetStores.inflightRequests.Inc()
322-
// }
323-
// series, _, err := querySeriesWithBlockIDs(stores, "user_id", "series_1", 0, 100)
324-
// require.NoError(t, err)
325-
// assert.Equal(t, 1, len(series))
326-
//}
311+
func TestParquetBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled(t *testing.T) {
312+
cfg := prepareStorageConfig(t)
313+
cfg.BucketStore.BucketStoreType = string(cortex_tsdb.ParquetBucketStore)
314+
reg := prometheus.NewPedanticRegistry()
315+
storageDir := t.TempDir()
316+
userId := "user_id"
317+
generateStorageBlock(t, storageDir, userId, "series_1", 0, 100, 15)
318+
bkt, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir})
319+
require.NoError(t, err)
320+
321+
stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bkt), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg)
322+
require.NoError(t, err)
323+
require.NoError(t, stores.InitialSync(context.Background()))
324+
325+
parquetStores := stores.(*ParquetBucketStores)
326+
// Set inflight requests to the limit (max_inflight_request is set to 0 by default = disabled)for range 10 {
327+
for range 10 {
328+
parquetStores.inflightRequests.Inc()
329+
}
330+
331+
userPath := fmt.Sprintf("%s/%s", storageDir, userId)
332+
333+
limits := validation.Limits{}
334+
overrides := validation.NewOverrides(limits, nil)
335+
uBucket := bucket.NewUserBucketClient(userId, bkt, overrides)
336+
blockIds, err := convertToParquetBlocksForTesting(userPath, uBucket)
337+
require.NoError(t, err)
338+
339+
series, _, err := querySeries(stores, userId, "series_1", 0, 100, blockIds...)
340+
require.NoError(t, err)
341+
assert.Equal(t, 1, len(series))
342+
}
343+
344+
func convertToParquetBlocksForTesting(userPath string, userBkt objstore.InstrumentedBucket) ([]string, error) {
345+
var blockIDs []string
346+
347+
pool := chunkenc.NewPool()
348+
userDir, err := os.ReadDir(userPath)
349+
if err != nil {
350+
return nil, err
351+
}
352+
353+
for _, file := range userDir {
354+
_, err := ulid.Parse(file.Name())
355+
if err != nil {
356+
continue
357+
}
358+
blockIDs = append(blockIDs, file.Name())
359+
bdir := filepath.Join(userPath, file.Name())
360+
361+
tsdbBlock, err := tsdb.OpenBlock(nil, bdir, pool, tsdb.DefaultPostingsDecoderFactory)
362+
if err != nil {
363+
return nil, err
364+
}
365+
converterOptions := []convert.ConvertOption{convert.WithName(file.Name())}
366+
_, err = convert.ConvertTSDBBlock(context.Background(), userBkt, tsdbBlock.MinTime(), tsdbBlock.MaxTime(), []convert.Convertible{tsdbBlock}, converterOptions...)
367+
if err != nil {
368+
return nil, err
369+
}
370+
_ = tsdbBlock.Close()
371+
}
372+
return blockIDs, nil
373+
}

0 commit comments

Comments
 (0)