Skip to content

Commit 32b1b40

Browse files
authored
Use kv package from github.com/grafana/dskit (#4436)
Signed-off-by: Arve Knudsen <[email protected]>
1 parent 6f67beb commit 32b1b40

File tree

86 files changed

+756
-2479
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

86 files changed

+756
-2479
lines changed

docs/configuration/config-file-reference.md

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3810,9 +3810,7 @@ The `memberlist_config` configures the Gossip memberlist.
38103810
[compression_enabled: <boolean> | default = true]
38113811
38123812
# Other cluster members to join. Can be specified multiple times. It can be an
3813-
# IP, hostname or an entry specified in the DNS Service Discovery format (see
3814-
# https://cortexmetrics.io/docs/configuration/arguments/#dns-service-discovery
3815-
# for more details).
3813+
# IP, hostname or an entry specified in the DNS Service Discovery format.
38163814
# CLI flag: -memberlist.join
38173815
[join_members: <list of string> | default = []]
38183816

go.mod

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ require (
1111
github.com/NYTimes/gziphandler v1.1.1
1212
github.com/alecthomas/units v0.0.0-20210208195552-ff826a37aa15
1313
github.com/alicebob/miniredis/v2 v2.14.3
14-
github.com/armon/go-metrics v0.3.6
1514
github.com/aws/aws-sdk-go v1.38.68
1615
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
1716
github.com/cespare/xxhash v1.1.0
@@ -30,12 +29,8 @@ require (
3029
github.com/golang/protobuf v1.5.2
3130
github.com/golang/snappy v0.0.4
3231
github.com/gorilla/mux v1.8.0
33-
github.com/grafana/dskit v0.0.0-20210824090727-039d9afd9208
32+
github.com/grafana/dskit v0.0.0-20210827060659-9daca2f00327
3433
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
35-
github.com/hashicorp/consul/api v1.9.1
36-
github.com/hashicorp/go-cleanhttp v0.5.1
37-
github.com/hashicorp/go-sockaddr v1.0.2
38-
github.com/hashicorp/memberlist v0.2.3
3934
github.com/json-iterator/go v1.1.11
4035
github.com/lib/pq v1.3.0
4136
github.com/minio/minio-go/v7 v7.0.10
@@ -59,9 +54,6 @@ require (
5954
github.com/uber/jaeger-client-go v2.29.1+incompatible
6055
github.com/weaveworks/common v0.0.0-20210722103813-e649eff5ab4a
6156
go.etcd.io/bbolt v1.3.6
62-
go.etcd.io/etcd v3.3.25+incompatible
63-
go.etcd.io/etcd/client/v3 v3.5.0
64-
go.etcd.io/etcd/server/v3 v3.5.0
6557
go.uber.org/atomic v1.9.0
6658
golang.org/x/net v0.0.0-20210610132358-84b48f89b13b
6759
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -962,8 +962,8 @@ github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY
962962
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
963963
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
964964
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
965-
github.com/grafana/dskit v0.0.0-20210824090727-039d9afd9208 h1:Yc+q7s/wcyd8bvPTQygY1iQTxxr931cLISsRhYltrJM=
966-
github.com/grafana/dskit v0.0.0-20210824090727-039d9afd9208/go.mod h1:uF46UNN1/feB1egpq8UGbBBKvJjGgZauW7pcVbeFLLM=
965+
github.com/grafana/dskit v0.0.0-20210827060659-9daca2f00327 h1:THdW9RnugPdLwW8RmHB/xOcKf267QunSH1mDuaJkhWk=
966+
github.com/grafana/dskit v0.0.0-20210827060659-9daca2f00327/go.mod h1:+T2iuDOzx/BSQJSvli9FUvLM5HnV8aDPmXM8KWuVj3M=
967967
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=
968968
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85/go.mod h1:crI9WX6p0IhrqB+DqIUHulRW853PaNFf7o4UprV//3I=
969969
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=

integration/getting_started_with_gossiped_ring_test.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,10 @@ func TestGettingStartedWithGossipedRing(t *testing.T) {
125125
// single ingester and so we have 1 block shipped from ingesters and loaded by both store-gateways.
126126
require.NoError(t, cortex1.WaitSumMetrics(e2e.Equals(1), "cortex_bucket_store_blocks_loaded"))
127127
require.NoError(t, cortex2.WaitSumMetrics(e2e.Equals(1), "cortex_bucket_store_blocks_loaded"))
128+
129+
// Make sure that no DNS failures occurred.
130+
// No actual DNS lookups are necessarily performed, so we can't really assert on that.
131+
mlMatcher := labels.MustNewMatcher(labels.MatchEqual, "name", "memberlist")
132+
require.NoError(t, cortex1.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_dns_failures_total"}, e2e.WithLabelMatchers(mlMatcher)))
133+
require.NoError(t, cortex2.WaitSumMetricsWithOptions(e2e.Equals(0), []string{"cortex_dns_failures_total"}, e2e.WithLabelMatchers(mlMatcher)))
128134
}

integration/kv_test.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,17 @@ import (
1010
"testing"
1111
"time"
1212

13+
"github.com/go-kit/kit/log"
14+
"github.com/grafana/dskit/kv"
15+
"github.com/grafana/dskit/kv/consul"
16+
"github.com/grafana/dskit/kv/etcd"
1317
"github.com/prometheus/client_golang/prometheus"
1418
dto "github.com/prometheus/client_model/go"
1519
"github.com/stretchr/testify/assert"
1620
"github.com/stretchr/testify/require"
1721

1822
"github.com/cortexproject/cortex/integration/e2e"
1923
e2edb "github.com/cortexproject/cortex/integration/e2e/db"
20-
"github.com/cortexproject/cortex/pkg/ring/kv"
21-
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
22-
"github.com/cortexproject/cortex/pkg/ring/kv/etcd"
2324
)
2425

2526
func TestKVList(t *testing.T) {
@@ -117,7 +118,9 @@ func TestKVWatchAndDelete(t *testing.T) {
117118
})
118119
}
119120

120-
func setupEtcd(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client {
121+
func setupEtcd(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer, logger log.Logger) kv.Client {
122+
t.Helper()
123+
121124
etcdSvc := e2edb.NewETCD()
122125
require.NoError(t, scenario.StartAndWaitReady(etcdSvc))
123126

@@ -131,13 +134,15 @@ func setupEtcd(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer)
131134
MaxRetries: 5,
132135
},
133136
},
134-
}, stringCodec{}, reg)
137+
}, stringCodec{}, reg, logger)
135138
require.NoError(t, err)
136139

137140
return etcdKv
138141
}
139142

