diff --git a/docs/blocks-storage/querier.md b/docs/blocks-storage/querier.md
index 855ff5c9028..be429ba38d6 100644
--- a/docs/blocks-storage/querier.md
+++ b/docs/blocks-storage/querier.md
@@ -1729,6 +1729,10 @@ blocks_storage:
     # CLI flag: -blocks-storage.bucket-store.block-discovery-strategy
     [block_discovery_strategy: <string> | default = "concurrent"]
 
+    # Type of bucket store to use (tsdb or parquet).
+    # CLI flag: -blocks-storage.bucket-store.bucket-store-type
+    [bucket_store_type: <string> | default = "tsdb"]
+
     # Max size - in bytes - of a chunks pool, used to reduce memory allocations.
     # The pool is shared across all tenants. 0 to disable the limit.
     # CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
diff --git a/docs/blocks-storage/store-gateway.md b/docs/blocks-storage/store-gateway.md
index 506cf0f32a1..2c21c8bd6cb 100644
--- a/docs/blocks-storage/store-gateway.md
+++ b/docs/blocks-storage/store-gateway.md
@@ -311,13 +311,13 @@ store_gateway:
 
     # Minimum time to wait for ring stability at startup. 0 to disable.
     # CLI flag: -store-gateway.sharding-ring.wait-stability-min-duration
-    [wait_stability_min_duration: <duration> | default = 1m]
+    [wait_stability_min_duration: <duration> | default = 0s]
 
     # Maximum time to wait for ring stability at startup. If the store-gateway
     # ring keeps changing after this period of time, the store-gateway will
     # start anyway.
     # CLI flag: -store-gateway.sharding-ring.wait-stability-max-duration
-    [wait_stability_max_duration: <duration> | default = 5m]
+    [wait_stability_max_duration: <duration> | default = 5s]
 
     # Timeout for waiting on store-gateway to become desired state in the ring.
     # CLI flag: -store-gateway.sharding-ring.wait-instance-state-timeout
@@ -1815,6 +1815,10 @@ blocks_storage:
     # CLI flag: -blocks-storage.bucket-store.block-discovery-strategy
     [block_discovery_strategy: <string> | default = "concurrent"]
 
+    # Type of bucket store to use (tsdb or parquet).
+    # CLI flag: -blocks-storage.bucket-store.bucket-store-type
+    [bucket_store_type: <string> | default = "tsdb"]
+
     # Max size - in bytes - of a chunks pool, used to reduce memory allocations.
     # The pool is shared across all tenants. 0 to disable the limit.
     # CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
diff --git a/docs/configuration/config-file-reference.md b/docs/configuration/config-file-reference.md
index a642c4f3241..9dfb2f00a68 100644
--- a/docs/configuration/config-file-reference.md
+++ b/docs/configuration/config-file-reference.md
@@ -2396,6 +2396,10 @@ bucket_store:
   # CLI flag: -blocks-storage.bucket-store.block-discovery-strategy
   [block_discovery_strategy: <string> | default = "concurrent"]
 
+  # Type of bucket store to use (tsdb or parquet).
+  # CLI flag: -blocks-storage.bucket-store.bucket-store-type
+  [bucket_store_type: <string> | default = "tsdb"]
+
   # Max size - in bytes - of a chunks pool, used to reduce memory allocations.
   # The pool is shared across all tenants. 0 to disable the limit.
   # CLI flag: -blocks-storage.bucket-store.max-chunk-pool-bytes
@@ -6458,13 +6462,13 @@ sharding_ring:
 
   # Minimum time to wait for ring stability at startup. 0 to disable.
   # CLI flag: -store-gateway.sharding-ring.wait-stability-min-duration
-  [wait_stability_min_duration: <duration> | default = 1m]
+  [wait_stability_min_duration: <duration> | default = 0s]
 
   # Maximum time to wait for ring stability at startup. If the store-gateway
   # ring keeps changing after this period of time, the store-gateway will start
   # anyway.
  # CLI flag: -store-gateway.sharding-ring.wait-stability-max-duration
-  [wait_stability_max_duration: <duration> | default = 5m]
+  [wait_stability_max_duration: <duration> | default = 5s]
 
  # Timeout for waiting on store-gateway to become desired state in the ring.
  # CLI flag: -store-gateway.sharding-ring.wait-instance-state-timeout
diff --git a/integration/e2ecortex/services.go b/integration/e2ecortex/services.go
index 727c60dd11a..3ef57e7cb9a 100644
--- a/integration/e2ecortex/services.go
+++ b/integration/e2ecortex/services.go
@@ -204,6 +204,41 @@ func NewIngesterWithConfigFile(name string, store RingStore, address, configFile
 	)
 }
 
+func NewParquetConverter(name string, store RingStore, address string, flags map[string]string, image string) *CortexService {
+	return NewParquetConverterWithConfigFile(name, store, address, "", flags, image)
+}
+
+func NewParquetConverterWithConfigFile(name string, store RingStore, address, configFile string, flags map[string]string, image string) *CortexService {
+	if configFile != "" {
+		flags["-config.file"] = filepath.Join(e2e.ContainerSharedDir, configFile)
+	}
+
+	// Configure the ingesters ring backend
+	flags["-ring.store"] = string(store)
+	switch store {
+	case RingStoreConsul:
+		flags["-consul.hostname"] = address
+	case RingStoreEtcd:
+		flags["-etcd.endpoints"] = address
+	}
+
+	if image == "" {
+		image = GetDefaultImage()
+	}
+
+	return NewCortexService(
+		name,
+		image,
+		e2e.NewCommandWithoutEntrypoint("cortex", e2e.BuildArgs(e2e.MergeFlags(map[string]string{
+			"-target":    "parquet-converter",
+			"-log.level": "warn",
+		}, flags))...),
+		e2e.NewHTTPReadinessProbe(httpPort, "/ready", 200, 299),
+		httpPort,
+		grpcPort,
+	)
+}
+
 func NewQueryFrontend(name string, flags map[string]string, image string) *CortexService {
 	return NewQueryFrontendWithConfigFile(name, "", flags, image)
 }
diff --git a/integration/parquet_gateway_test.go b/integration/parquet_gateway_test.go
new file mode 100644
index 00000000000..9446ff59893
--- /dev/null
+++ b/integration/parquet_gateway_test.go
@@ -0,0 +1,592 @@
+//go:build integration_querier
+// +build integration_querier
+
+package integration
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"path/filepath"
+	"strconv"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/model/labels"
+	"github.com/prometheus/prometheus/prompb"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/thanos-io/objstore"
+	"github.com/thanos-io/thanos/pkg/block"
+	"github.com/thanos-io/thanos/pkg/block/metadata"
+
+	"github.com/cortexproject/cortex/integration/e2e"
+	e2ecache "github.com/cortexproject/cortex/integration/e2e/cache"
+	e2edb "github.com/cortexproject/cortex/integration/e2e/db"
+	"github.com/cortexproject/cortex/integration/e2ecortex"
+	"github.com/cortexproject/cortex/pkg/storage/bucket"
+	"github.com/cortexproject/cortex/pkg/storage/tsdb"
+	"github.com/cortexproject/cortex/pkg/util/log"
+	cortex_testutil "github.com/cortexproject/cortex/pkg/util/test"
+)
+
+func TestParquetGatewayWithBlocksStorageRunningInMicroservicesMode(t *testing.T) {
+	tests := map[string]struct {
+		blocksShardingStrategy string // Empty means sharding is disabled.
+		tenantShardSize        int
+		parquetLabelsCache     string
+		chunkCacheBackend      string
+		bucketIndexEnabled     bool
+	}{
+		"blocks sharding disabled, memcached parquet label cache, memcached chunks cache": {
+			blocksShardingStrategy: "",
+			parquetLabelsCache:     tsdb.CacheBackendMemcached,
+			chunkCacheBackend:      tsdb.CacheBackendMemcached,
+		},
+		"blocks sharding disabled, multilevel parquet label cache (inmemory, memcached)": {
+			blocksShardingStrategy: "",
+			parquetLabelsCache:     fmt.Sprintf("%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached),
+			chunkCacheBackend:      tsdb.CacheBackendMemcached,
+		},
+		"blocks sharding disabled, redis parquet label cache, redis chunks cache": {
+			blocksShardingStrategy: "",
+			parquetLabelsCache:     tsdb.CacheBackendRedis,
+			chunkCacheBackend:      tsdb.CacheBackendRedis,
+		},
+		"blocks sharding disabled, multilevel parquet label cache (inmemory, redis), redis chunks cache": {
+			blocksShardingStrategy: "",
+			parquetLabelsCache:     fmt.Sprintf("%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendRedis),
+			chunkCacheBackend:      tsdb.CacheBackendRedis,
+		},
+		"blocks default sharding, inmemory parquet label cache": {
+			blocksShardingStrategy: "default",
+			parquetLabelsCache:     tsdb.CacheBackendInMemory,
+		},
+		"blocks default sharding, memcached parquet label cache, memcached chunks cache": {
+			blocksShardingStrategy: "default",
+			parquetLabelsCache:     tsdb.CacheBackendMemcached,
+			chunkCacheBackend:      tsdb.CacheBackendMemcached,
+		},
+		"blocks shuffle sharding, memcached parquet label cache, memcached chunks cache": {
+			blocksShardingStrategy: "shuffle-sharding",
+			tenantShardSize:        1,
+			parquetLabelsCache:     tsdb.CacheBackendMemcached,
+			chunkCacheBackend:      tsdb.CacheBackendMemcached,
+		},
+		"blocks default sharding, inmemory parquet label cache, bucket index enabled": {
+			blocksShardingStrategy: "default",
+			parquetLabelsCache:     tsdb.CacheBackendInMemory,
+			bucketIndexEnabled:     true,
+		},
+		"blocks shuffle sharding, memcached parquet label cache, bucket index enabled": {
+			blocksShardingStrategy: "shuffle-sharding",
+			tenantShardSize:        1,
+			parquetLabelsCache:     tsdb.CacheBackendInMemory,
+			bucketIndexEnabled:     true,
+		},
+		"blocks default sharding, redis parquet label cache, redis chunks cache, bucket index enabled": {
+			blocksShardingStrategy: "default",
+			parquetLabelsCache:     tsdb.CacheBackendRedis,
+			chunkCacheBackend:      tsdb.CacheBackendRedis,
+			bucketIndexEnabled:     true,
+		},
+		"blocks shuffle sharding, redis parquet label cache, redis chunks cache, bucket index enabled": {
+			blocksShardingStrategy: "shuffle-sharding",
+			tenantShardSize:        1,
+			parquetLabelsCache:     tsdb.CacheBackendRedis,
+			chunkCacheBackend:      tsdb.CacheBackendRedis,
+			bucketIndexEnabled:     true,
+		},
+		"blocks sharding disabled, redis parquet label cache, in-memory chunks cache, bucket index enabled": {
+			blocksShardingStrategy: "",
+			parquetLabelsCache:     tsdb.CacheBackendRedis,
+			chunkCacheBackend:      tsdb.CacheBackendInMemory,
+			bucketIndexEnabled:     true,
+		},
+		"blocks default sharding, redis parquet label cache, in-memory chunk cache": {
+			blocksShardingStrategy: "default",
+			parquetLabelsCache:     tsdb.CacheBackendRedis,
+			chunkCacheBackend:      tsdb.CacheBackendInMemory,
+			bucketIndexEnabled:     true,
+		},
+		"blocks shuffle sharding, redis parquet label cache, in-memory chunk cache, bucket index enabled": {
+			blocksShardingStrategy: "shuffle-sharding",
+			tenantShardSize:        1,
+			parquetLabelsCache:     tsdb.CacheBackendRedis,
+			chunkCacheBackend:      tsdb.CacheBackendInMemory,
+			bucketIndexEnabled:     true,
+		},
+		"block sharding disabled, redis parquet label cache, multi-level chunk cache (in-memory, memcached, redis), bucket index enabled": {
+			blocksShardingStrategy: "",
+			parquetLabelsCache:     tsdb.CacheBackendRedis,
+			chunkCacheBackend:      fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
+			bucketIndexEnabled:     true,
+		},
+		"block default sharding, redis parquet label cache, multi-level chunk cache (in-memory, memcached, redis), bucket index enabled": {
+			blocksShardingStrategy: "default",
+			parquetLabelsCache:     tsdb.CacheBackendRedis,
+			chunkCacheBackend:      fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
+			bucketIndexEnabled:     true,
+		},
+		"block shuffle sharding, redis parquet label cache, multi-level chunk cache (in-memory, memcached, redis), bucket index enabled": {
+			blocksShardingStrategy: "shuffle-sharding",
+			tenantShardSize:        1,
+			parquetLabelsCache:     tsdb.CacheBackendRedis,
+			chunkCacheBackend:      fmt.Sprintf("%v,%v,%v", tsdb.CacheBackendInMemory, tsdb.CacheBackendMemcached, tsdb.CacheBackendRedis),
+			bucketIndexEnabled:     true,
+		},
+	}
+
+	for testName, testCfg := range tests {
+		t.Run(testName, func(t *testing.T) {
+			const blockRangePeriod = 5 * time.Second
+
+			s, err := e2e.NewScenario(networkName)
+			require.NoError(t, err)
+			defer s.Close()
+
+			// Start dependencies.
+			consul := e2edb.NewConsul()
+			memcached := e2ecache.NewMemcached()
+			redis := e2ecache.NewRedis()
+			require.NoError(t, s.StartAndWaitReady(consul, memcached, redis))
+
+			// Configure the blocks storage to frequently compact TSDB head
+			// and ship blocks to the storage.
+			flags := mergeFlags(BlocksStorageFlags(), map[string]string{
+				"-blocks-storage.tsdb.block-ranges-period":                  blockRangePeriod.String(),
+				"-blocks-storage.tsdb.ship-interval":                        "1s",
+				"-blocks-storage.bucket-store.sync-interval":                "1s",
+				"-blocks-storage.tsdb.retention-period":                     ((blockRangePeriod * 2) - 1).String(),
+				"-blocks-storage.bucket-store.parquet-labels-cache.backend": testCfg.parquetLabelsCache,
+				"-blocks-storage.bucket-store.chunks-cache.backend":         testCfg.chunkCacheBackend,
+				"-store-gateway.sharding-enabled":                           strconv.FormatBool(testCfg.blocksShardingStrategy != ""),
+				"-store-gateway.sharding-strategy":                          testCfg.blocksShardingStrategy,
+				"-store-gateway.tenant-shard-size":                          fmt.Sprintf("%d", testCfg.tenantShardSize),
+				"-querier.query-store-for-labels-enabled":                   "true",
+				// Enable parquet converter
+				"-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
+				"-blocks-storage.bucket-store.bucket-store-type":    "parquet",
+				"-parquet-converter.enabled":                        "true",
+				"-parquet-converter.conversion-interval":            "1s",
+				"-parquet-converter.ring.consul.hostname":           consul.NetworkHTTPEndpoint(),
+				// compactor
+				"-compactor.cleanup-interval": "1s",
+				"-compactor.block-ranges":     "1ms,12h", // to convert all blocks to parquet blocks
+			})
+
+			minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
+			require.NoError(t, s.StartAndWaitReady(minio))
+
+			// Add the cache address to the flags.
+			if strings.Contains(testCfg.parquetLabelsCache, tsdb.CacheBackendMemcached) {
+				flags["-blocks-storage.bucket-store.parquet-labels-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)
+			}
+			if strings.Contains(testCfg.parquetLabelsCache, tsdb.CacheBackendRedis) {
+				flags["-blocks-storage.bucket-store.parquet-labels-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort)
+			}
+			if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendMemcached) {
+				flags["-blocks-storage.bucket-store.chunks-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort)
+			}
+			if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendRedis) {
+				flags["-blocks-storage.bucket-store.chunks-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort)
+			}
+
+			// Start Cortex components.
+			distributor := e2ecortex.NewDistributor("distributor", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+			ingester := e2ecortex.NewIngester("ingester", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+			parquetConverter := e2ecortex.NewParquetConverter("parquet-converter", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+			storeGateway1 := e2ecortex.NewStoreGateway("store-gateway-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+			storeGateway2 := e2ecortex.NewStoreGateway("store-gateway-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+			storeGateways := e2ecortex.NewCompositeCortexService(storeGateway1, storeGateway2)
+			require.NoError(t, s.StartAndWaitReady(distributor, ingester, parquetConverter, storeGateway1, storeGateway2))
+
+			// Start the querier, configuring the store-gateway addresses if sharding is disabled.
+			if testCfg.blocksShardingStrategy == "" {
+				flags = mergeFlags(flags, map[string]string{
+					"-querier.store-gateway-addresses": strings.Join([]string{storeGateway1.NetworkGRPCEndpoint(), storeGateway2.NetworkGRPCEndpoint()}, ","),
+				})
+			}
+			querier := e2ecortex.NewQuerier("querier", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "")
+			require.NoError(t, s.StartAndWaitReady(querier))
+
+			if testCfg.bucketIndexEnabled {
+				// Start the compactor to have the bucket index created before querying.
+				compactor := e2ecortex.NewCompactor("compactor", consul.NetworkHTTPEndpoint(), flags, "")
+				require.NoError(t, s.StartAndWaitReady(compactor))
+			}
+
+			// Wait until both the distributor and querier have updated the ring. The querier will also watch
+			// the store-gateway ring if blocks sharding is enabled.
+ require.NoError(t, distributor.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + if testCfg.blocksShardingStrategy != "" { + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(float64(512+(512*storeGateways.NumInstances()))), "cortex_ring_tokens_total")) + } else { + require.NoError(t, querier.WaitSumMetrics(e2e.Equals(512), "cortex_ring_tokens_total")) + } + + c, err := e2ecortex.NewClient(distributor.HTTPEndpoint(), querier.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Prepare test data similar to parquet_querier_test.go + ctx := context.Background() + rnd := rand.New(rand.NewSource(time.Now().Unix())) + dir := filepath.Join(s.SharedDir(), "data") + numSeries := 10 + numSamples := 60 + lbls := make([]labels.Labels, 0, numSeries*2) + scrapeInterval := time.Minute + statusCodes := []string{"200", "400", "404", "500", "502"} + now := time.Now() + start := now.Add(-time.Hour * 24) + end := now.Add(-time.Hour) + + for i := 0; i < numSeries; i++ { + lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_a", "job", "test", "series", strconv.Itoa(i%3), "status_code", statusCodes[i%5])) + lbls = append(lbls, labels.FromStrings(labels.MetricName, "test_series_b", "job", "test", "series", strconv.Itoa((i+1)%3), "status_code", statusCodes[(i+1)%5])) + } + + // Create a block with test data + id, err := e2e.CreateBlock(ctx, rnd, dir, lbls, numSamples, start.UnixMilli(), end.UnixMilli(), scrapeInterval.Milliseconds(), 10) + require.NoError(t, err) + + // Upload the block to storage + storage, err := e2ecortex.NewS3ClientForMinio(minio, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, err) + bkt := bucket.NewUserBucketClient("user-1", storage.GetBucket(), nil) + + err = block.Upload(ctx, log.Logger, bkt, filepath.Join(dir, id.String()), metadata.NoneFunc) + require.NoError(t, err) + + // Push some series to Cortex for real-time data + series1Timestamp := time.Now() + series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"}) + series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Wait until the TSDB head is compacted and shipped to the storage. + // The shipped block contains the 1st series, while the 2nd series is in the head. + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_created_total")) + require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series_removed_total")) + + // Push another series to further compact another block and delete the first block + // due to expired retention. 
+			series3Timestamp := series2Timestamp.Add(blockRangePeriod * 2)
+			series3, expectedVector3 := generateSeries("series_3", series3Timestamp, prompb.Label{Name: "series_3", Value: "series_3"})
+
+			res, err = c.Push(series3)
+			require.NoError(t, err)
+			require.Equal(t, 200, res.StatusCode)
+
+			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_shipper_uploads_total"))
+			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(1), "cortex_ingester_memory_series"))
+			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(3), "cortex_ingester_memory_series_created_total"))
+			require.NoError(t, ingester.WaitSumMetrics(e2e.Equals(2), "cortex_ingester_memory_series_removed_total"))
+
+			if testCfg.bucketIndexEnabled {
+				cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
+					foundBucketIndex := false
+
+					err := bkt.Iter(context.Background(), "", func(name string) error {
+						if name == "bucket-index.json.gz" {
+							foundBucketIndex = true
+						}
+						return nil
+					}, objstore.WithRecursiveIter())
+					require.NoError(t, err)
+					return foundBucketIndex
+				})
+			}
+
+			// Wait until we convert the blocks to parquet
+			cortex_testutil.Poll(t, 30*time.Second, true, func() interface{} {
+				found := false
+
+				err := bkt.Iter(context.Background(), "", func(name string) error {
+					if name == fmt.Sprintf("parquet-markers/%v-parquet-converter-mark.json", id.String()) {
+						found = true
+					}
+
+					return nil
+				}, objstore.WithRecursiveIter())
+				require.NoError(t, err)
+				return found
+			})
+
+			// Check how many tenants have been discovered and synced by store-gateways.
+			require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1*storeGateways.NumInstances())), "cortex_bucket_stores_tenants_discovered"))
+			if testCfg.blocksShardingStrategy == "shuffle-sharding" {
+				require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1)), "cortex_bucket_stores_tenants_synced"))
+			} else {
+				require.NoError(t, storeGateways.WaitSumMetrics(e2e.Equals(float64(1*storeGateways.NumInstances())), "cortex_bucket_stores_tenants_synced"))
+			}
+
+			// Wait until the parquet-converter converts the blocks.
+			require.NoError(t, parquetConverter.WaitSumMetricsWithOptions(e2e.Equals(float64(3)), []string{"cortex_parquet_converter_blocks_converted_total"}, e2e.WaitMissingMetrics))
+
+			// Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both).
+ result, err := c.Query("series_1", series1Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector1, result.(model.Vector)) + + result, err = c.Query("series_2", series2Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector2, result.(model.Vector)) + + result, err = c.Query("series_3", series3Timestamp) + require.NoError(t, err) + require.Equal(t, model.ValVector, result.Type()) + assert.Equal(t, expectedVector3, result.(model.Vector)) + + // Query the pre-uploaded test data + result, err = c.QueryRange("test_series_a", start, end, scrapeInterval) + require.NoError(t, err) + require.Equal(t, model.ValMatrix, result.Type()) + // Should have some results from the pre-uploaded data + assert.Greater(t, len(result.(model.Matrix)), 0) + + if strings.Contains(testCfg.parquetLabelsCache, tsdb.CacheBackendInMemory) { + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Greater(float64(0)), "thanos_cache_inmemory_requests_total")) + } + if strings.Contains(testCfg.parquetLabelsCache, tsdb.CacheBackendMemcached) { + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Greater(float64(0)), "thanos_memcached_operations_total")) + } + if strings.Contains(testCfg.parquetLabelsCache, tsdb.CacheBackendRedis) { + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Greater(float64(0)), "thanos_cache_redis_requests_total")) + } + + if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendInMemory) { + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Greater(float64(0)), "thanos_cache_inmemory_requests_total")) + } + if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendMemcached) { + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Greater(float64(0)), "thanos_memcached_operations_total")) + } + if strings.Contains(testCfg.chunkCacheBackend, tsdb.CacheBackendRedis) { + require.NoError(t, storeGateways.WaitSumMetrics(e2e.Greater(float64(0)), "thanos_cache_redis_requests_total")) + } + + // Query metadata. + testMetadataQueriesWithBlocksStorage(t, c, series1[0], series2[0], series3[0], blockRangePeriod) + + // Ensure no service-specific metrics prefix is used by the wrong service. 
+			assertServiceMetricsPrefixes(t, Distributor, distributor)
+			assertServiceMetricsPrefixes(t, Ingester, ingester)
+			assertServiceMetricsPrefixes(t, Querier, querier)
+			assertServiceMetricsPrefixes(t, StoreGateway, storeGateway1)
+			assertServiceMetricsPrefixes(t, StoreGateway, storeGateway2)
+		})
+	}
+}
+
+func TestParquetGatewayWithBlocksStorageRunningInSingleBinaryMode(t *testing.T) {
+	tests := map[string]struct {
+		blocksShardingEnabled bool
+		chunkCacheBackend     string
+		bucketIndexEnabled    bool
+	}{
+		"blocks sharding enabled, inmemory chunks cache": {
+			blocksShardingEnabled: true,
+			chunkCacheBackend:     tsdb.CacheBackendInMemory,
+		},
+		"blocks sharding disabled, memcached chunks cache": {
+			blocksShardingEnabled: false,
+			chunkCacheBackend:     tsdb.CacheBackendMemcached,
+		},
+		"blocks sharding enabled, memcached chunks cache": {
+			blocksShardingEnabled: true,
+			chunkCacheBackend:     tsdb.CacheBackendMemcached,
+		},
+		"blocks sharding enabled, memcached chunk cache, bucket index enabled": {
+			blocksShardingEnabled: true,
+			chunkCacheBackend:     tsdb.CacheBackendMemcached,
+			bucketIndexEnabled:    true,
+		},
+		"blocks sharding disabled, redis chunks cache": {
+			blocksShardingEnabled: false,
+			chunkCacheBackend:     tsdb.CacheBackendRedis,
+		},
+		"blocks sharding enabled, redis chunks cache": {
+			blocksShardingEnabled: true,
+			chunkCacheBackend:     tsdb.CacheBackendRedis,
+		},
+		"blocks sharding enabled, redis chunks cache, bucket index enabled": {
+			blocksShardingEnabled: true,
+			chunkCacheBackend:     tsdb.CacheBackendRedis,
+			bucketIndexEnabled:    true,
+		},
+	}
+
+	for testName, testCfg := range tests {
+		t.Run(testName, func(t *testing.T) {
+			const blockRangePeriod = 5 * time.Second
+
+			s, err := e2e.NewScenario(networkName)
+			require.NoError(t, err)
+			defer s.Close()
+
+			// Start dependencies.
+			consul := e2edb.NewConsul()
+			minio := e2edb.NewMinio(9000, bucketName)
+			memcached := e2ecache.NewMemcached()
+			redis := e2ecache.NewRedis()
+			require.NoError(t, s.StartAndWaitReady(consul, minio, memcached, redis))
+
+			// Setting the replication factor equal to the number of Cortex replicas
+			// makes sure each replica creates the same blocks, so the total number of
+			// blocks is stable and easy to assert on.
+			const seriesReplicationFactor = 2
+
+			// Configure the blocks storage to frequently compact TSDB head
+			// and ship blocks to the storage.
+			flags := mergeFlags(
+				BlocksStorageFlags(),
+				AlertmanagerLocalFlags(),
+				map[string]string{
+					"-blocks-storage.tsdb.block-ranges-period":          blockRangePeriod.String(),
+					"-blocks-storage.tsdb.ship-interval":                "1s",
+					"-blocks-storage.bucket-store.sync-interval":        "1s",
+					"-blocks-storage.tsdb.retention-period":             ((blockRangePeriod * 2) - 1).String(),
+					"-blocks-storage.bucket-store.chunks-cache.backend": testCfg.chunkCacheBackend,
+					"-blocks-storage.bucket-store.bucket-index.enabled": strconv.FormatBool(testCfg.bucketIndexEnabled),
+					"-blocks-storage.bucket-store.bucket-store-type":    "parquet",
+					"-querier.query-store-for-labels-enabled":           "true",
+					// Enable parquet converter
+					"-parquet-converter.enabled":              "true",
+					"-parquet-converter.conversion-interval":  "1s",
+					"-parquet-converter.ring.consul.hostname": consul.NetworkHTTPEndpoint(),
+					// Ingester.
+					"-ring.store":      "consul",
+					"-consul.hostname": consul.NetworkHTTPEndpoint(),
+					// Distributor.
+					"-distributor.replication-factor": strconv.FormatInt(seriesReplicationFactor, 10),
+					// Store-gateway.
+ "-store-gateway.sharding-enabled": strconv.FormatBool(testCfg.blocksShardingEnabled), + "-store-gateway.sharding-ring.store": "consul", + "-store-gateway.sharding-ring.consul.hostname": consul.NetworkHTTPEndpoint(), + "-store-gateway.sharding-ring.replication-factor": "1", + // alert manager + "-alertmanager.web.external-url": "http://localhost/alertmanager", + // compactor + "-compactor.cleanup-interval": "1s", + "-compactor.block-ranges": "1ms,12h", // to convert all blocks to parquet blocks + }, + ) + require.NoError(t, writeFileToSharedDir(s, "alertmanager_configs/user-1.yaml", []byte(cortexAlertmanagerUserConfigYaml))) + + // Add the cache address to the flags. + switch testCfg.chunkCacheBackend { + case tsdb.CacheBackendMemcached: + flags["-blocks-storage.bucket-store.chunks-cache.memcached.addresses"] = "dns+" + memcached.NetworkEndpoint(e2ecache.MemcachedPort) + case tsdb.CacheBackendRedis: + flags["-blocks-storage.bucket-store.chunks-cache.redis.addresses"] = redis.NetworkEndpoint(e2ecache.RedisPort) + } + + // Start Cortex replicas. + cortex1 := e2ecortex.NewSingleBinary("cortex-1", flags, "") + cortex2 := e2ecortex.NewSingleBinary("cortex-2", flags, "") + cluster := e2ecortex.NewCompositeCortexService(cortex1, cortex2) + require.NoError(t, s.StartAndWaitReady(cortex1, cortex2)) + + parquetConverter := e2ecortex.NewParquetConverter("parquet-converter", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flags, "") + require.NoError(t, s.StartAndWaitReady(parquetConverter)) + + // Wait until Cortex replicas have updated the ring state. + for _, replica := range cluster.Instances() { + numTokensPerInstance := 512 // Ingesters ring. + if testCfg.blocksShardingEnabled { + numTokensPerInstance += 512 * 2 // Store-gateway ring (read both by the querier and store-gateway). + } + require.NoError(t, replica.WaitSumMetrics(e2e.Equals(float64((numTokensPerInstance)*cluster.NumInstances())), "cortex_ring_tokens_total")) + } + + c, err := e2ecortex.NewClient(cortex1.HTTPEndpoint(), cortex2.HTTPEndpoint(), "", "", "user-1") + require.NoError(t, err) + + // Push some series to Cortex. + series1Timestamp := time.Now() + series2Timestamp := series1Timestamp.Add(blockRangePeriod * 2) + series1, expectedVector1 := generateSeries("series_1", series1Timestamp, prompb.Label{Name: "series_1", Value: "series_1"}) + series2, expectedVector2 := generateSeries("series_2", series2Timestamp, prompb.Label{Name: "series_2", Value: "series_2"}) + + res, err := c.Push(series1) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + res, err = c.Push(series2) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // Wait until the TSDB head is compacted and shipped to the storage. + // The shipped block contains the 1st series, while the 2nd series is in the head. + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(1*cluster.NumInstances())), "cortex_ingester_shipper_uploads_total")) + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(1*cluster.NumInstances())), "cortex_ingester_memory_series")) + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*cluster.NumInstances())), "cortex_ingester_memory_series_created_total")) + require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(1*cluster.NumInstances())), "cortex_ingester_memory_series_removed_total")) + + // Push another series to further compact another block and delete the first block + // due to expired retention. 
+			series3Timestamp := series2Timestamp.Add(blockRangePeriod * 2)
+			series3, expectedVector3 := generateSeries("series_3", series3Timestamp, prompb.Label{Name: "series_3", Value: "series_3"})
+
+			res, err = c.Push(series3)
+			require.NoError(t, err)
+			require.Equal(t, 200, res.StatusCode)
+
+			require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*cluster.NumInstances())), "cortex_ingester_shipper_uploads_total"))
+			require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(1*cluster.NumInstances())), "cortex_ingester_memory_series"))
+			require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(3*cluster.NumInstances())), "cortex_ingester_memory_series_created_total"))
+			require.NoError(t, cluster.WaitSumMetrics(e2e.Equals(float64(2*cluster.NumInstances())), "cortex_ingester_memory_series_removed_total"))
+
+			// Wait until the parquet-converter converts the blocks.
+			require.NoError(t, parquetConverter.WaitSumMetrics(e2e.Equals(float64(2*cluster.NumInstances())), "cortex_parquet_converter_blocks_converted_total"))
+
+			// Query back the series (1 only in the storage, 1 only in the ingesters, 1 on both).
+			result, err := c.Query("series_1", series1Timestamp)
+			require.NoError(t, err)
+			require.Equal(t, model.ValVector, result.Type())
+			assert.Equal(t, expectedVector1, result.(model.Vector))
+
+			result, err = c.Query("series_2", series2Timestamp)
+			require.NoError(t, err)
+			require.Equal(t, model.ValVector, result.Type())
+			assert.Equal(t, expectedVector2, result.(model.Vector))
+
+			result, err = c.Query("series_3", series3Timestamp)
+			require.NoError(t, err)
+			require.Equal(t, model.ValVector, result.Type())
+			assert.Equal(t, expectedVector3, result.(model.Vector))
+
+			// Query back again the 1st series from storage.
+			result, err = c.Query("series_1", series1Timestamp)
+			require.NoError(t, err)
+			require.Equal(t, model.ValVector, result.Type())
+			assert.Equal(t, expectedVector1, result.(model.Vector))
+
+			switch testCfg.chunkCacheBackend {
+			case tsdb.CacheBackendInMemory:
+				require.NoError(t, cluster.WaitSumMetrics(e2e.Greater(float64(0)), "thanos_cache_inmemory_requests_total"))
+			case tsdb.CacheBackendMemcached:
+				require.NoError(t, cluster.WaitSumMetrics(e2e.Greater(float64(0)), "thanos_memcached_operations_total"))
+			case tsdb.CacheBackendRedis:
+				require.NoError(t, cluster.WaitSumMetrics(e2e.Greater(float64(0)), "thanos_cache_redis_requests_total"))
+			}
+
+			// Query metadata.
+ testMetadataQueriesWithBlocksStorage(t, c, series1[0], series2[0], series3[0], blockRangePeriod) + }) + } +} diff --git a/pkg/cortex/cortex_test.go b/pkg/cortex/cortex_test.go index 6cac224319d..fce486be3b6 100644 --- a/pkg/cortex/cortex_test.go +++ b/pkg/cortex/cortex_test.go @@ -86,6 +86,7 @@ func TestCortex(t *testing.T) { IndexCache: tsdb.IndexCacheConfig{ Backend: tsdb.IndexCacheBackendInMemory, }, + BucketStoreType: string(tsdb.TSDBBucketStore), }, UsersScanner: tsdb.UsersScannerConfig{ Strategy: tsdb.UserScanStrategyList, diff --git a/pkg/storage/tsdb/config.go b/pkg/storage/tsdb/config.go index b51ad077bd4..2e2ba515b3a 100644 --- a/pkg/storage/tsdb/config.go +++ b/pkg/storage/tsdb/config.go @@ -61,6 +61,7 @@ var ( ErrBlockDiscoveryStrategy = errors.New("invalid block discovery strategy") ErrInvalidTokenBucketBytesLimiterMode = errors.New("invalid token bucket bytes limiter mode") ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio = errors.New("lazy expanded posting group max key series ratio needs to be equal or greater than 0") + ErrInvalidBucketStoreType = errors.New("invalid bucket store type") ) // BlocksStorageConfig holds the config information for the blocks storage. @@ -292,6 +293,7 @@ type BucketStoreConfig struct { IgnoreBlocksBefore time.Duration `yaml:"ignore_blocks_before"` BucketIndex BucketIndexConfig `yaml:"bucket_index"` BlockDiscoveryStrategy string `yaml:"block_discovery_strategy"` + BucketStoreType string `yaml:"bucket_store_type"` // Chunk pool. MaxChunkPoolBytes uint64 `yaml:"max_chunk_pool_bytes"` @@ -378,6 +380,7 @@ func (cfg *BucketStoreConfig) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&cfg.LazyExpandedPostingGroupMaxKeySeriesRatio, "blocks-storage.bucket-store.lazy-expanded-posting-group-max-key-series-ratio", 100, "Mark posting group as lazy if it fetches more keys than R * max series the query should fetch. With R set to 100, a posting group which fetches 100K keys will be marked as lazy if the current query only fetches 1000 series. This config is only valid if lazy expanded posting is enabled. 0 disables the limit.") f.IntVar(&cfg.SeriesBatchSize, "blocks-storage.bucket-store.series-batch-size", store.SeriesBatchSize, "Controls how many series to fetch per batch in Store Gateway. Default value is 10000.") f.StringVar(&cfg.BlockDiscoveryStrategy, "blocks-storage.bucket-store.block-discovery-strategy", string(ConcurrentDiscovery), "One of "+strings.Join(supportedBlockDiscoveryStrategies, ", ")+". When set to concurrent, stores will concurrently issue one call per directory to discover active blocks in the bucket. The recursive strategy iterates through all objects in the bucket, recursively traversing into each directory. This avoids N+1 calls at the expense of having slower bucket iterations. bucket_index strategy can be used in Compactor only and utilizes the existing bucket index to fetch block IDs to sync. This avoids iterating the bucket but can be impacted by delays of cleaner creating bucket index.") + f.StringVar(&cfg.BucketStoreType, "blocks-storage.bucket-store.bucket-store-type", "tsdb", "Type of bucket store to use (tsdb or parquet).") f.StringVar(&cfg.TokenBucketBytesLimiter.Mode, "blocks-storage.bucket-store.token-bucket-bytes-limiter.mode", string(TokenBucketBytesLimiterDisabled), fmt.Sprintf("Token bucket bytes limiter mode. 
Supported values are: %s", strings.Join(supportedTokenBucketBytesLimiterModes, ", "))) f.Int64Var(&cfg.TokenBucketBytesLimiter.InstanceTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.instance-token-bucket-size", int64(820*units.Mebibyte), "Instance token bucket size") f.Int64Var(&cfg.TokenBucketBytesLimiter.UserTokenBucketSize, "blocks-storage.bucket-store.token-bucket-bytes-limiter.user-token-bucket-size", int64(615*units.Mebibyte), "User token bucket size") @@ -415,6 +418,9 @@ func (cfg *BucketStoreConfig) Validate() error { if !slices.Contains(supportedTokenBucketBytesLimiterModes, cfg.TokenBucketBytesLimiter.Mode) { return ErrInvalidTokenBucketBytesLimiterMode } + if !slices.Contains(supportedBucketStoreTypes, cfg.BucketStoreType) { + return ErrInvalidBucketStoreType + } if cfg.LazyExpandedPostingGroupMaxKeySeriesRatio < 0 { return ErrInvalidLazyExpandedPostingGroupMaxKeySeriesRatio } @@ -450,6 +456,19 @@ var supportedBlockDiscoveryStrategies = []string{ string(BucketIndexDiscovery), } +// BucketStoreType represents the type of bucket store +type BucketStoreType string + +const ( + TSDBBucketStore BucketStoreType = "tsdb" + ParquetBucketStore BucketStoreType = "parquet" +) + +var supportedBucketStoreTypes = []string{ + string(TSDBBucketStore), + string(ParquetBucketStore), +} + type TokenBucketBytesLimiterMode string const ( diff --git a/pkg/storegateway/bucket_store_metrics.go b/pkg/storegateway/bucket_store_metrics.go index 4938e73b411..0744a0ec7ae 100644 --- a/pkg/storegateway/bucket_store_metrics.go +++ b/pkg/storegateway/bucket_store_metrics.go @@ -2,10 +2,40 @@ package storegateway import ( "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" "github.com/cortexproject/cortex/pkg/util" ) +type CortexBucketStoreMetrics struct { + syncTimes prometheus.Histogram + syncLastSuccess prometheus.Gauge + tenantsDiscovered prometheus.Gauge + tenantsSynced prometheus.Gauge +} + +func NewCortexBucketStoreMetrics(reg prometheus.Registerer) *CortexBucketStoreMetrics { + return &CortexBucketStoreMetrics{ + syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ + Name: "cortex_bucket_stores_blocks_sync_seconds", + Help: "The total time it takes to perform a sync stores", + Buckets: []float64{0.1, 1, 10, 30, 60, 120, 300, 600, 900}, + }), + syncLastSuccess: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_bucket_stores_blocks_last_successful_sync_timestamp_seconds", + Help: "Unix timestamp of the last successful blocks sync.", + }), + tenantsDiscovered: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_bucket_stores_tenants_discovered", + Help: "Number of tenants discovered in the bucket.", + }), + tenantsSynced: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ + Name: "cortex_bucket_stores_tenants_synced", + Help: "Number of tenants synced.", + }), + } +} + // BucketStoreMetrics aggregates metrics exported by Thanos Bucket Store // and re-exports those aggregates as Cortex metrics. 
type BucketStoreMetrics struct { @@ -60,7 +90,7 @@ type BucketStoreMetrics struct { } func NewBucketStoreMetrics() *BucketStoreMetrics { - return &BucketStoreMetrics{ + m := &BucketStoreMetrics{ regs: util.NewUserRegistries(), blockLoads: prometheus.NewDesc( @@ -233,6 +263,8 @@ func NewBucketStoreMetrics() *BucketStoreMetrics { "Total number of series size in bytes overfetched due to posting lazy expansion.", nil, nil), } + + return m } func (m *BucketStoreMetrics) AddUserRegistry(user string, reg *prometheus.Registry) { diff --git a/pkg/storegateway/bucket_store_metrics_test.go b/pkg/storegateway/bucket_store_metrics_test.go index 2b087b89b36..dc661cbe291 100644 --- a/pkg/storegateway/bucket_store_metrics_test.go +++ b/pkg/storegateway/bucket_store_metrics_test.go @@ -11,6 +11,45 @@ import ( "github.com/stretchr/testify/require" ) +func TestCortexBucketStoreMetrics(t *testing.T) { + t.Parallel() + reg := prometheus.NewPedanticRegistry() + + metrics := NewCortexBucketStoreMetrics(reg) + metrics.syncTimes.Observe(0.1) + metrics.syncLastSuccess.Set(1759923308) + metrics.tenantsSynced.Set(1) + metrics.tenantsDiscovered.Set(1) + + err := testutil.GatherAndCompare(reg, bytes.NewBufferString(` + # HELP cortex_bucket_stores_blocks_last_successful_sync_timestamp_seconds Unix timestamp of the last successful blocks sync. + # TYPE cortex_bucket_stores_blocks_last_successful_sync_timestamp_seconds gauge + cortex_bucket_stores_blocks_last_successful_sync_timestamp_seconds 1.759923308e+09 + # HELP cortex_bucket_stores_blocks_sync_seconds The total time it takes to perform a sync stores + # TYPE cortex_bucket_stores_blocks_sync_seconds histogram + cortex_bucket_stores_blocks_sync_seconds_bucket{le="0.1"} 1 + cortex_bucket_stores_blocks_sync_seconds_bucket{le="1"} 1 + cortex_bucket_stores_blocks_sync_seconds_bucket{le="10"} 1 + cortex_bucket_stores_blocks_sync_seconds_bucket{le="30"} 1 + cortex_bucket_stores_blocks_sync_seconds_bucket{le="60"} 1 + cortex_bucket_stores_blocks_sync_seconds_bucket{le="120"} 1 + cortex_bucket_stores_blocks_sync_seconds_bucket{le="300"} 1 + cortex_bucket_stores_blocks_sync_seconds_bucket{le="600"} 1 + cortex_bucket_stores_blocks_sync_seconds_bucket{le="900"} 1 + cortex_bucket_stores_blocks_sync_seconds_bucket{le="+Inf"} 1 + cortex_bucket_stores_blocks_sync_seconds_sum 0.1 + cortex_bucket_stores_blocks_sync_seconds_count 1 + # HELP cortex_bucket_stores_tenants_discovered Number of tenants discovered in the bucket. + # TYPE cortex_bucket_stores_tenants_discovered gauge + cortex_bucket_stores_tenants_discovered 1 + # HELP cortex_bucket_stores_tenants_synced Number of tenants synced. + # TYPE cortex_bucket_stores_tenants_synced gauge + cortex_bucket_stores_tenants_synced 1 +`, + )) + require.NoError(t, err) +} + func TestBucketStoreMetrics(t *testing.T) { t.Parallel() mainReg := prometheus.NewPedanticRegistry() diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go index b9da057ae23..d59332238da 100644 --- a/pkg/storegateway/bucket_stores.go +++ b/pkg/storegateway/bucket_stores.go @@ -44,16 +44,24 @@ import ( "github.com/cortexproject/cortex/pkg/util/validation" ) -// BucketStores is a multi-tenant wrapper of Thanos BucketStore. 
-type BucketStores struct { - logger log.Logger - cfg tsdb.BlocksStorageConfig - limits *validation.Overrides - bucket objstore.Bucket - logLevel logging.Level - bucketStoreMetrics *BucketStoreMetrics - metaFetcherMetrics *MetadataFetcherMetrics - shardingStrategy ShardingStrategy +// BucketStores defines the methods that any bucket stores implementation must provide +type BucketStores interface { + storepb.StoreServer + SyncBlocks(ctx context.Context) error + InitialSync(ctx context.Context) error +} + +// ThanosBucketStores is a multi-tenant wrapper of Thanos BucketStore. +type ThanosBucketStores struct { + logger log.Logger + cfg tsdb.BlocksStorageConfig + limits *validation.Overrides + bucket objstore.Bucket + logLevel logging.Level + bucketStoreMetrics *BucketStoreMetrics + cortexBucketStoreMetrics *CortexBucketStoreMetrics + metaFetcherMetrics *MetadataFetcherMetrics + shardingStrategy ShardingStrategy // Index cache shared across all tenants. indexCache storecache.IndexCache @@ -74,7 +82,7 @@ type BucketStores struct { storesMu sync.RWMutex stores map[string]*store.BucketStore - // Keeps the last sync error for the bucket store for each tenant. + // Keeps the last sync error for the bucket store for each tenant. storesErrorsMu sync.RWMutex storesErrors map[string]error @@ -86,20 +94,25 @@ type BucketStores struct { userScanner users.Scanner // Keeps number of inflight requests - inflightRequestCnt int - inflightRequestMu sync.RWMutex - - // Metrics. - syncTimes prometheus.Histogram - syncLastSuccess prometheus.Gauge - tenantsDiscovered prometheus.Gauge - tenantsSynced prometheus.Gauge + inflightRequests *util.InflightRequestTracker } var ErrTooManyInflightRequests = status.Error(codes.ResourceExhausted, "too many inflight requests in store gateway") // NewBucketStores makes a new BucketStores. 
-func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*BucketStores, error) { +func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (BucketStores, error) { + switch cfg.BucketStore.BucketStoreType { + case string(tsdb.ParquetBucketStore): + return newParquetBucketStores(cfg, shardingStrategy, bucketClient, limits, logger, reg) + case string(tsdb.TSDBBucketStore): + return newThanosBucketStores(cfg, shardingStrategy, bucketClient, limits, logLevel, logger, reg) + default: + return nil, fmt.Errorf("unsupported bucket store type: %s", cfg.BucketStore.BucketStoreType) + } +} + +// newThanosBucketStores creates a new TSDB-based bucket stores +func newThanosBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logLevel logging.Level, logger log.Logger, reg prometheus.Registerer) (*ThanosBucketStores, error) { matchers := tsdb.NewMatchers() cachingBucket, err := tsdb.CreateCachingBucket(cfg.BucketStore.ChunksCache, cfg.BucketStore.MetadataCache, tsdb.ParquetLabelsCacheConfig{}, matchers, bucketClient, logger, reg) if err != nil { @@ -114,37 +127,22 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra Help: "Number of maximum concurrent queries allowed.", }).Set(float64(cfg.BucketStore.MaxConcurrent)) - u := &BucketStores{ - logger: logger, - cfg: cfg, - limits: limits, - bucket: cachingBucket, - shardingStrategy: shardingStrategy, - stores: map[string]*store.BucketStore{}, - storesErrors: map[string]error{}, - logLevel: logLevel, - bucketStoreMetrics: NewBucketStoreMetrics(), - metaFetcherMetrics: NewMetadataFetcherMetrics(), - queryGate: queryGate, - partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg), - userTokenBuckets: make(map[string]*util.TokenBucket), - syncTimes: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{ - Name: "cortex_bucket_stores_blocks_sync_seconds", - Help: "The total time it takes to perform a sync stores", - Buckets: []float64{0.1, 1, 10, 30, 60, 120, 300, 600, 900}, - }), - syncLastSuccess: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "cortex_bucket_stores_blocks_last_successful_sync_timestamp_seconds", - Help: "Unix timestamp of the last successful blocks sync.", - }), - tenantsDiscovered: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "cortex_bucket_stores_tenants_discovered", - Help: "Number of tenants discovered in the bucket.", - }), - tenantsSynced: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "cortex_bucket_stores_tenants_synced", - Help: "Number of tenants synced.", - }), + u := &ThanosBucketStores{ + logger: logger, + cfg: cfg, + limits: limits, + bucket: cachingBucket, + shardingStrategy: shardingStrategy, + stores: map[string]*store.BucketStore{}, + storesErrors: map[string]error{}, + logLevel: logLevel, + bucketStoreMetrics: NewBucketStoreMetrics(), + cortexBucketStoreMetrics: NewCortexBucketStoreMetrics(reg), + metaFetcherMetrics: NewMetadataFetcherMetrics(), + queryGate: queryGate, + partitioner: newGapBasedPartitioner(cfg.BucketStore.PartitionerMaxGapBytes, reg), + userTokenBuckets: 
make(map[string]*util.TokenBucket), + inflightRequests: util.NewInflightRequestTracker(), } u.userScanner, err = users.NewScanner(cfg.UsersScanner, bucketClient, logger, reg) if err != nil { @@ -187,7 +185,7 @@ func NewBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStra } // InitialSync does an initial synchronization of blocks for all users. -func (u *BucketStores) InitialSync(ctx context.Context) error { +func (u *ThanosBucketStores) InitialSync(ctx context.Context) error { level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users") if err := u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, s *store.BucketStore) error { @@ -202,13 +200,13 @@ func (u *BucketStores) InitialSync(ctx context.Context) error { } // SyncBlocks synchronizes the stores state with the Bucket store for every user. -func (u *BucketStores) SyncBlocks(ctx context.Context) error { +func (u *ThanosBucketStores) SyncBlocks(ctx context.Context) error { return u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, s *store.BucketStore) error { return s.SyncBlocks(ctx) }) } -func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error { +func (u *ThanosBucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *store.BucketStore) error) error { retries := backoff.New(ctx, backoff.Config{ MinBackoff: 1 * time.Second, MaxBackoff: 10 * time.Second, @@ -232,11 +230,11 @@ func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(co return lastErr } -func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) (returnErr error) { +func (u *ThanosBucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *store.BucketStore) error) (returnErr error) { defer func(start time.Time) { - u.syncTimes.Observe(time.Since(start).Seconds()) + u.cortexBucketStoreMetrics.syncTimes.Observe(time.Since(start).Seconds()) if returnErr == nil { - u.syncLastSuccess.SetToCurrentTime() + u.cortexBucketStoreMetrics.syncLastSuccess.SetToCurrentTime() } }(time.Now()) @@ -261,8 +259,8 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte includeUserIDs[userID] = struct{}{} } - u.tenantsDiscovered.Set(float64(len(userIDs))) - u.tenantsSynced.Set(float64(len(includeUserIDs))) + u.cortexBucketStoreMetrics.tenantsDiscovered.Set(float64(len(userIDs))) + u.cortexBucketStoreMetrics.tenantsSynced.Set(float64(len(includeUserIDs))) // Create a pool of workers which will synchronize blocks. The pool size // is limited in order to avoid to concurrently sync a lot of tenants in @@ -330,7 +328,7 @@ func (u *BucketStores) syncUsersBlocks(ctx context.Context, f func(context.Conte } // Series makes a series request to the underlying user bucket store. 
-func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { +func (u *ThanosBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { spanLog, spanCtx := spanlogger.New(srv.Context(), "BucketStores.Series") defer spanLog.Finish() @@ -356,12 +354,12 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests if maxInflightRequests > 0 { - if u.getInflightRequestCnt() >= maxInflightRequests { + if u.inflightRequests.Count() >= maxInflightRequests { return ErrTooManyInflightRequests } - u.incrementInflightRequestCnt() - defer u.decrementInflightRequestCnt() + u.inflightRequests.Inc() + defer u.inflightRequests.Dec() } err = store.Series(req, spanSeriesServer{ @@ -372,26 +370,8 @@ func (u *BucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_Seri return err } -func (u *BucketStores) getInflightRequestCnt() int { - u.inflightRequestMu.RLock() - defer u.inflightRequestMu.RUnlock() - return u.inflightRequestCnt -} - -func (u *BucketStores) incrementInflightRequestCnt() { - u.inflightRequestMu.Lock() - u.inflightRequestCnt++ - u.inflightRequestMu.Unlock() -} - -func (u *BucketStores) decrementInflightRequestCnt() { - u.inflightRequestMu.Lock() - u.inflightRequestCnt-- - u.inflightRequestMu.Unlock() -} - // LabelNames implements the Storegateway proto service. -func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { +func (u *ThanosBucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelNames") defer spanLog.Finish() @@ -421,7 +401,7 @@ func (u *BucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRe } // LabelValues implements the Storegateway proto service. -func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { +func (u *ThanosBucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { spanLog, spanCtx := spanlogger.New(ctx, "BucketStores.LabelValues") defer spanLog.Finish() @@ -450,7 +430,7 @@ func (u *BucketStores) LabelValues(ctx context.Context, req *storepb.LabelValues // scanUsers in the bucket and return the list of found users. It includes active and deleting users // but not deleted users. -func (u *BucketStores) scanUsers(ctx context.Context) ([]string, error) { +func (u *ThanosBucketStores) scanUsers(ctx context.Context) ([]string, error) { activeUsers, deletingUsers, _, err := u.userScanner.ScanUsers(ctx) if err != nil { return nil, err @@ -477,13 +457,13 @@ func deduplicateUsers(users []string) []string { return uniqueUsers } -func (u *BucketStores) getStore(userID string) *store.BucketStore { +func (u *ThanosBucketStores) getStore(userID string) *store.BucketStore { u.storesMu.RLock() defer u.storesMu.RUnlock() return u.stores[userID] } -func (u *BucketStores) getStoreError(userID string) error { +func (u *ThanosBucketStores) getStoreError(userID string) error { u.storesErrorsMu.RLock() defer u.storesErrorsMu.RUnlock() return u.storesErrors[userID] @@ -499,7 +479,7 @@ var ( // If bucket store doesn't exist, returns errBucketStoreNotFound. // If bucket store is not empty, returns errBucketStoreNotEmpty. // Otherwise returns error from closing the bucket store. 
-func (u *BucketStores) closeEmptyBucketStore(userID string) error { +func (u *ThanosBucketStores) closeEmptyBucketStore(userID string) error { u.storesMu.Lock() unlockInDefer := true defer func() { @@ -537,11 +517,11 @@ func isEmptyBucketStore(bs *store.BucketStore) bool { return min == math.MaxInt64 && max == math.MinInt64 } -func (u *BucketStores) syncDirForUser(userID string) string { +func (u *ThanosBucketStores) syncDirForUser(userID string) string { return filepath.Join(u.cfg.BucketStore.SyncDir, userID) } -func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) { +func (u *ThanosBucketStores) getOrCreateStore(userID string) (*store.BucketStore, error) { // Check if the store already exists. bs := u.getStore(userID) if bs != nil { @@ -721,7 +701,7 @@ func (u *BucketStores) getOrCreateStore(userID string) (*store.BucketStore, erro // deleteLocalFilesForExcludedTenants removes local "sync" directories for tenants that are not included in the current // shard. -func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) { +func (u *ThanosBucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) { files, err := os.ReadDir(u.cfg.BucketStore.SyncDir) if err != nil { return @@ -760,13 +740,13 @@ func (u *BucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[str } } -func (u *BucketStores) getUserTokenBucket(userID string) *util.TokenBucket { +func (u *ThanosBucketStores) getUserTokenBucket(userID string) *util.TokenBucket { u.userTokenBucketsMu.RLock() defer u.userTokenBucketsMu.RUnlock() return u.userTokenBuckets[userID] } -func (u *BucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 { +func (u *ThanosBucketStores) getTokensToRetrieve(tokens uint64, dataType store.StoreDataType) int64 { tokensToRetrieve := float64(tokens) switch dataType { case store.PostingsFetched: diff --git a/pkg/storegateway/bucket_stores_test.go b/pkg/storegateway/bucket_stores_test.go index 831b7afb2b4..aa1b7bfffe9 100644 --- a/pkg/storegateway/bucket_stores_test.go +++ b/pkg/storegateway/bucket_stores_test.go @@ -133,17 +133,19 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { // Should set the error on user-1 require.NoError(t, stores.InitialSync(ctx)) if tc.mockInitialSync { - s, ok := status.FromError(stores.storesErrors["user-1"]) + thanosStores := stores.(*ThanosBucketStores) + s, ok := status.FromError(thanosStores.storesErrors["user-1"]) require.True(t, ok) require.Equal(t, s.Code(), codes.PermissionDenied) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) + require.ErrorIs(t, thanosStores.storesErrors["user-2"], nil) } require.NoError(t, stores.SyncBlocks(context.Background())) if tc.mockInitialSync { - s, ok := status.FromError(stores.storesErrors["user-1"]) + thanosStores := stores.(*ThanosBucketStores) + s, ok := status.FromError(thanosStores.storesErrors["user-1"]) require.True(t, ok) require.Equal(t, s.Code(), codes.PermissionDenied) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) + require.ErrorIs(t, thanosStores.storesErrors["user-2"], nil) } mBucket.GetFailures = tc.GetFailures @@ -169,8 +171,9 @@ func TestBucketStores_CustomerKeyError(t *testing.T) { // Cleaning the error mBucket.GetFailures = map[string]error{} require.NoError(t, stores.SyncBlocks(context.Background())) - require.ErrorIs(t, stores.storesErrors["user-1"], nil) - require.ErrorIs(t, stores.storesErrors["user-2"], nil) + thanosStores := stores.(*ThanosBucketStores) + 
require.ErrorIs(t, thanosStores.storesErrors["user-1"], nil) + require.ErrorIs(t, thanosStores.storesErrors["user-2"], nil) _, _, err = querySeries(stores, "user-1", "series", 0, 100) require.NoError(t, err) _, _, err = querySeries(stores, "user-2", "series", 0, 100) @@ -260,7 +263,8 @@ func TestBucketStores_InitialSync(t *testing.T) { "cortex_bucket_stores_gate_queries_in_flight", )) - assert.Greater(t, testutil.ToFloat64(stores.syncLastSuccess), float64(0)) + thanosStores := stores.(*ThanosBucketStores) + assert.Greater(t, testutil.ToFloat64(thanosStores.cortexBucketStoreMetrics.syncLastSuccess), float64(0)) } func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) { @@ -320,7 +324,8 @@ func TestBucketStores_InitialSyncShouldRetryOnFailure(t *testing.T) { "cortex_bucket_store_blocks_loaded", )) - assert.Greater(t, testutil.ToFloat64(stores.syncLastSuccess), float64(0)) + thanosStores := stores.(*ThanosBucketStores) + assert.Greater(t, testutil.ToFloat64(thanosStores.cortexBucketStoreMetrics.syncLastSuccess), float64(0)) } func TestBucketStores_SyncBlocks(t *testing.T) { @@ -390,7 +395,8 @@ func TestBucketStores_SyncBlocks(t *testing.T) { "cortex_bucket_stores_gate_queries_in_flight", )) - assert.Greater(t, testutil.ToFloat64(stores.syncLastSuccess), float64(0)) + thanosStores := stores.(*ThanosBucketStores) + assert.Greater(t, testutil.ToFloat64(thanosStores.cortexBucketStoreMetrics.syncLastSuccess), float64(0)) } func TestBucketStores_syncUsersBlocks(t *testing.T) { @@ -443,7 +449,8 @@ func TestBucketStores_syncUsersBlocks(t *testing.T) { // Sync user stores and count the number of times the callback is called. var storesCount atomic.Int32 - err = stores.syncUsersBlocks(context.Background(), func(ctx context.Context, bs *store.BucketStore) error { + thanosStores := stores.(*ThanosBucketStores) + err = thanosStores.syncUsersBlocks(context.Background(), func(ctx context.Context, bs *store.BucketStore) error { storesCount.Inc() return nil }) @@ -474,7 +481,7 @@ func TestBucketStores_scanUsers(t *testing.T) { t.Run(testName, func(t *testing.T) { t.Parallel() - stores := &BucketStores{ + stores := &ThanosBucketStores{ userScanner: testData.scanner, } @@ -574,9 +581,11 @@ func TestBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *t require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) - stores.inflightRequestMu.Lock() - stores.inflightRequestCnt = 10 - stores.inflightRequestMu.Unlock() + thanosStores := stores.(*ThanosBucketStores) + // Set inflight requests to the limit + for range 10 { + thanosStores.inflightRequests.Inc() + } series, warnings, err := querySeries(stores, "user_id", "series_1", 0, 100) assert.ErrorIs(t, err, ErrTooManyInflightRequests) assert.Empty(t, series) @@ -595,9 +604,11 @@ func TestBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabl require.NoError(t, err) require.NoError(t, stores.InitialSync(context.Background())) - stores.inflightRequestMu.Lock() - stores.inflightRequestCnt = 10 // max_inflight_request is set to 0 by default = disabled - stores.inflightRequestMu.Unlock() + thanosStores := stores.(*ThanosBucketStores) + // Set inflight requests to the limit (max_inflight_request is set to 0 by default = disabled) + for range 10 { + thanosStores.inflightRequests.Inc() + } series, _, err := querySeries(stores, "user_id", "series_1", 0, 100) require.NoError(t, err) assert.Equal(t, 1, len(series)) @@ -715,7 +726,7 @@ func generateStorageBlock(t *testing.T, storageDir, userID string, 
metricName st require.NoError(t, db.Snapshot(userDir, true)) } -func querySeries(stores *BucketStores, userID, metricName string, minT, maxT int64) ([]*storepb.Series, annotations.Annotations, error) { +func querySeries(stores BucketStores, userID, metricName string, minT, maxT int64) ([]*storepb.Series, annotations.Annotations, error) { req := &storepb.SeriesRequest{ MinTime: minT, MaxTime: maxT, @@ -734,7 +745,7 @@ func querySeries(stores *BucketStores, userID, metricName string, minT, maxT int return srv.SeriesSet, srv.Warnings, err } -func queryLabelsNames(stores *BucketStores, userID, metricName string, start, end int64) (*storepb.LabelNamesResponse, error) { +func queryLabelsNames(stores BucketStores, userID, metricName string, start, end int64) (*storepb.LabelNamesResponse, error) { req := &storepb.LabelNamesRequest{ Start: start, End: end, @@ -750,7 +761,7 @@ func queryLabelsNames(stores *BucketStores, userID, metricName string, start, en return stores.LabelNames(ctx, req) } -func queryLabelsValues(stores *BucketStores, userID, labelName, metricName string, start, end int64) (*storepb.LabelValuesResponse, error) { +func queryLabelsValues(stores BucketStores, userID, labelName, metricName string, start, end int64) (*storepb.LabelValuesResponse, error) { req := &storepb.LabelValuesRequest{ Start: start, End: end, @@ -910,32 +921,34 @@ func TestBucketStores_tokenBuckets(t *testing.T) { reg := prometheus.NewPedanticRegistry() stores, err := NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) assert.NoError(t, err) - assert.NotNil(t, stores.instanceTokenBucket) + thanosStores := stores.(*ThanosBucketStores) + assert.NotNil(t, thanosStores.instanceTokenBucket) assert.NoError(t, stores.InitialSync(ctx)) - assert.NotNil(t, stores.getUserTokenBucket("user-1")) - assert.NotNil(t, stores.getUserTokenBucket("user-2")) + assert.NotNil(t, thanosStores.getUserTokenBucket("user-1")) + assert.NotNil(t, thanosStores.getUserTokenBucket("user-2")) sharding.users = []string{user1} assert.NoError(t, stores.SyncBlocks(ctx)) - assert.NotNil(t, stores.getUserTokenBucket("user-1")) - assert.Nil(t, stores.getUserTokenBucket("user-2")) + assert.NotNil(t, thanosStores.getUserTokenBucket("user-1")) + assert.Nil(t, thanosStores.getUserTokenBucket("user-2")) sharding.users = []string{} assert.NoError(t, stores.SyncBlocks(ctx)) - assert.Nil(t, stores.getUserTokenBucket("user-1")) - assert.Nil(t, stores.getUserTokenBucket("user-2")) + assert.Nil(t, thanosStores.getUserTokenBucket("user-1")) + assert.Nil(t, thanosStores.getUserTokenBucket("user-2")) cfg.BucketStore.TokenBucketBytesLimiter.Mode = string(cortex_tsdb.TokenBucketBytesLimiterDryRun) sharding.users = []string{user1, user2} reg = prometheus.NewPedanticRegistry() stores, err = NewBucketStores(cfg, &sharding, objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) assert.NoError(t, err) - assert.NotNil(t, stores.instanceTokenBucket) + thanosStores = stores.(*ThanosBucketStores) + assert.NotNil(t, thanosStores.instanceTokenBucket) assert.NoError(t, stores.InitialSync(ctx)) - assert.NotNil(t, stores.getUserTokenBucket("user-1")) - assert.NotNil(t, stores.getUserTokenBucket("user-2")) + assert.NotNil(t, thanosStores.getUserTokenBucket("user-1")) + assert.NotNil(t, thanosStores.getUserTokenBucket("user-2")) cfg.BucketStore.TokenBucketBytesLimiter.Mode = string(cortex_tsdb.TokenBucketBytesLimiterDisabled) sharding.users = 
[]string{user1, user2} @@ -944,9 +957,10 @@ func TestBucketStores_tokenBuckets(t *testing.T) { assert.NoError(t, err) assert.NoError(t, stores.InitialSync(ctx)) - assert.Nil(t, stores.instanceTokenBucket) - assert.Nil(t, stores.getUserTokenBucket("user-1")) - assert.Nil(t, stores.getUserTokenBucket("user-2")) + thanosStores = stores.(*ThanosBucketStores) + assert.Nil(t, thanosStores.instanceTokenBucket) + assert.Nil(t, thanosStores.getUserTokenBucket("user-1")) + assert.Nil(t, thanosStores.getUserTokenBucket("user-2")) } func TestBucketStores_getTokensToRetrieve(t *testing.T) { @@ -966,12 +980,13 @@ func TestBucketStores_getTokensToRetrieve(t *testing.T) { stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) assert.NoError(t, err) - assert.Equal(t, int64(2), stores.getTokensToRetrieve(2, store.PostingsFetched)) - assert.Equal(t, int64(4), stores.getTokensToRetrieve(2, store.PostingsTouched)) - assert.Equal(t, int64(6), stores.getTokensToRetrieve(2, store.SeriesFetched)) - assert.Equal(t, int64(8), stores.getTokensToRetrieve(2, store.SeriesTouched)) - assert.Equal(t, int64(0), stores.getTokensToRetrieve(2, store.ChunksFetched)) - assert.Equal(t, int64(1), stores.getTokensToRetrieve(2, store.ChunksTouched)) + thanosStores := stores.(*ThanosBucketStores) + assert.Equal(t, int64(2), thanosStores.getTokensToRetrieve(2, store.PostingsFetched)) + assert.Equal(t, int64(4), thanosStores.getTokensToRetrieve(2, store.PostingsTouched)) + assert.Equal(t, int64(6), thanosStores.getTokensToRetrieve(2, store.SeriesFetched)) + assert.Equal(t, int64(8), thanosStores.getTokensToRetrieve(2, store.SeriesTouched)) + assert.Equal(t, int64(0), thanosStores.getTokensToRetrieve(2, store.ChunksFetched)) + assert.Equal(t, int64(1), thanosStores.getTokensToRetrieve(2, store.ChunksTouched)) } func getUsersInDir(t *testing.T, dir string) []string { diff --git a/pkg/storegateway/gateway.go b/pkg/storegateway/gateway.go index 96eb8c31cca..54c6dad09bf 100644 --- a/pkg/storegateway/gateway.go +++ b/pkg/storegateway/gateway.go @@ -118,7 +118,7 @@ type StoreGateway struct { gatewayCfg Config storageCfg cortex_tsdb.BlocksStorageConfig logger log.Logger - stores *BucketStores + stores BucketStores // Ring used for sharding blocks. ringLifecycler *ring.BasicLifecycler diff --git a/pkg/storegateway/gateway_ring.go b/pkg/storegateway/gateway_ring.go index 798d1221a2c..3c9ea79f55b 100644 --- a/pkg/storegateway/gateway_ring.go +++ b/pkg/storegateway/gateway_ring.go @@ -111,8 +111,8 @@ func (cfg *RingConfig) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.DetailedMetricsEnabled, ringFlagsPrefix+"detailed-metrics-enabled", true, "Set to true to enable ring detailed metrics. These metrics provide detailed information, such as token count and ownership per tenant. Disabling them can significantly decrease the number of metrics emitted.") // Wait stability flags. - f.DurationVar(&cfg.WaitStabilityMinDuration, ringFlagsPrefix+"wait-stability-min-duration", time.Minute, "Minimum time to wait for ring stability at startup. 0 to disable.") - f.DurationVar(&cfg.WaitStabilityMaxDuration, ringFlagsPrefix+"wait-stability-max-duration", 5*time.Minute, "Maximum time to wait for ring stability at startup. 
If the store-gateway ring keeps changing after this period of time, the store-gateway will start anyway.") + f.DurationVar(&cfg.WaitStabilityMinDuration, ringFlagsPrefix+"wait-stability-min-duration", 0, "Minimum time to wait for ring stability at startup. 0 to disable.") + f.DurationVar(&cfg.WaitStabilityMaxDuration, ringFlagsPrefix+"wait-stability-max-duration", 5*time.Second, "Maximum time to wait for ring stability at startup. If the store-gateway ring keeps changing after this period of time, the store-gateway will start anyway.") f.DurationVar(&cfg.FinalSleep, ringFlagsPrefix+"final-sleep", 0*time.Second, "The sleep seconds when store-gateway is shutting down. Need to be close to or larger than KV Store information propagation delay") diff --git a/pkg/storegateway/gateway_test.go b/pkg/storegateway/gateway_test.go index 6dcc8cd9913..69f1e65b7d1 100644 --- a/pkg/storegateway/gateway_test.go +++ b/pkg/storegateway/gateway_test.go @@ -183,10 +183,11 @@ func TestStoreGateway_InitialSyncWithDefaultShardingEnabled(t *testing.T) { assert.Equal(t, RingNumTokens, len(g.ringLifecycler.GetTokens())) assert.Subset(t, g.ringLifecycler.GetTokens(), testData.initialTokens) - assert.NotNil(t, g.stores.getStore("user-1")) - assert.NotNil(t, g.stores.getStore("user-2")) - assert.Nil(t, g.stores.getStore("user-disabled")) - assert.Nil(t, g.stores.getStore("user-unknown")) + thanosStores := g.stores.(*ThanosBucketStores) + assert.NotNil(t, thanosStores.getStore("user-1")) + assert.NotNil(t, thanosStores.getStore("user-2")) + assert.Nil(t, thanosStores.getStore("user-disabled")) + assert.Nil(t, thanosStores.getStore("user-unknown")) }) } } @@ -217,10 +218,11 @@ func TestStoreGateway_InitialSyncWithShardingDisabled(t *testing.T) { bucketClient.MockExists(path.Join("user-disabled", "markers", cortex_tsdb.TenantDeletionMarkFile), false, nil) require.NoError(t, services.StartAndAwaitRunning(ctx, g)) - assert.NotNil(t, g.stores.getStore("user-1")) - assert.NotNil(t, g.stores.getStore("user-2")) - assert.Nil(t, g.stores.getStore("user-disabled")) - assert.Nil(t, g.stores.getStore("user-unknown")) + thanosStores := g.stores.(*ThanosBucketStores) + assert.NotNil(t, thanosStores.getStore("user-1")) + assert.NotNil(t, thanosStores.getStore("user-2")) + assert.Nil(t, thanosStores.getStore("user-disabled")) + assert.Nil(t, thanosStores.getStore("user-unknown")) } func TestStoreGateway_InitialSyncFailure(t *testing.T) { diff --git a/pkg/storegateway/parquet_bucket_store.go b/pkg/storegateway/parquet_bucket_store.go new file mode 100644 index 00000000000..23269751705 --- /dev/null +++ b/pkg/storegateway/parquet_bucket_store.go @@ -0,0 +1,268 @@ +package storegateway + +import ( + "context" + "fmt" + "strings" + + "github.com/go-kit/log" + "github.com/gogo/protobuf/types" + "github.com/pkg/errors" + "github.com/prometheus-community/parquet-common/convert" + "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/search" + parquet_storage "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus-community/parquet-common/util" + "github.com/prometheus/prometheus/model/labels" + prom_storage "github.com/prometheus/prometheus/storage" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/store/hintspb" + "github.com/thanos-io/thanos/pkg/store/labelpb" + "github.com/thanos-io/thanos/pkg/store/storepb" + "golang.org/x/sync/errgroup" + 
"google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/cortexproject/cortex/pkg/util/validation" +) + +type parquetBucketStore struct { + logger log.Logger + bucket objstore.Bucket + limits *validation.Overrides + concurrency int + + chunksDecoder *schema.PrometheusParquetChunksDecoder + + matcherCache storecache.MatchersCache +} + +func (p *parquetBucketStore) Close() error { + return p.bucket.Close() +} + +func (p *parquetBucketStore) SyncBlocks(ctx context.Context) error { + // TODO: Implement it + return nil +} + +func (p *parquetBucketStore) InitialSync(ctx context.Context) error { + // TODO: Implement it + return nil +} + +func (p *parquetBucketStore) findParquetBlocks(ctx context.Context, blockMatchers []storepb.LabelMatcher) ([]*parquetBlock, error) { + if len(blockMatchers) != 1 || blockMatchers[0].Type != storepb.LabelMatcher_RE || blockMatchers[0].Name != block.BlockIDLabel { + return nil, status.Error(codes.InvalidArgument, "only one block matcher is supported") + } + + blockIDs := strings.Split(blockMatchers[0].Value, "|") + blocks := make([]*parquetBlock, 0, len(blockIDs)) + bucketOpener := parquet_storage.NewParquetBucketOpener(p.bucket) + noopQuota := search.NewQuota(search.NoopQuotaLimitFunc(ctx)) + for _, blockID := range blockIDs { + block, err := p.newParquetBlock(ctx, blockID, bucketOpener, bucketOpener, p.chunksDecoder, noopQuota, noopQuota, noopQuota) + if err != nil { + return nil, err + } + blocks = append(blocks, block) + } + + return blocks, nil +} + +// Series implements the store interface for a single parquet bucket store +func (p *parquetBucketStore) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) (err error) { + matchers, err := storecache.MatchersToPromMatchersCached(p.matcherCache, req.Matchers...) 
+	if err != nil {
+		return status.Error(codes.InvalidArgument, err.Error())
+	}
+
+	ctx := srv.Context()
+	resHints := &hintspb.SeriesResponseHints{}
+	var anyHints *types.Any
+
+	var blockMatchers []storepb.LabelMatcher
+	if req.Hints != nil {
+		reqHints := &hintspb.SeriesRequestHints{}
+		if err := types.UnmarshalAny(req.Hints, reqHints); err != nil {
+			return status.Error(codes.InvalidArgument, errors.Wrap(err, "unmarshal series request hints").Error())
+		}
+		blockMatchers = reqHints.BlockMatchers
+	}
+	ctx = injectShardInfoIntoContext(ctx, req.ShardInfo)
+
+	// Find parquet shards for the time range
+	shards, err := p.findParquetBlocks(ctx, blockMatchers)
+	if err != nil {
+		return fmt.Errorf("failed to find parquet shards: %w", err)
+	}
+
+	seriesSet := make([]prom_storage.ChunkSeriesSet, len(shards))
+	// Derive the errgroup context from ctx so the injected shard info reaches shard.Query.
+	errGroup, ctx := errgroup.WithContext(ctx)
+	errGroup.SetLimit(p.concurrency)
+
+	for i, shard := range shards {
+		resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{
+			Id: shard.name,
+		})
+		errGroup.Go(func() error {
+			ss, err := shard.Query(ctx, req.MinTime, req.MaxTime, req.SkipChunks, matchers)
+			seriesSet[i] = ss
+			return err
+		})
+	}
+
+	if err = errGroup.Wait(); err != nil {
+		return err
+	}
+
+	ss := convert.NewMergeChunkSeriesSet(seriesSet, labels.Compare, prom_storage.NewConcatenatingChunkSeriesMerger())
+	for ss.Next() {
+		cs := ss.At()
+		cIter := cs.Iterator(nil)
+		chunks := make([]storepb.AggrChunk, 0)
+		for cIter.Next() {
+			chunk := cIter.At()
+			chunks = append(chunks, storepb.AggrChunk{
+				MinTime: chunk.MinTime,
+				MaxTime: chunk.MaxTime,
+				Raw: &storepb.Chunk{
+					Type: chunkToStoreEncoding(chunk.Chunk.Encoding()),
+					Data: chunk.Chunk.Bytes(),
+				},
+			})
+		}
+
+		if err = srv.Send(storepb.NewSeriesResponse(&storepb.Series{
+			Labels: labelpb.ZLabelsFromPromLabels(cs.Labels()),
+			Chunks: chunks,
+		})); err != nil {
+			err = status.Error(codes.Unknown, errors.Wrap(err, "send series response").Error())
+			return
+		}
+	}
+
+	if anyHints, err = types.MarshalAny(resHints); err != nil {
+		err = status.Error(codes.Unknown, errors.Wrap(err, "marshal series response hints").Error())
+		return
+	}
+
+	if err = srv.Send(storepb.NewHintsSeriesResponse(anyHints)); err != nil {
+		err = status.Error(codes.Unknown, errors.Wrap(err, "send series response hints").Error())
+		return
+	}
+
+	return nil
+}
+
+// LabelNames implements the store interface for a single parquet bucket store
+func (p *parquetBucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
+	matchers, err := storecache.MatchersToPromMatchersCached(p.matcherCache, req.Matchers...)
+ if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + resHints := &hintspb.LabelNamesResponseHints{} + + var blockMatchers []storepb.LabelMatcher + if req.Hints != nil { + reqHints := &hintspb.LabelNamesRequestHints{} + if err := types.UnmarshalAny(req.Hints, reqHints); err != nil { + return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "unmarshal label names request hints").Error()) + } + blockMatchers = reqHints.BlockMatchers + } + + // Find parquet shards for the time range + shards, err := p.findParquetBlocks(ctx, blockMatchers) + if err != nil { + return nil, fmt.Errorf("failed to find parquet shards: %w", err) + } + + resNameSets := make([][]string, len(shards)) + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(p.concurrency) + + for i, s := range shards { + resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{ + Id: s.name, + }) + errGroup.Go(func() error { + r, err := s.LabelNames(ctx, req.Limit, matchers) + resNameSets[i] = r + return err + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + anyHints, err := types.MarshalAny(resHints) + if err != nil { + return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label names response hints").Error()) + } + result := util.MergeUnsortedSlices(int(req.Limit), resNameSets...) + + return &storepb.LabelNamesResponse{ + Names: result, + Hints: anyHints, + }, nil +} + +// LabelValues implements the store interface for a single parquet bucket store +func (p *parquetBucketStore) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + matchers, err := storecache.MatchersToPromMatchersCached(p.matcherCache, req.Matchers...) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + resHints := &hintspb.LabelValuesResponseHints{} + var blockMatchers []storepb.LabelMatcher + if req.Hints != nil { + reqHints := &hintspb.LabelValuesRequestHints{} + if err := types.UnmarshalAny(req.Hints, reqHints); err != nil { + return nil, status.Error(codes.InvalidArgument, errors.Wrap(err, "unmarshal label values request hints").Error()) + } + blockMatchers = reqHints.BlockMatchers + } + + // Find parquet shards for the time range + shards, err := p.findParquetBlocks(ctx, blockMatchers) + if err != nil { + return nil, fmt.Errorf("failed to find parquet shards: %w", err) + } + + resNameValues := make([][]string, len(shards)) + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(p.concurrency) + + for i, s := range shards { + resHints.QueriedBlocks = append(resHints.QueriedBlocks, hintspb.Block{ + Id: s.name, + }) + errGroup.Go(func() error { + r, err := s.LabelValues(ctx, req.Label, req.Limit, matchers) + resNameValues[i] = r + return err + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + anyHints, err := types.MarshalAny(resHints) + if err != nil { + return nil, status.Error(codes.Unknown, errors.Wrap(err, "marshal label values response hints").Error()) + } + result := util.MergeUnsortedSlices(int(req.Limit), resNameValues...) 
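Each of the read paths above resolves which blocks to touch from request hints: `findParquetBlocks` accepts exactly one regex matcher on the block ID label whose value is a `|`-joined list of block ULIDs. A hedged sketch of how a caller could encode such hints; the caller-side code is not part of this diff and the block IDs shown are placeholders:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/gogo/protobuf/types"
	"github.com/thanos-io/thanos/pkg/block"
	"github.com/thanos-io/thanos/pkg/store/hintspb"
	"github.com/thanos-io/thanos/pkg/store/storepb"
)

// buildSeriesHints encodes block IDs the way findParquetBlocks expects:
// a single LabelMatcher_RE on block.BlockIDLabel with the IDs joined by "|".
func buildSeriesHints(blockIDs []string) (*types.Any, error) {
	hints := &hintspb.SeriesRequestHints{
		BlockMatchers: []storepb.LabelMatcher{{
			Type:  storepb.LabelMatcher_RE,
			Name:  block.BlockIDLabel,
			Value: strings.Join(blockIDs, "|"),
		}},
	}
	return types.MarshalAny(hints)
}

func main() {
	// Placeholder ULIDs for illustration only.
	anyHints, err := buildSeriesHints([]string{"01HXAMPLEBLOCKAAAAAAAAAAAA", "01HXAMPLEBLOCKBBBBBBBBBBBB"})
	fmt.Println(anyHints != nil, err)
}
```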
+ + return &storepb.LabelValuesResponse{ + Values: result, + Hints: anyHints, + }, nil +} diff --git a/pkg/storegateway/parquet_bucket_store_metrics.go b/pkg/storegateway/parquet_bucket_store_metrics.go new file mode 100644 index 00000000000..33daf9a5a73 --- /dev/null +++ b/pkg/storegateway/parquet_bucket_store_metrics.go @@ -0,0 +1,29 @@ +package storegateway + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/cortexproject/cortex/pkg/util" +) + +type ParquetBucketStoreMetrics struct { + regs *util.UserRegistries + + // TODO: Add some metrics +} + +func NewParquetBucketStoreMetrics() *ParquetBucketStoreMetrics { + m := &ParquetBucketStoreMetrics{ + regs: util.NewUserRegistries(), + } + + return m +} + +func (m *ParquetBucketStoreMetrics) AddUserRegistry(user string, reg *prometheus.Registry) { + m.regs.AddUserRegistry(user, reg) +} + +func (m *ParquetBucketStoreMetrics) RemoveUserRegistry(user string) { + m.regs.RemoveUserRegistry(user, false) +} diff --git a/pkg/storegateway/parquet_bucket_stores.go b/pkg/storegateway/parquet_bucket_stores.go new file mode 100644 index 00000000000..62ca3c11f43 --- /dev/null +++ b/pkg/storegateway/parquet_bucket_stores.go @@ -0,0 +1,785 @@ +package storegateway + +import ( + "context" + "fmt" + "os" + "path/filepath" + "sort" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/parquet-go/parquet-go" + "github.com/pkg/errors" + "github.com/prometheus-community/parquet-common/convert" + "github.com/prometheus-community/parquet-common/schema" + "github.com/prometheus-community/parquet-common/search" + parquet_storage "github.com/prometheus-community/parquet-common/storage" + "github.com/prometheus-community/parquet-common/util" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + prom_storage "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb/chunkenc" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/thanos-io/objstore" + storecache "github.com/thanos-io/thanos/pkg/store/cache" + "github.com/thanos-io/thanos/pkg/store/storepb" + "github.com/weaveworks/common/httpgrpc" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/cortexproject/cortex/pkg/querysharding" + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/storage/tsdb/users" + cortex_util "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/backoff" + cortex_errors "github.com/cortexproject/cortex/pkg/util/errors" + "github.com/cortexproject/cortex/pkg/util/spanlogger" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +// ParquetBucketStores is a multi-tenant wrapper for parquet bucket stores +type ParquetBucketStores struct { + logger log.Logger + cfg tsdb.BlocksStorageConfig + limits *validation.Overrides + bucket objstore.Bucket + + storesMu sync.RWMutex + stores map[string]*parquetBucketStore + + // Keeps the last sync error for the bucket store for each tenant. 
+ storesErrorsMu sync.RWMutex + storesErrors map[string]error + + chunksDecoder *schema.PrometheusParquetChunksDecoder + + matcherCache storecache.MatchersCache + + inflightRequests *cortex_util.InflightRequestTracker + + parquetBucketStoreMetrics *ParquetBucketStoreMetrics + cortexBucketStoreMetrics *CortexBucketStoreMetrics + userScanner users.Scanner + shardingStrategy ShardingStrategy + + userTokenBucketsMu sync.RWMutex + userTokenBuckets map[string]*cortex_util.TokenBucket +} + +// newParquetBucketStores creates a new multi-tenant parquet bucket stores +func newParquetBucketStores(cfg tsdb.BlocksStorageConfig, shardingStrategy ShardingStrategy, bucketClient objstore.InstrumentedBucket, limits *validation.Overrides, logger log.Logger, reg prometheus.Registerer) (*ParquetBucketStores, error) { + // Create caching bucket client for parquet bucket stores + cachingBucket, err := createCachingBucketClientForParquet(cfg, bucketClient, "parquet-storegateway", logger, reg) + if err != nil { + return nil, err + } + + u := &ParquetBucketStores{ + logger: logger, + cfg: cfg, + limits: limits, + bucket: cachingBucket, + stores: map[string]*parquetBucketStore{}, + storesErrors: map[string]error{}, + chunksDecoder: schema.NewPrometheusParquetChunksDecoder(chunkenc.NewPool()), + inflightRequests: cortex_util.NewInflightRequestTracker(), + cortexBucketStoreMetrics: NewCortexBucketStoreMetrics(reg), + shardingStrategy: shardingStrategy, + userTokenBuckets: make(map[string]*cortex_util.TokenBucket), + parquetBucketStoreMetrics: NewParquetBucketStoreMetrics(), + } + u.userScanner, err = users.NewScanner(cfg.UsersScanner, bucketClient, logger, reg) + if err != nil { + return nil, errors.Wrap(err, "failed to create users scanner") + } + + if cfg.BucketStore.MatchersCacheMaxItems > 0 { + r := prometheus.NewRegistry() + reg.MustRegister(tsdb.NewMatchCacheMetrics("cortex_storegateway", r, logger)) + u.matcherCache, err = storecache.NewMatchersCache(storecache.WithSize(cfg.BucketStore.MatchersCacheMaxItems), storecache.WithPromRegistry(r)) + if err != nil { + return nil, err + } + } else { + u.matcherCache = storecache.NoopMatchersCache + } + + return u, nil +} + +// Series implements BucketStores +func (u *ParquetBucketStores) Series(req *storepb.SeriesRequest, srv storepb.Store_SeriesServer) error { + spanLog, spanCtx := spanlogger.New(srv.Context(), "ParquetBucketStores.Series") + defer spanLog.Finish() + + userID := getUserIDFromGRPCContext(spanCtx) + if userID == "" { + return fmt.Errorf("no userID") + } + + err := u.getStoreError(userID) + userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) + if err != nil { + if cortex_errors.ErrorIs(err, userBkt.IsAccessDeniedErr) { + return httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) + } + + return err + } + + store, err := u.getOrCreateStore(userID) + if err != nil { + return status.Error(codes.Internal, err.Error()) + } + + maxInflightRequests := u.cfg.BucketStore.MaxInflightRequests + if maxInflightRequests > 0 { + if u.inflightRequests.Count() >= maxInflightRequests { + return ErrTooManyInflightRequests + } + + u.inflightRequests.Inc() + defer u.inflightRequests.Dec() + } + + return store.Series(req, spanSeriesServer{ + Store_SeriesServer: srv, + ctx: spanCtx, + }) +} + +// LabelNames implements BucketStores +func (u *ParquetBucketStores) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) { + userID := getUserIDFromGRPCContext(ctx) + if userID == "" { + return nil, 
fmt.Errorf("no userID") + } + + err := u.getStoreError(userID) + userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) + if err != nil { + if cortex_errors.ErrorIs(err, userBkt.IsAccessDeniedErr) { + return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) + } + + return nil, err + } + + store, err := u.getOrCreateStore(userID) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return store.LabelNames(ctx, req) +} + +// LabelValues implements BucketStores +func (u *ParquetBucketStores) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) { + userID := getUserIDFromGRPCContext(ctx) + if userID == "" { + return nil, fmt.Errorf("no userID") + } + + err := u.getStoreError(userID) + userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits) + if err != nil { + if cortex_errors.ErrorIs(err, userBkt.IsAccessDeniedErr) { + return nil, httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) + } + + return nil, err + } + + store, err := u.getOrCreateStore(userID) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + return store.LabelValues(ctx, req) +} + +// SyncBlocks implements BucketStores +func (u *ParquetBucketStores) SyncBlocks(ctx context.Context) error { + return u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, p *parquetBucketStore) error { + return p.SyncBlocks(ctx) + }) +} + +// InitialSync implements BucketStores +func (u *ParquetBucketStores) InitialSync(ctx context.Context) error { + level.Info(u.logger).Log("msg", "synchronizing Parquet blocks for all users") + + if err := u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, p *parquetBucketStore) error { + return p.InitialSync(ctx) + }); err != nil { + level.Warn(u.logger).Log("msg", "failed to synchronize Parquet blocks", "err", err) + return err + } + + level.Info(u.logger).Log("msg", "successfully synchronized Parquet blocks for all users") + return nil +} + +func (u *ParquetBucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *parquetBucketStore) error) error { + retries := backoff.New(ctx, backoff.Config{ + MinBackoff: 1 * time.Second, + MaxBackoff: 10 * time.Second, + MaxRetries: 3, + }) + + var lastErr error + for retries.Ongoing() { + lastErr = u.syncUsersBlocks(ctx, f) + if lastErr == nil { + return nil + } + + retries.Wait() + } + + if lastErr == nil { + return retries.Err() + } + + return lastErr +} + +func (u *ParquetBucketStores) syncUsersBlocks(ctx context.Context, f func(context.Context, *parquetBucketStore) error) (returnErr error) { + defer func(start time.Time) { + u.cortexBucketStoreMetrics.syncTimes.Observe(time.Since(start).Seconds()) + if returnErr == nil { + u.cortexBucketStoreMetrics.syncLastSuccess.SetToCurrentTime() + } + }(time.Now()) + + type job struct { + userID string + store *parquetBucketStore + } + + wg := &sync.WaitGroup{} + jobs := make(chan job) + errs := tsdb_errors.NewMulti() + errsMx := sync.Mutex{} + + // Scan users in the bucket. + userIDs, err := u.scanUsers(ctx) + if err != nil { + return err + } + + includeUserIDs := make(map[string]struct{}) + for _, userID := range u.shardingStrategy.FilterUsers(ctx, userIDs) { + includeUserIDs[userID] = struct{}{} + } + + u.cortexBucketStoreMetrics.tenantsDiscovered.Set(float64(len(userIDs))) + u.cortexBucketStoreMetrics.tenantsSynced.Set(float64(len(includeUserIDs))) + + // Create a pool of workers which will synchronize blocks. 
The pool size + // is limited in order to avoid to concurrently sync a lot of tenants in + // a large cluster. + for i := 0; i < u.cfg.BucketStore.TenantSyncConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + + for job := range jobs { + if err := f(ctx, job.store); err != nil { + if errors.Is(err, bucket.ErrCustomerManagedKeyAccessDenied) { + u.storesErrorsMu.Lock() + u.storesErrors[job.userID] = httpgrpc.Errorf(int(codes.PermissionDenied), "store error: %s", err) + u.storesErrorsMu.Unlock() + } else { + errsMx.Lock() + errs.Add(errors.Wrapf(err, "failed to synchronize Parquet blocks for user %s", job.userID)) + errsMx.Unlock() + } + } else { + u.storesErrorsMu.Lock() + delete(u.storesErrors, job.userID) + u.storesErrorsMu.Unlock() + } + } + }() + } + + // Lazily create a bucket store for each new user found + // and submit a sync job for each user. + for _, userID := range userIDs { + // If we don't have a store for the tenant yet, then we should skip it if it's not + // included in the store-gateway shard. If we already have it, we need to sync it + // anyway to make sure all its blocks are unloaded and metrics updated correctly + // (but bucket API calls are skipped thanks to the objstore client adapter). + if _, included := includeUserIDs[userID]; !included && u.getStore(userID) == nil { + continue + } + + bs, err := u.getOrCreateStore(userID) + if err != nil { + errsMx.Lock() + errs.Add(err) + errsMx.Unlock() + + continue + } + + select { + case jobs <- job{userID: userID, store: bs}: + // Nothing to do. Will loop to push more jobs. + case <-ctx.Done(): + return ctx.Err() + } + } + + // Wait until all workers completed. + close(jobs) + wg.Wait() + + u.deleteLocalFilesForExcludedTenants(includeUserIDs) + + return errs.Err() +} + +// deleteLocalFilesForExcludedTenants removes local "sync" directories for tenants that are not included in the current +// shard. +func (u *ParquetBucketStores) deleteLocalFilesForExcludedTenants(includeUserIDs map[string]struct{}) { + files, err := os.ReadDir(u.cfg.BucketStore.SyncDir) + if err != nil { + return + } + + for _, f := range files { + if !f.IsDir() { + continue + } + + userID := f.Name() + if _, included := includeUserIDs[userID]; included { + // Preserve directory for users owned by this shard. + continue + } + + err := u.closeEmptyBucketStore(userID) + switch { + case errors.Is(err, errBucketStoreNotEmpty): + continue + case errors.Is(err, errBucketStoreNotFound): + // This is OK, nothing was closed. + case err == nil: + level.Info(u.logger).Log("msg", "closed bucket store for user", "user", userID) + default: + level.Warn(u.logger).Log("msg", "failed to close bucket store for user", "user", userID, "err", err) + } + + userSyncDir := u.syncDirForUser(userID) + err = os.RemoveAll(userSyncDir) + if err == nil { + level.Info(u.logger).Log("msg", "deleted user sync directory", "dir", userSyncDir) + } else { + level.Warn(u.logger).Log("msg", "failed to delete user sync directory", "dir", userSyncDir, "err", err) + } + } +} + +// closeEmptyBucketStore closes bucket store for given user, if it is empty, +// and removes it from bucket stores map and metrics. +// If bucket store doesn't exist, returns errBucketStoreNotFound. +// Otherwise returns error from closing the bucket store. 
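`syncUsersBlocks` above fans the per-tenant sync out over a fixed pool of workers sized by `TenantSyncConcurrency`, feeding tenants through a channel so a large cluster never syncs every tenant at once. A minimal, self-contained sketch of that pattern, with placeholder tenant names and a placeholder sync function:

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// syncAll runs syncOne for every tenant using at most concurrency workers,
// mirroring the jobs-channel pattern used by syncUsersBlocks.
func syncAll(ctx context.Context, tenants []string, concurrency int, syncOne func(context.Context, string) error) error {
	jobs := make(chan string)
	var wg sync.WaitGroup
	var mu sync.Mutex
	var firstErr error

	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for tenant := range jobs {
				if err := syncOne(ctx, tenant); err != nil {
					mu.Lock()
					if firstErr == nil {
						firstErr = err
					}
					mu.Unlock()
				}
			}
		}()
	}

	for _, tenant := range tenants {
		select {
		case jobs <- tenant:
		case <-ctx.Done():
			close(jobs)
			wg.Wait()
			return ctx.Err()
		}
	}

	close(jobs)
	wg.Wait()
	return firstErr
}

func main() {
	err := syncAll(context.Background(), []string{"user-1", "user-2"}, 2, func(_ context.Context, tenant string) error {
		fmt.Println("synced", tenant)
		return nil
	})
	fmt.Println(err)
}
```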
+func (u *ParquetBucketStores) closeEmptyBucketStore(userID string) error { + u.storesMu.Lock() + unlockInDefer := true + defer func() { + if unlockInDefer { + u.storesMu.Unlock() + } + }() + + bs := u.stores[userID] + if bs == nil { + return errBucketStoreNotFound + } + + delete(u.stores, userID) + unlockInDefer = false + u.storesMu.Unlock() + + if u.cfg.BucketStore.TokenBucketBytesLimiter.Mode != string(tsdb.TokenBucketBytesLimiterDisabled) { + u.userTokenBucketsMu.Lock() + delete(u.userTokenBuckets, userID) + u.userTokenBucketsMu.Unlock() + } + + u.parquetBucketStoreMetrics.RemoveUserRegistry(userID) + return bs.Close() +} + +func (u *ParquetBucketStores) syncDirForUser(userID string) string { + return filepath.Join(u.cfg.BucketStore.SyncDir, userID) +} + +// scanUsers in the bucket and return the list of found users. It includes active and deleting users +// but not deleted users. +func (u *ParquetBucketStores) scanUsers(ctx context.Context) ([]string, error) { + activeUsers, deletingUsers, _, err := u.userScanner.ScanUsers(ctx) + if err != nil { + return nil, err + } + users := make([]string, 0, len(activeUsers)+len(deletingUsers)) + users = append(users, activeUsers...) + users = append(users, deletingUsers...) + users = deduplicateUsers(users) + + return users, err +} + +func (u *ParquetBucketStores) getStore(userID string) *parquetBucketStore { + u.storesMu.RLock() + defer u.storesMu.RUnlock() + return u.stores[userID] +} + +func (u *ParquetBucketStores) getStoreError(userID string) error { + u.storesErrorsMu.RLock() + defer u.storesErrorsMu.RUnlock() + return u.storesErrors[userID] +} + +// getOrCreateStore gets or creates a parquet bucket store for the given user +func (u *ParquetBucketStores) getOrCreateStore(userID string) (*parquetBucketStore, error) { + u.storesMu.RLock() + store, exists := u.stores[userID] + u.storesMu.RUnlock() + + if exists { + return store, nil + } + + u.storesMu.Lock() + defer u.storesMu.Unlock() + + // Double-check after acquiring write lock + if store, exists = u.stores[userID]; exists { + return store, nil + } + + // Check if there was an error creating this store + if err, exists := u.storesErrors[userID]; exists { + return nil, err + } + + // Create new store + userLogger := log.With(u.logger, "user", userID) + store, err := u.createParquetBucketStore(userID, userLogger) + if err != nil { + u.storesErrors[userID] = err + return nil, err + } + + u.stores[userID] = store + reg := prometheus.NewRegistry() + u.parquetBucketStoreMetrics.AddUserRegistry(userID, reg) + return store, nil +} + +// createParquetBucketStore creates a new parquet bucket store for a user +func (u *ParquetBucketStores) createParquetBucketStore(userID string, userLogger log.Logger) (*parquetBucketStore, error) { + level.Info(userLogger).Log("msg", "creating parquet bucket store") + + // Create user-specific bucket client + userBucket := bucket.NewUserBucketClient(userID, u.bucket, u.limits) + + store := &parquetBucketStore{ + logger: userLogger, + bucket: userBucket, + limits: u.limits, + concurrency: 4, // TODO: make this configurable + chunksDecoder: u.chunksDecoder, + matcherCache: u.matcherCache, + } + + return store, nil +} + +type parquetBlock struct { + name string + shard parquet_storage.ParquetShard + m *search.Materializer + concurrency int +} + +func (p *parquetBucketStore) newParquetBlock(ctx context.Context, name string, labelsFileOpener, chunksFileOpener parquet_storage.ParquetOpener, d *schema.PrometheusParquetChunksDecoder, rowCountQuota *search.Quota, 
chunkBytesQuota *search.Quota, dataBytesQuota *search.Quota) (*parquetBlock, error) { + shard, err := parquet_storage.NewParquetShardOpener( + context.WithoutCancel(ctx), + name, + labelsFileOpener, + chunksFileOpener, + 0, + parquet_storage.WithFileOptions( + parquet.SkipMagicBytes(true), + parquet.ReadBufferSize(100*1024), + parquet.SkipBloomFilters(true), + parquet.OptimisticRead(true), + ), + ) + if err != nil { + return nil, errors.Wrapf(err, "failed to open parquet shard. block: %v", name) + } + + s, err := shard.TSDBSchema() + if err != nil { + return nil, err + } + m, err := search.NewMaterializer(s, d, shard, p.concurrency, rowCountQuota, chunkBytesQuota, dataBytesQuota, search.NoopMaterializedSeriesFunc, materializedLabelsFilterCallback) + if err != nil { + return nil, err + } + + return &parquetBlock{ + shard: shard, + m: m, + concurrency: p.concurrency, + name: name, + }, nil +} + +type contextKey int + +var ( + shardInfoCtxKey contextKey = 1 +) + +func injectShardInfoIntoContext(ctx context.Context, si *storepb.ShardInfo) context.Context { + return context.WithValue(ctx, shardInfoCtxKey, si) +} + +func extractShardInfoFromContext(ctx context.Context) (*storepb.ShardInfo, bool) { + if si := ctx.Value(shardInfoCtxKey); si != nil { + return si.(*storepb.ShardInfo), true + } + + return nil, false +} + +func materializedLabelsFilterCallback(ctx context.Context, _ *prom_storage.SelectHints) (search.MaterializedLabelsFilter, bool) { + shardInfo, exists := extractShardInfoFromContext(ctx) + if !exists { + return nil, false + } + sm := shardInfo.Matcher(&querysharding.Buffers) + if !sm.IsSharded() { + return nil, false + } + return &shardMatcherLabelsFilter{shardMatcher: sm}, true +} + +type shardMatcherLabelsFilter struct { + shardMatcher *storepb.ShardMatcher +} + +func (f *shardMatcherLabelsFilter) Filter(lbls labels.Labels) bool { + return f.shardMatcher.MatchesLabels(lbls) +} + +func (f *shardMatcherLabelsFilter) Close() { + f.shardMatcher.Close() +} + +func (b *parquetBlock) Query(ctx context.Context, mint, maxt int64, skipChunks bool, matchers []*labels.Matcher) (prom_storage.ChunkSeriesSet, error) { + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + + rowGroupCount := len(b.shard.LabelsFile().RowGroups()) + results := make([][]prom_storage.ChunkSeries, rowGroupCount) + for i := range results { + results[i] = make([]prom_storage.ChunkSeries, 0, 1024/rowGroupCount) + } + + for rgi := range rowGroupCount { + errGroup.Go(func() error { + cs, err := search.MatchersToConstraints(matchers...) + if err != nil { + return err + } + err = search.Initialize(b.shard.LabelsFile(), cs...) + if err != nil { + return err + } + rr, err := search.Filter(ctx, b.shard, rgi, cs...) + if err != nil { + return err + } + + if len(rr) == 0 { + return nil + } + + seriesSetIter, err := b.m.Materialize(ctx, nil, rgi, mint, maxt, skipChunks, rr) + if err != nil { + return err + } + defer func() { _ = seriesSetIter.Close() }() + for seriesSetIter.Next() { + results[rgi] = append(results[rgi], seriesSetIter.At()) + } + sort.Sort(byLabels(results[rgi])) + return seriesSetIter.Err() + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + totalResults := 0 + for _, res := range results { + totalResults += len(res) + } + + resultsFlattened := make([]prom_storage.ChunkSeries, 0, totalResults) + for _, res := range results { + resultsFlattened = append(resultsFlattened, res...) 
+ } + sort.Sort(byLabels(resultsFlattened)) + + return convert.NewChunksSeriesSet(resultsFlattened), nil +} + +func (b *parquetBlock) LabelNames(ctx context.Context, limit int64, matchers []*labels.Matcher) ([]string, error) { + if len(matchers) == 0 { + return b.m.MaterializeAllLabelNames(), nil + } + + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + + results := make([][]string, len(b.shard.LabelsFile().RowGroups())) + + for rgi := range b.shard.LabelsFile().RowGroups() { + errGroup.Go(func() error { + cs, err := search.MatchersToConstraints(matchers...) + if err != nil { + return err + } + err = search.Initialize(b.shard.LabelsFile(), cs...) + if err != nil { + return err + } + rr, err := search.Filter(ctx, b.shard, rgi, cs...) + if err != nil { + return err + } + series, err := b.m.MaterializeLabelNames(ctx, rgi, rr) + if err != nil { + return err + } + results[rgi] = series + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + return util.MergeUnsortedSlices(int(limit), results...), nil +} + +func (b *parquetBlock) LabelValues(ctx context.Context, name string, limit int64, matchers []*labels.Matcher) ([]string, error) { + if len(matchers) == 0 { + return b.allLabelValues(ctx, name, limit) + } + + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + + results := make([][]string, len(b.shard.LabelsFile().RowGroups())) + + for rgi := range b.shard.LabelsFile().RowGroups() { + errGroup.Go(func() error { + cs, err := search.MatchersToConstraints(matchers...) + if err != nil { + return err + } + err = search.Initialize(b.shard.LabelsFile(), cs...) + if err != nil { + return err + } + rr, err := search.Filter(ctx, b.shard, rgi, cs...) + if err != nil { + return err + } + series, err := b.m.MaterializeLabelValues(ctx, name, rgi, rr) + if err != nil { + return err + } + results[rgi] = series + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + return util.MergeUnsortedSlices(int(limit), results...), nil +} + +func (b *parquetBlock) allLabelValues(ctx context.Context, name string, limit int64) ([]string, error) { + errGroup, ctx := errgroup.WithContext(ctx) + errGroup.SetLimit(b.concurrency) + + results := make([][]string, len(b.shard.LabelsFile().RowGroups())) + + for i := range b.shard.LabelsFile().RowGroups() { + errGroup.Go(func() error { + series, err := b.m.MaterializeAllLabelValues(ctx, name, i) + if err != nil { + return err + } + results[i] = series + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + return util.MergeUnsortedSlices(int(limit), results...), nil +} + +type byLabels []prom_storage.ChunkSeries + +func (b byLabels) Len() int { return len(b) } +func (b byLabels) Swap(i, j int) { b[i], b[j] = b[j], b[i] } +func (b byLabels) Less(i, j int) bool { return labels.Compare(b[i].Labels(), b[j].Labels()) < 0 } + +func chunkToStoreEncoding(in chunkenc.Encoding) storepb.Chunk_Encoding { + switch in { + case chunkenc.EncXOR: + return storepb.Chunk_XOR + case chunkenc.EncHistogram: + return storepb.Chunk_HISTOGRAM + case chunkenc.EncFloatHistogram: + return storepb.Chunk_FLOAT_HISTOGRAM + default: + panic("unknown chunk encoding") + } +} + +// createCachingBucketClientForParquet creates a caching bucket client for parquet bucket stores +func createCachingBucketClientForParquet(storageCfg tsdb.BlocksStorageConfig, bucketClient objstore.InstrumentedBucket, name string, logger log.Logger, reg prometheus.Registerer) 
(objstore.Bucket, error) { + // Create caching bucket using the existing infrastructure + matchers := tsdb.NewMatchers() + cachingBucket, err := tsdb.CreateCachingBucket(storageCfg.BucketStore.ChunksCache, storageCfg.BucketStore.MetadataCache, storageCfg.BucketStore.ParquetLabelsCache, matchers, bucketClient, logger, reg) + if err != nil { + return nil, errors.Wrap(err, "create caching bucket for parquet") + } + return cachingBucket, nil +} diff --git a/pkg/storegateway/parquet_bucket_stores_test.go b/pkg/storegateway/parquet_bucket_stores_test.go new file mode 100644 index 00000000000..3786190bbf5 --- /dev/null +++ b/pkg/storegateway/parquet_bucket_stores_test.go @@ -0,0 +1,326 @@ +package storegateway + +import ( + "context" + "errors" + "testing" + + "github.com/go-kit/log" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunkenc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + "github.com/thanos-io/thanos/pkg/store/storepb" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/cortexproject/cortex/pkg/storage/bucket" + "github.com/cortexproject/cortex/pkg/storage/bucket/filesystem" + cortex_tsdb "github.com/cortexproject/cortex/pkg/storage/tsdb" + "github.com/cortexproject/cortex/pkg/util/validation" +) + +func TestParquetBucketStores_Series_NoUserID(t *testing.T) { + stores := &ParquetBucketStores{ + logger: log.NewNopLogger(), + } + + req := &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 100, + Matchers: []storepb.LabelMatcher{{ + Type: storepb.LabelMatcher_EQ, + Name: labels.MetricName, + Value: "test_metric", + }}, + } + + srv := newBucketStoreSeriesServer(context.Background()) + err := stores.Series(req, srv) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "no userID") +} + +func TestParquetBucketStores_Series_StoreCreationError(t *testing.T) { + // Create a mock bucket client + mockBucket := &bucket.ClientMock{} + + stores := &ParquetBucketStores{ + logger: log.NewNopLogger(), + bucket: mockBucket, + stores: make(map[string]*parquetBucketStore), + storesErrors: make(map[string]error), + } + + // Simulate a store creation error + stores.storesErrors["user-1"] = errors.New("store creation failed") + + req := &storepb.SeriesRequest{ + MinTime: 0, + MaxTime: 100, + Matchers: []storepb.LabelMatcher{{ + Type: storepb.LabelMatcher_EQ, + Name: labels.MetricName, + Value: "test_metric", + }}, + } + + ctx := setUserIDToGRPCContext(context.Background(), "user-1") + srv := newBucketStoreSeriesServer(ctx) + err := stores.Series(req, srv) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "store creation failed") +} + +func TestParquetBucketStores_LabelNames_NoUserID(t *testing.T) { + stores := &ParquetBucketStores{ + logger: log.NewNopLogger(), + } + + req := &storepb.LabelNamesRequest{ + Start: 0, + End: 100, + } + + _, err := stores.LabelNames(context.Background(), req) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "no userID") +} + +func TestParquetBucketStores_LabelValues_NoUserID(t *testing.T) { + stores := &ParquetBucketStores{ + logger: log.NewNopLogger(), + } + + req := &storepb.LabelValuesRequest{ + Start: 0, + End: 100, + Label: "__name__", + } + + _, err := stores.LabelValues(context.Background(), req) + + assert.Error(t, err) + assert.Contains(t, err.Error(), "no userID") +} + +func 
TestParquetBucketStore_FindParquetBlocks_InvalidMatchers(t *testing.T) { + store := &parquetBucketStore{ + logger: log.NewNopLogger(), + } + + // Test with no matchers + _, err := store.findParquetBlocks(context.Background(), nil) + assert.Error(t, err) + s, ok := status.FromError(err) + assert.True(t, ok) + assert.Equal(t, codes.InvalidArgument, s.Code()) + + // Test with multiple matchers + _, err = store.findParquetBlocks(context.Background(), []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: block.BlockIDLabel, Value: "block1"}, + {Type: storepb.LabelMatcher_RE, Name: block.BlockIDLabel, Value: "block2"}, + }) + assert.Error(t, err) + s, ok = status.FromError(err) + assert.True(t, ok) + assert.Equal(t, codes.InvalidArgument, s.Code()) + + // Test with wrong matcher type + _, err = store.findParquetBlocks(context.Background(), []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_EQ, Name: block.BlockIDLabel, Value: "block1"}, + }) + assert.Error(t, err) + s, ok = status.FromError(err) + assert.True(t, ok) + assert.Equal(t, codes.InvalidArgument, s.Code()) + + // Test with wrong matcher name + _, err = store.findParquetBlocks(context.Background(), []storepb.LabelMatcher{ + {Type: storepb.LabelMatcher_RE, Name: "wrong_name", Value: "block1"}, + }) + assert.Error(t, err) + s, ok = status.FromError(err) + assert.True(t, ok) + assert.Equal(t, codes.InvalidArgument, s.Code()) +} + +func TestChunkToStoreEncoding(t *testing.T) { + tests := []struct { + name string + encoding chunkenc.Encoding + expected storepb.Chunk_Encoding + }{ + { + name: "XOR encoding", + encoding: chunkenc.EncXOR, + expected: storepb.Chunk_XOR, + }, + { + name: "Histogram encoding", + encoding: chunkenc.EncHistogram, + expected: storepb.Chunk_HISTOGRAM, + }, + { + name: "Float histogram encoding", + encoding: chunkenc.EncFloatHistogram, + expected: storepb.Chunk_FLOAT_HISTOGRAM, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := chunkToStoreEncoding(tt.encoding) + assert.Equal(t, tt.expected, result) + }) + } +} + +func TestParquetBucketStoresWithCaching(t *testing.T) { + // Create a temporary directory for the test + tempDir := t.TempDir() + + // Create storage configuration with caching enabled + storageCfg := cortex_tsdb.BlocksStorageConfig{ + UsersScanner: cortex_tsdb.UsersScannerConfig{ + Strategy: cortex_tsdb.UserScanStrategyList, + }, + Bucket: bucket.Config{ + Backend: "filesystem", + Filesystem: filesystem.Config{ + Directory: tempDir, + }, + }, + BucketStore: cortex_tsdb.BucketStoreConfig{ + ChunksCache: cortex_tsdb.ChunksCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + MetadataCache: cortex_tsdb.MetadataCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + ParquetLabelsCache: cortex_tsdb.ParquetLabelsCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + }, + } + + // Create a mock bucket client + bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, nil, "test", log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + + // Create limits + limits := validation.NewOverrides(validation.Limits{}, nil) + + // Create parquet bucket stores with caching + parquetStores, err := newParquetBucketStores(storageCfg, NewNoShardingStrategy(log.NewNopLogger(), nil), bucketClient, limits, log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + require.NotNil(t, 
parquetStores) + + // Verify that the bucket is a caching bucket (it should be wrapped) + // The caching bucket should be different from the original bucket client + require.NotEqual(t, bucketClient, parquetStores.bucket) +} + +func TestCreateCachingBucketClientForParquet(t *testing.T) { + // Create a temporary directory for the test + tempDir := t.TempDir() + + // Create storage configuration with caching enabled + storageCfg := cortex_tsdb.BlocksStorageConfig{ + Bucket: bucket.Config{ + Backend: "filesystem", + Filesystem: filesystem.Config{ + Directory: tempDir, + }, + }, + BucketStore: cortex_tsdb.BucketStoreConfig{ + ChunksCache: cortex_tsdb.ChunksCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + MetadataCache: cortex_tsdb.MetadataCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + ParquetLabelsCache: cortex_tsdb.ParquetLabelsCacheConfig{ + BucketCacheBackend: cortex_tsdb.BucketCacheBackend{ + Backend: "inmemory", + }, + }, + }, + } + + // Create a mock bucket client + bucketClient, err := bucket.NewClient(context.Background(), storageCfg.Bucket, nil, "test", log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + + // Create caching bucket client + cachingBucket, err := createCachingBucketClientForParquet(storageCfg, bucketClient, "test", log.NewNopLogger(), prometheus.NewRegistry()) + require.NoError(t, err) + require.NotNil(t, cachingBucket) + + // Verify that the caching bucket is different from the original bucket client + require.NotEqual(t, bucketClient, cachingBucket) +} + +func TestParquetBucketStores_Series_ShouldReturnErrorIfMaxInflightRequestIsReached(t *testing.T) { + cfg := prepareStorageConfig(t) + cfg.BucketStore.BucketStoreType = string(cortex_tsdb.ParquetBucketStore) + cfg.BucketStore.MaxInflightRequests = 10 + reg := prometheus.NewPedanticRegistry() + storageDir := t.TempDir() + generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15) + bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) + require.NoError(t, err) + + stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) + require.NoError(t, err) + require.NoError(t, stores.InitialSync(context.Background())) + + parquetStores := stores.(*ParquetBucketStores) + // Set inflight requests to the limit + for range 10 { + parquetStores.inflightRequests.Inc() + } + series, warnings, err := querySeries(stores, "user_id", "series_1", 0, 100) + assert.ErrorIs(t, err, ErrTooManyInflightRequests) + assert.Empty(t, series) + assert.Empty(t, warnings) +} + +//func TestParquetBucketStores_Series_ShouldNotCheckMaxInflightRequestsIfTheLimitIsDisabled(t *testing.T) { +// cfg := prepareStorageConfig(t) +// cfg.BucketStore.BucketStoreType = string(ParquetBucketStore) +// reg := prometheus.NewPedanticRegistry() +// storageDir := t.TempDir() +// generateStorageBlock(t, storageDir, "user_id", "series_1", 0, 100, 15) +// bucket, err := filesystem.NewBucketClient(filesystem.Config{Directory: storageDir}) +// require.NoError(t, err) +// +// stores, err := NewBucketStores(cfg, NewNoShardingStrategy(log.NewNopLogger(), nil), objstore.WithNoopInstr(bucket), defaultLimitsOverrides(t), mockLoggingLevel(), log.NewNopLogger(), reg) +// require.NoError(t, err) +// require.NoError(t, stores.InitialSync(context.Background())) +// +// parquetStores := 
stores.(*ParquetBucketStores) +// // Set inflight requests to the limit (max_inflight_request is set to 0 by default = disabled) +// for i := 0; i < 10; i++ { +// parquetStores.inflightRequests.Inc() +// } +// series, _, err := querySeriesWithBlockIDs(stores, "user_id", "series_1", 0, 100) +// require.NoError(t, err) +// assert.Equal(t, 1, len(series)) +//} diff --git a/pkg/util/inflight.go b/pkg/util/inflight.go new file mode 100644 index 00000000000..3ad1326b3c0 --- /dev/null +++ b/pkg/util/inflight.go @@ -0,0 +1,30 @@ +package util + +import "sync" + +type InflightRequestTracker struct { + mu sync.RWMutex + cnt int +} + +func NewInflightRequestTracker() *InflightRequestTracker { + return &InflightRequestTracker{} +} + +func (t *InflightRequestTracker) Inc() { + t.mu.Lock() + t.cnt++ + t.mu.Unlock() +} + +func (t *InflightRequestTracker) Dec() { + t.mu.Lock() + t.cnt-- + t.mu.Unlock() +} + +func (t *InflightRequestTracker) Count() int { + t.mu.RLock() + defer t.mu.RUnlock() + return t.cnt +} diff --git a/schemas/cortex-config-schema.json b/schemas/cortex-config-schema.json index 1ae848a9c45..edd5029a9ae 100644 --- a/schemas/cortex-config-schema.json +++ b/schemas/cortex-config-schema.json @@ -1065,6 +1065,12 @@ }, "type": "object" }, + "bucket_store_type": { + "default": "tsdb", + "description": "Type of bucket store to use (tsdb or parquet).", + "type": "string", + "x-cli-flag": "blocks-storage.bucket-store.bucket-store-type" + }, "chunks_cache": { "properties": { "attributes_ttl": { @@ -8281,14 +8287,14 @@ "x-format": "duration" }, "wait_stability_max_duration": { - "default": "5m0s", + "default": "5s", "description": "Maximum time to wait for ring stability at startup. If the store-gateway ring keeps changing after this period of time, the store-gateway will start anyway.", "type": "string", "x-cli-flag": "store-gateway.sharding-ring.wait-stability-max-duration", "x-format": "duration" }, "wait_stability_min_duration": { - "default": "1m0s", + "default": "0s", "description": "Minimum time to wait for ring stability at startup. 0 to disable.", "type": "string", "x-cli-flag": "store-gateway.sharding-ring.wait-stability-min-duration",
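The schema entry above documents the new `bucket_store_type` setting (`tsdb` or `parquet`), and the tests in this diff type-assert the value returned by `NewBucketStores` to either `*ThanosBucketStores` or `*ParquetBucketStores`. The factory itself is not shown in this excerpt; the following is only a hedged sketch of how such a dispatch could look, with simplified names and signatures:

```go
package main

import (
	"errors"
	"fmt"
)

// BucketStores stands in for the interface both implementations satisfy
// (Series, LabelNames, LabelValues, SyncBlocks, InitialSync in the real code).
type BucketStores interface {
	InitialSync() error
}

type thanosBucketStores struct{}
type parquetBucketStores struct{}

func (*thanosBucketStores) InitialSync() error  { return nil }
func (*parquetBucketStores) InitialSync() error { return nil }

// newBucketStores sketches the dispatch implied by the tests: the configured
// bucket store type selects which multi-tenant implementation is constructed.
// The real constructor also takes config, sharding strategy, bucket client,
// limits, logger and registerer; those are omitted here.
func newBucketStores(bucketStoreType string) (BucketStores, error) {
	switch bucketStoreType {
	case "tsdb":
		return &thanosBucketStores{}, nil
	case "parquet":
		return &parquetBucketStores{}, nil
	default:
		return nil, errors.New("unsupported bucket store type: " + bucketStoreType)
	}
}

func main() {
	stores, err := newBucketStores("parquet")
	fmt.Printf("%T %v\n", stores, err) // *main.parquetBucketStores <nil>
}
```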