Skip to content

Commit f88eddb

Browse files
authored
Enable 'snappy-block' compression on ingester clients by default (#6148)
* Enable 'snappy-block' compression on ingester clients by default Signed-off-by: alanprot <[email protected]> * changelog Signed-off-by: alanprot <[email protected]> * lint Signed-off-by: alanprot <[email protected]> * fix backward compatibility test Signed-off-by: alanprot <[email protected]> --------- Signed-off-by: alanprot <[email protected]> Signed-off-by: Alan Protasio <[email protected]>
1 parent efc0bdf commit f88eddb

File tree

9 files changed

+85
-28
lines changed

9 files changed

+85
-28
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
* [CHANGE] OTLP: Set `AddMetricSuffixes` to true to always enable metric name normalization. #6136
99
* [CHANGE] Querier: Deprecate and enable by default `querier.ingester-metadata-streaming` flag. #6147
1010
* [CHANGE] QueryFrontend/QueryScheduler: Deprecate `-querier.max-outstanding-requests-per-tenant` and `-query-scheduler.max-outstanding-requests-per-tenant` flags. Use frontend.max-outstanding-requests-per-tenant instead. #6146
11+
* [CHANGE] Ingesters: Enable 'snappy-block' compression on ingester clients by default. #6148
1112
* [FEATURE] Ingester/Distributor: Experimental: Enable native histogram ingestion via `-blocks-storage.tsdb.enable-native-histograms` flag. #5986 #6010 #6020
1213
* [FEATURE] Querier: Enable querying native histogram chunks. #5944 #6031
1314
* [FEATURE] Query Frontend: Support native histogram in query frontend response. #5996 #6043

docs/configuration/config-file-reference.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3020,7 +3020,7 @@ grpc_client_config:
30203020
# Use compression when sending messages. Supported values are: 'gzip',
30213021
# 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)
30223022
# CLI flag: -ingester.client.grpc-compression
3023-
[grpc_compression: <string> | default = ""]
3023+
[grpc_compression: <string> | default = "snappy-block"]
30243024
30253025
# Rate limit for gRPC client; 0 means disabled.
30263026
# CLI flag: -ingester.client.grpc-client-rate-limit

integration/backward_compatibility_test.go

Lines changed: 74 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,53 @@ import (
1818
"github.com/cortexproject/cortex/integration/e2ecortex"
1919
)
2020

21+
type versionsImagesFlags struct {
22+
flagsForOldImage func(map[string]string) map[string]string
23+
flagsForNewImage func(map[string]string) map[string]string
24+
}
25+
2126
var (
2227
// If you change the image tag, remember to update it in the preloading done
2328
// by GitHub Actions too (see .github/workflows/test-build-deploy.yml).
24-
previousVersionImages = map[string]func(map[string]string) map[string]string{
25-
"quay.io/cortexproject/cortex:v1.13.1": func(m map[string]string) map[string]string {
26-
m["-ingester.stream-chunks-when-using-blocks"] = "true"
27-
return m
29+
previousVersionImages = map[string]*versionsImagesFlags{
30+
"quay.io/cortexproject/cortex:v1.13.1": {
31+
flagsForOldImage: func(m map[string]string) map[string]string {
32+
m["-ingester.stream-chunks-when-using-blocks"] = "true"
33+
return m
34+
},
35+
flagsForNewImage: func(m map[string]string) map[string]string {
36+
m["-ingester.client.grpc-compression"] = "snappy"
37+
return m
38+
},
39+
},
40+
"quay.io/cortexproject/cortex:v1.13.2": {
41+
flagsForOldImage: func(m map[string]string) map[string]string {
42+
m["-ingester.stream-chunks-when-using-blocks"] = "true"
43+
return m
44+
},
45+
flagsForNewImage: func(m map[string]string) map[string]string {
46+
m["-ingester.client.grpc-compression"] = "snappy"
47+
return m
48+
},
49+
},
50+
"quay.io/cortexproject/cortex:v1.14.0": {
51+
flagsForOldImage: func(m map[string]string) map[string]string {
52+
return m
53+
},
54+
flagsForNewImage: func(m map[string]string) map[string]string {
55+
m["-ingester.client.grpc-compression"] = "snappy"
56+
return m
57+
},
2858
},
29-
"quay.io/cortexproject/cortex:v1.13.2": func(m map[string]string) map[string]string {
30-
m["-ingester.stream-chunks-when-using-blocks"] = "true"
31-
return m
59+
"quay.io/cortexproject/cortex:v1.14.1": {
60+
flagsForOldImage: func(m map[string]string) map[string]string {
61+
return m
62+
},
63+
flagsForNewImage: func(m map[string]string) map[string]string {
64+
m["-ingester.client.grpc-compression"] = "snappy"
65+
return m
66+
},
3267
},
33-
"quay.io/cortexproject/cortex:v1.14.0": nil,
34-
"quay.io/cortexproject/cortex:v1.14.1": nil,
3568
"quay.io/cortexproject/cortex:v1.15.0": nil,
3669
"quay.io/cortexproject/cortex:v1.15.1": nil,
3770
"quay.io/cortexproject/cortex:v1.15.2": nil,
@@ -44,27 +77,41 @@ var (
4477
)
4578

4679
func TestBackwardCompatibilityWithBlocksStorage(t *testing.T) {
47-
for previousImage, flagsFn := range previousVersionImages {
80+
for previousImage, imagesFlags := range previousVersionImages {
4881
t.Run(fmt.Sprintf("Backward compatibility upgrading from %s", previousImage), func(t *testing.T) {
4982
flags := blocksStorageFlagsWithFlushOnShutdown()
50-
if flagsFn != nil {
51-
flags = flagsFn(flags)
83+
var flagsForNewImage func(map[string]string) map[string]string
84+
if imagesFlags != nil {
85+
if imagesFlags.flagsForOldImage != nil {
86+
flags = imagesFlags.flagsForOldImage(flags)
87+
}
88+
89+
if imagesFlags.flagsForNewImage != nil {
90+
flagsForNewImage = imagesFlags.flagsForNewImage
91+
}
5292
}
5393

54-
runBackwardCompatibilityTestWithBlocksStorage(t, previousImage, flags)
94+
runBackwardCompatibilityTestWithBlocksStorage(t, previousImage, flags, flagsForNewImage)
5595
})
5696
}
5797
}
5898

5999
func TestNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T) {
60-
for previousImage, flagsFn := range previousVersionImages {
100+
for previousImage, imagesFlags := range previousVersionImages {
61101
t.Run(fmt.Sprintf("Backward compatibility upgrading from %s", previousImage), func(t *testing.T) {
62102
flags := blocksStorageFlagsWithFlushOnShutdown()
63-
if flagsFn != nil {
64-
flags = flagsFn(flags)
103+
var flagsForNewImage func(map[string]string) map[string]string
104+
if imagesFlags != nil {
105+
if imagesFlags.flagsForOldImage != nil {
106+
flags = imagesFlags.flagsForOldImage(flags)
107+
}
108+
109+
if imagesFlags.flagsForNewImage != nil {
110+
flagsForNewImage = imagesFlags.flagsForNewImage
111+
}
65112
}
66113

67-
runNewDistributorsCanPushToOldIngestersWithReplication(t, previousImage, flags)
114+
runNewDistributorsCanPushToOldIngestersWithReplication(t, previousImage, flags, flagsForNewImage)
68115
})
69116
}
70117
}
@@ -75,7 +122,7 @@ func blocksStorageFlagsWithFlushOnShutdown() map[string]string {
75122
})
76123
}
77124

78-
func runBackwardCompatibilityTestWithBlocksStorage(t *testing.T, previousImage string, flagsForOldImage map[string]string) {
125+
func runBackwardCompatibilityTestWithBlocksStorage(t *testing.T, previousImage string, flagsForOldImage map[string]string, flagsForNewImageFn func(map[string]string) map[string]string) {
79126
s, err := e2e.NewScenario(networkName)
80127
require.NoError(t, err)
81128
defer s.Close()
@@ -87,6 +134,10 @@ func runBackwardCompatibilityTestWithBlocksStorage(t *testing.T, previousImage s
87134

88135
flagsForNewImage := blocksStorageFlagsWithFlushOnShutdown()
89136

137+
if flagsForNewImageFn != nil {
138+
flagsForNewImage = flagsForNewImageFn(flagsForNewImage)
139+
}
140+
90141
// Start other Cortex components (ingester running on previous version).
91142
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsForOldImage, previousImage)
92143
distributor := e2ecortex.NewDistributor("distributor", "consul", consul.NetworkHTTPEndpoint(), flagsForNewImage, "")
@@ -127,7 +178,7 @@ func runBackwardCompatibilityTestWithBlocksStorage(t *testing.T, previousImage s
127178
}
128179

129180
// Check for issues like https://github.com/cortexproject/cortex/issues/2356
130-
func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previousImage string, flagsForPreviousImage map[string]string) {
181+
func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previousImage string, flagsForPreviousImage map[string]string, flagsForNewImageFn func(map[string]string) map[string]string) {
131182
s, err := e2e.NewScenario(networkName)
132183
require.NoError(t, err)
133184
defer s.Close()
@@ -141,6 +192,10 @@ func runNewDistributorsCanPushToOldIngestersWithReplication(t *testing.T, previo
141192
"-distributor.replication-factor": "3",
142193
})
143194

195+
if flagsForNewImageFn != nil {
196+
flagsForNewImage = flagsForNewImageFn(flagsForNewImage)
197+
}
198+
144199
// Start other Cortex components (ingester running on previous version).
145200
ingester1 := e2ecortex.NewIngester("ingester-1", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsForPreviousImage, previousImage)
146201
ingester2 := e2ecortex.NewIngester("ingester-2", e2ecortex.RingStoreConsul, consul.NetworkHTTPEndpoint(), flagsForPreviousImage, previousImage)

pkg/frontend/v2/frontend.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
5757
f.StringVar(&cfg.Addr, "frontend.instance-addr", "", "IP address to advertise to querier (via scheduler) (resolved via interfaces by default).")
5858
f.IntVar(&cfg.Port, "frontend.instance-port", 0, "Port to advertise to querier (via scheduler) (defaults to server.grpc-listen-port).")
5959

60-
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("frontend.grpc-client-config", f)
60+
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("frontend.grpc-client-config", "", f)
6161
}
6262

6363
// Frontend implements GrpcRoundTripper. It queues HTTP requests,

pkg/ingester/client/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66

77
"github.com/cortexproject/cortex/pkg/cortexpb"
88
"github.com/cortexproject/cortex/pkg/util/grpcclient"
9+
"github.com/cortexproject/cortex/pkg/util/grpcencoding/snappyblock"
910

1011
"github.com/go-kit/log"
1112
"github.com/pkg/errors"
@@ -116,7 +117,7 @@ type Config struct {
116117

117118
// RegisterFlags registers configuration settings used by the ingester client config.
118119
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
119-
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", f)
120+
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("ingester.client", snappyblock.Name, f)
120121
f.Int64Var(&cfg.MaxInflightPushRequests, "ingester.client.max-inflight-push-requests", 0, "Max inflight push requests that this ingester client can handle. This limit is per-ingester-client. Additional requests will be rejected. 0 = unlimited.")
121122
}
122123

pkg/querier/worker/worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
4545
f.BoolVar(&cfg.MatchMaxConcurrency, "querier.worker-match-max-concurrent", false, "Force worker concurrency to match the -querier.max-concurrent option. Overrides querier.worker-parallelism.")
4646
f.StringVar(&cfg.QuerierID, "querier.id", "", "Querier ID, sent to frontend service to identify requests from the same querier. Defaults to hostname.")
4747

48-
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", f)
48+
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("querier.frontend-client", "", f)
4949
}
5050

5151
func (cfg *Config) Validate(log log.Logger) error {

pkg/ruler/ruler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func (cfg *Config) Validate(limits validation.Limits, log log.Logger) error {
175175

176176
// RegisterFlags adds the flags required to config this to the given FlagSet
177177
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
178-
cfg.ClientTLSConfig.RegisterFlagsWithPrefix("ruler.client", f)
178+
cfg.ClientTLSConfig.RegisterFlagsWithPrefix("ruler.client", "", f)
179179
cfg.Ring.RegisterFlags(f)
180180
cfg.Notifier.RegisterFlags(f)
181181

pkg/scheduler/scheduler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ type Config struct {
9191
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
9292
flagext.DeprecatedFlag(f, "query-scheduler.max-outstanding-requests-per-tenant", "Deprecated: Use frontend.max-outstanding-requests-per-tenant instead.", util_log.Logger)
9393
f.DurationVar(&cfg.QuerierForgetDelay, "query-scheduler.querier-forget-delay", 0, "If a querier disconnects without sending notification about graceful shutdown, the query-scheduler will keep the querier in the tenant's shard until the forget delay has passed. This feature is useful to reduce the blast radius when shuffle-sharding is enabled.")
94-
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", f)
94+
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("query-scheduler.grpc-client-config", "", f)
9595
}
9696

9797
// NewScheduler creates a new Scheduler.

pkg/util/grpcclient/grpcclient.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,14 +36,14 @@ type Config struct {
3636

3737
// RegisterFlags registers flags.
3838
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
39-
cfg.RegisterFlagsWithPrefix("", f)
39+
cfg.RegisterFlagsWithPrefix("", "", f)
4040
}
4141

4242
// RegisterFlagsWithPrefix registers flags with prefix.
43-
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
43+
func (cfg *Config) RegisterFlagsWithPrefix(prefix, defaultGrpcCompression string, f *flag.FlagSet) {
4444
f.IntVar(&cfg.MaxRecvMsgSize, prefix+".grpc-max-recv-msg-size", 100<<20, "gRPC client max receive message size (bytes).")
4545
f.IntVar(&cfg.MaxSendMsgSize, prefix+".grpc-max-send-msg-size", 16<<20, "gRPC client max send message size (bytes).")
46-
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", "", "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)")
46+
f.StringVar(&cfg.GRPCCompression, prefix+".grpc-compression", defaultGrpcCompression, "Use compression when sending messages. Supported values are: 'gzip', 'snappy', 'snappy-block' ,'zstd' and '' (disable compression)")
4747
f.Float64Var(&cfg.RateLimit, prefix+".grpc-client-rate-limit", 0., "Rate limit for gRPC client; 0 means disabled.")
4848
f.IntVar(&cfg.RateLimitBurst, prefix+".grpc-client-rate-limit-burst", 0, "Rate limit burst for gRPC client.")
4949
f.BoolVar(&cfg.BackoffOnRatelimits, prefix+".backoff-on-ratelimits", false, "Enable backoff and retry when we hit ratelimits.")

0 commit comments

Comments
 (0)