140-
func setupConsul(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client {
143+
func setupConsul(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer, logger log.Logger) kv.Client {
144+
t.Helper()
145+
141146
consulSvc := e2edb.NewConsul()
142147
require.NoError(t, scenario.StartAndWaitReady(consulSvc))
143148

@@ -152,14 +157,14 @@ func setupConsul(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer
152157
WatchKeyRateLimit: 1,
153158
},
154159
},
155-
}, stringCodec{}, reg)
160+
}, stringCodec{}, reg, logger)
156161
require.NoError(t, err)
157162

158163
return consulKv
159164
}
160165

161166
func testKVs(t *testing.T, testFn func(t *testing.T, client kv.Client, reg *prometheus.Registry)) {
162-
setupFns := map[string]func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client{
167+
setupFns := map[string]func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer, logger log.Logger) kv.Client{
163168
"etcd": setupEtcd,
164169
"consul": setupConsul,
165170
}
@@ -171,13 +176,13 @@ func testKVs(t *testing.T, testFn func(t *testing.T, client kv.Client, reg *prom
171176
}
172177
}
173178

174-
func testKVScenario(t *testing.T, kvSetupFn func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer) kv.Client, testFn func(t *testing.T, client kv.Client, reg *prometheus.Registry)) {
179+
func testKVScenario(t *testing.T, kvSetupFn func(t *testing.T, scenario *e2e.Scenario, reg prometheus.Registerer, logger log.Logger) kv.Client, testFn func(t *testing.T, client kv.Client, reg *prometheus.Registry)) {
175180
s, err := e2e.NewScenario(networkName)
176181
require.NoError(t, err)
177182
defer s.Close()
178183

179184
reg := prometheus.NewRegistry()
180-
client := kvSetupFn(t, s, reg)
185+
client := kvSetupFn(t, s, prometheus.WrapRegistererWithPrefix("cortex_", reg), log.NewNopLogger())
181186
testFn(t, client, reg)
182187
}
183188

pkg/alertmanager/alertmanager_ring.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,9 @@ import (
88

99
"github.com/go-kit/kit/log/level"
1010
"github.com/grafana/dskit/flagext"
11+
"github.com/grafana/dskit/kv"
1112

1213
"github.com/cortexproject/cortex/pkg/ring"
13-
"github.com/cortexproject/cortex/pkg/ring/kv"
1414
util_log "github.com/cortexproject/cortex/pkg/util/log"
1515
)
1616

pkg/alertmanager/api_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,14 @@ import (
1818
"github.com/prometheus/client_golang/prometheus"
1919
commoncfg "github.com/prometheus/common/config"
2020
"github.com/stretchr/testify/assert"
21+
"github.com/stretchr/testify/require"
2122
"github.com/thanos-io/thanos/pkg/objstore"
23+
"github.com/weaveworks/common/user"
2224
"gopkg.in/yaml.v2"
2325

2426
"github.com/cortexproject/cortex/pkg/alertmanager/alertspb"
2527
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient"
2628
util_log "github.com/cortexproject/cortex/pkg/util/log"
27-
28-
"github.com/stretchr/testify/require"
29-
"github.com/weaveworks/common/user"
3029
)
3130

3231
func TestAMConfigValidationAPI(t *testing.T) {

pkg/alertmanager/distributor_test.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,22 +13,23 @@ import (
1313
"testing"
1414
"time"
1515

16+
"github.com/go-kit/kit/log"
1617
"github.com/grafana/dskit/flagext"
17-
18-
"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
19-
"github.com/cortexproject/cortex/pkg/ring"
20-
"github.com/cortexproject/cortex/pkg/ring/kv"
21-
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
22-
util_log "github.com/cortexproject/cortex/pkg/util/log"
23-
"github.com/cortexproject/cortex/pkg/util/test"
24-
18+
"github.com/grafana/dskit/kv"
19+
"github.com/grafana/dskit/kv/consul"
2520
"github.com/grafana/dskit/services"
2621
"github.com/prometheus/client_golang/prometheus"
22+
"github.com/stretchr/testify/assert"
2723
"github.com/stretchr/testify/require"
2824
"github.com/weaveworks/common/httpgrpc"
2925
"github.com/weaveworks/common/user"
3026
"google.golang.org/grpc"
3127
"google.golang.org/grpc/health/grpc_health_v1"
28+
29+
"github.com/cortexproject/cortex/pkg/alertmanager/alertmanagerpb"
30+
"github.com/cortexproject/cortex/pkg/ring"
31+
util_log "github.com/cortexproject/cortex/pkg/util/log"
32+
"github.com/cortexproject/cortex/pkg/util/test"
3233
)
3334

3435
func TestDistributor_DistributeRequest(t *testing.T) {
@@ -336,7 +337,9 @@ func prepare(t *testing.T, numAM, numHappyAM, replicationFactor int, responseBod
336337
amByAddr[a.myAddr] = ams[i]
337338
}
338339

339-
kvStore := consul.NewInMemoryClient(ring.GetCodec())
340+
kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
341+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
342+
340343
err := kvStore.CAS(context.Background(), RingKey,
341344
func(_ interface{}) (interface{}, bool, error) {
342345
return &ring.Desc{

pkg/alertmanager/multitenant.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/go-kit/kit/log"
1717
"github.com/go-kit/kit/log/level"
1818
"github.com/grafana/dskit/flagext"
19+
"github.com/grafana/dskit/kv"
1920
"github.com/grafana/dskit/services"
2021
"github.com/pkg/errors"
2122
"github.com/prometheus/alertmanager/cluster"
@@ -34,7 +35,6 @@ import (
3435
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
3536
"github.com/cortexproject/cortex/pkg/ring"
3637
"github.com/cortexproject/cortex/pkg/ring/client"
37-
"github.com/cortexproject/cortex/pkg/ring/kv"
3838
"github.com/cortexproject/cortex/pkg/tenant"
3939
"github.com/cortexproject/cortex/pkg/util"
4040
"github.com/cortexproject/cortex/pkg/util/concurrency"
@@ -344,7 +344,8 @@ func NewMultitenantAlertmanager(cfg *MultitenantAlertmanagerConfig, store alerts
344344
ringStore, err = kv.NewClient(
345345
cfg.ShardingRing.KVStore,
346346
ring.GetCodec(),
347-
kv.RegistererWithKVName(registerer, "alertmanager"),
347+
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("cortex_", registerer), "alertmanager"),
348+
logger,
348349
)
349350
if err != nil {
350351
return nil, errors.Wrap(err, "create KV store client")

pkg/alertmanager/multitenant_test.go

Lines changed: 28 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121

2222
"github.com/go-kit/kit/log"
2323
"github.com/grafana/dskit/flagext"
24+
"github.com/grafana/dskit/kv/consul"
2425
"github.com/grafana/dskit/services"
2526
"github.com/prometheus/alertmanager/cluster/clusterpb"
2627
"github.com/prometheus/alertmanager/notify"
@@ -43,7 +44,6 @@ import (
4344
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore"
4445
"github.com/cortexproject/cortex/pkg/alertmanager/alertstore/bucketclient"
4546
"github.com/cortexproject/cortex/pkg/ring"
46-
"github.com/cortexproject/cortex/pkg/ring/kv/consul"
4747
"github.com/cortexproject/cortex/pkg/storage/bucket"
4848
"github.com/cortexproject/cortex/pkg/util"
4949
"github.com/cortexproject/cortex/pkg/util/concurrency"
@@ -611,7 +611,9 @@ func TestMultitenantAlertmanager_deleteUnusedLocalUserState(t *testing.T) {
611611
func TestMultitenantAlertmanager_zoneAwareSharding(t *testing.T) {
612612
ctx := context.Background()
613613
alertStore := prepareInMemoryAlertStore()
614-
ringStore := consul.NewInMemoryClient(ring.GetCodec())
614+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
615+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
616+
615617
const (
616618
user1 = "user1"
617619
user2 = "user2"
@@ -689,7 +691,8 @@ func TestMultitenantAlertmanager_deleteUnusedRemoteUserState(t *testing.T) {
689691
)
690692

691693
alertStore := prepareInMemoryAlertStore()
692-
ringStore := consul.NewInMemoryClient(ring.GetCodec())
694+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
695+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
693696

694697
createInstance := func(i int) *MultitenantAlertmanager {
695698
reg := prometheus.NewPedanticRegistry()
@@ -1005,7 +1008,8 @@ func TestMultitenantAlertmanager_InitialSyncWithSharding(t *testing.T) {
10051008
ctx := context.Background()
10061009
amConfig := mockAlertmanagerConfig(t)
10071010
amConfig.ShardingEnabled = true
1008-
ringStore := consul.NewInMemoryClient(ring.GetCodec())
1011+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
1012+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
10091013

10101014
// Use an alert store with a mocked backend.
10111015
bkt := &bucket.ClientMock{}
@@ -1109,7 +1113,9 @@ func TestMultitenantAlertmanager_PerTenantSharding(t *testing.T) {
11091113
for _, tt := range tc {
11101114
t.Run(tt.name, func(t *testing.T) {
11111115
ctx := context.Background()
1112-
ringStore := consul.NewInMemoryClient(ring.GetCodec())
1116+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
1117+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
1118+
11131119
alertStore := prepareInMemoryAlertStore()
11141120

11151121
var instances []*MultitenantAlertmanager
@@ -1296,7 +1302,9 @@ func TestMultitenantAlertmanager_SyncOnRingTopologyChanges(t *testing.T) {
12961302
amConfig.ShardingRing.RingCheckPeriod = 100 * time.Millisecond
12971303
amConfig.PollInterval = time.Hour // Don't trigger the periodic check.
12981304

1299-
ringStore := consul.NewInMemoryClient(ring.GetCodec())
1305+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
1306+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
1307+
13001308
alertStore := prepareInMemoryAlertStore()
13011309

13021310
reg := prometheus.NewPedanticRegistry()
@@ -1347,7 +1355,9 @@ func TestMultitenantAlertmanager_RingLifecyclerShouldAutoForgetUnhealthyInstance
13471355
amConfig.ShardingRing.HeartbeatPeriod = 100 * time.Millisecond
13481356
amConfig.ShardingRing.HeartbeatTimeout = heartbeatTimeout
13491357

1350-
ringStore := consul.NewInMemoryClient(ring.GetCodec())
1358+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
1359+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
1360+
13511361
alertStore := prepareInMemoryAlertStore()
13521362

13531363
am, err := createMultitenantAlertmanager(amConfig, nil, nil, alertStore, ringStore, nil, log.NewNopLogger(), nil)
@@ -1379,7 +1389,8 @@ func TestMultitenantAlertmanager_InitialSyncFailureWithSharding(t *testing.T) {
13791389
ctx := context.Background()
13801390
amConfig := mockAlertmanagerConfig(t)
13811391
amConfig.ShardingEnabled = true
1382-
ringStore := consul.NewInMemoryClient(ring.GetCodec())
1392+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
1393+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
13831394

13841395
// Mock the store to fail listing configs.
13851396
bkt := &bucket.ClientMock{}
@@ -1401,7 +1412,9 @@ func TestMultitenantAlertmanager_InitialSyncFailureWithSharding(t *testing.T) {
14011412

14021413
func TestAlertmanager_ReplicasPosition(t *testing.T) {
14031414
ctx := context.Background()
1404-
ringStore := consul.NewInMemoryClient(ring.GetCodec())
1415+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
1416+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
1417+
14051418
mockStore := prepareInMemoryAlertStore()
14061419
require.NoError(t, mockStore.SetAlertConfig(ctx, alertspb.AlertConfigDesc{
14071420
User: "user-1",
@@ -1500,7 +1513,9 @@ func TestAlertmanager_StateReplicationWithSharding(t *testing.T) {
15001513
for _, tt := range tc {
15011514
t.Run(tt.name, func(t *testing.T) {
15021515
ctx := context.Background()
1503-
ringStore := consul.NewInMemoryClient(ring.GetCodec())
1516+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
1517+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
1518+
15041519
mockStore := prepareInMemoryAlertStore()
15051520
clientPool := newPassthroughAlertmanagerClientPool()
15061521
externalURL := flagext.URLValue{}
@@ -1693,7 +1708,9 @@ func TestAlertmanager_StateReplicationWithSharding_InitialSyncFromPeers(t *testi
16931708
for _, tt := range tc {
16941709
t.Run(tt.name, func(t *testing.T) {
16951710
ctx := context.Background()
1696-
ringStore := consul.NewInMemoryClient(ring.GetCodec())
1711+
ringStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger())
1712+
t.Cleanup(func() { assert.NoError(t, closer.Close()) })
1713+
16971714
mockStore := prepareInMemoryAlertStore()
16981715
clientPool := newPassthroughAlertmanagerClientPool()
16991716
externalURL := flagext.URLValue{}

0 commit comments

Comments
 (0)