Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ require (
github.com/tsenart/vegeta v12.7.0+incompatible
github.com/uber/jaeger-client-go v2.30.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible
github.com/valkey-io/valkey-go v1.0.70
github.com/valkey-io/valkey-go/valkeyhook v1.0.70
github.com/valkey-io/valkey-go/valkeyotel v1.0.70
github.com/yookoala/gofast v0.8.0
github.com/yuin/gopher-lua v1.1.1
go.opentelemetry.io/contrib/exporters/autoexport v0.64.0
Expand Down Expand Up @@ -145,7 +148,6 @@ require (
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/olekukonko/tablewriter v0.0.5 // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.19.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.1 // indirect
github.com/peterh/liner v1.2.2 // indirect
Expand Down
14 changes: 12 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,8 @@ github.com/onsi/ginkgo v1.16.5/go.mod h1:+E8gABHa3K6zRBolWtd+ROzc/U5bkGt0FwiG042
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.19.0 h1:4ieX6qQjPP/BfC3mpsAtIGGlxTWPeA3Inl/7DtXw1tw=
github.com/onsi/gomega v1.19.0/go.mod h1:LY+I3pBVzYsTBU1AnDwOSxaYi9WoWiqgwooUqq9yPro=
github.com/onsi/gomega v1.38.3 h1:eTX+W6dobAYfFeGC2PV6RwXRu/MyT+cQguijutvkpSM=
github.com/onsi/gomega v1.38.3/go.mod h1:ZCU1pkQcXDO5Sl9/VVEGlDyp+zm0m1cmeG5TOzLgdh4=
github.com/open-policy-agent/opa v1.7.1 h1:bhA2UGq5oS25471WB9aCJBWEp5/7WK+Nyb2PMAChQIg=
github.com/open-policy-agent/opa v1.7.1/go.mod h1:7cPuErOAt7k/oVWAVJnxqAC6mwArrAazkvk0RXiih2A=
github.com/open-policy-agent/opa-envoy-plugin v1.7.1-envoy h1:CCsfxxbtnG4ArLEDBJM1XREwUQJuqHLMyXK5iV7l1mM=
Expand Down Expand Up @@ -459,6 +459,14 @@ github.com/uber/jaeger-client-go v2.30.0+incompatible h1:D6wyKGCecFaSRUpo8lCVbaO
github.com/uber/jaeger-client-go v2.30.0+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.4.1+incompatible h1:td4jdvLcExb4cBISKIpHuGoVXh+dVKhn2Um6rjCsSsg=
github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
github.com/valkey-io/valkey-go v1.0.70 h1:mjYNT8qiazxDAJ0QNQ8twWT/YFOkOoRd40ERV2mB49Y=
github.com/valkey-io/valkey-go v1.0.70/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k=
github.com/valkey-io/valkey-go/mock v1.0.70 h1:aQjdy8Fd9g5Z/44jvnU/Tm2G3br9Rhww2LPfPIEQOSU=
github.com/valkey-io/valkey-go/mock v1.0.70/go.mod h1:R57vpSbCoVCqAIjNdQ5WMbd+MdYQOjK6B9FFTn2RI6A=
github.com/valkey-io/valkey-go/valkeyhook v1.0.70 h1:fEAwj+qyXXCEDpUZiJFkCNBL4hU21riJu3FWpe5urig=
github.com/valkey-io/valkey-go/valkeyhook v1.0.70/go.mod h1:KQgzBgc+nQZkTbTs4dLkkQW1W9UV8aRu+BndJQwk7w0=
github.com/valkey-io/valkey-go/valkeyotel v1.0.70 h1:aY9MdaSJjfhEj7hfKdIUMPXG1vhKlG2kSOSyd8kR+SE=
github.com/valkey-io/valkey-go/valkeyotel v1.0.70/go.mod h1:nhvqinpErLFm1ZGyhC0R704JC7PxBJ7IXJAjV++AEic=
github.com/vektah/gqlparser/v2 v2.5.30 h1:EqLwGAFLIzt1wpx1IPpY67DwUujF1OfzgEyDsLrN6kE=
github.com/vektah/gqlparser/v2 v2.5.30/go.mod h1:D1/VCZtV3LPnQrcPBeR/q5jkSQIPti0uYCP/RI0gIeo=
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb h1:zGWFAtiMcyryUHoUjUJX0/lt1H2+i2Ka2n+D3DImSNo=
Expand Down Expand Up @@ -544,6 +552,8 @@ go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y=
go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0=
Expand Down
85 changes: 85 additions & 0 deletions net/helper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package net

import (
"sync"
"testing"

"github.com/zalando/skipper/net/valkeytest"
)

type addressUpdater struct {
mu sync.Mutex
addrs []string
n int
}

// update returns non empty subsequences of addrs,
// e.g. for [foo bar baz] it returns:
// 1: [foo]
// 2: [foo bar]
// 3: [foo bar baz]
// 4: [foo]
// 5: [foo bar]
// 6: [foo bar baz]
// ...
func (u *addressUpdater) update() ([]string, error) {
u.mu.Lock()
defer u.mu.Unlock()

result := u.addrs[0 : 1+u.n%len(u.addrs)]
u.n++
return result, nil
}

func (u *addressUpdater) calls() int {
u.mu.Lock()
defer u.mu.Unlock()

return u.n
}

func TestAddressUpdater(t *testing.T) {
valkeyAddr, done := valkeytest.NewTestValkey(t)
defer done()
valkeyAddr2, done2 := valkeytest.NewTestValkey(t)
defer done2()

updater := &addressUpdater{addrs: []string{valkeyAddr, valkeyAddr2}}

if n := updater.calls(); n != 0 {
t.Fatalf("Failed to get result from calls() want 0, got: %d", n)
}

addr, err := updater.update()
if err != nil {
t.Fatalf("Failed to update: %v", err)
}
if n := len(addr); n != 1 {
t.Fatalf("Failed to get addr len of 1: %d", n)
}
if n := updater.calls(); n != 1 {
t.Fatalf("Failed to get result from calls() want 1, got: %d", n)
}

addr, err = updater.update()
if err != nil {
t.Fatalf("Failed to update: %v", err)
}
if n := len(addr); n != 2 {
t.Fatalf("Failed to get addr len of 2: %d", n)
}
if n := updater.calls(); n != 2 {
t.Fatalf("Failed to get result from calls() want 2, got: %d", n)
}

addr, err = updater.update()
if err != nil {
t.Fatalf("Failed to update: %v", err)
}
if n := len(addr); n != 1 {
t.Fatalf("Failed to get addr len of 1: %d", n)
}
if n := updater.calls(); n != 3 {
t.Fatalf("Failed to get result from calls() want 3, got: %d", n)
}
}
15 changes: 7 additions & 8 deletions net/redisclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,15 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/redis/go-redis/v9"
"github.com/redis/go-redis/v9/maintnotifications"
"github.com/zalando/skipper/logging"
"github.com/zalando/skipper/metrics"

xxhash "github.com/cespare/xxhash/v2"
rendezvous "github.com/dgryski/go-rendezvous"

jump "github.com/dgryski/go-jump"

"github.com/dchest/siphash"
jump "github.com/dgryski/go-jump"
mpchash "github.com/dgryski/go-mpchash"
rendezvous "github.com/dgryski/go-rendezvous"

"github.com/zalando/skipper/logging"
"github.com/zalando/skipper/metrics"
)

// RedisOptions is used to configure the redis.Ring
Expand Down Expand Up @@ -189,7 +188,7 @@ func (w rendezvousVnodes) Get(key string) string {
func NewRendezvousVnodes(shards []string) redis.ConsistentHash {
vshards := make([]string, vnodePerShard*len(shards))
table := make(map[string]string)
for i := 0; i < vnodePerShard; i++ {
for i := range vnodePerShard {
for j, shard := range shards {
vshard := fmt.Sprintf("%s%d", shard, i) // suffix
table[vshard] = shard
Expand Down Expand Up @@ -335,7 +334,7 @@ func (r *RedisRingClient) startUpdater(ctx context.Context) {

addrs, err := r.options.AddrUpdater()
if err != nil {
r.log.Errorf("Failed to start redis updater: %v", err)
r.log.Errorf("Failed to run redis updater: %v", err)
continue
}
if !hasAll(addrs, old) {
Expand Down
32 changes: 0 additions & 32 deletions net/redisclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package net
import (
"context"
"fmt"
"sync"
"testing"
"testing/synctest"
"time"
Expand Down Expand Up @@ -87,37 +86,6 @@ func Test_hasAll(t *testing.T) {
}
}

type addressUpdater struct {
addrs []string
mu sync.Mutex
n int
}

// update returns non empty subsequences of addrs,
// e.g. for [foo bar baz] it returns:
// 1: [foo]
// 2: [foo bar]
// 3: [foo bar baz]
// 4: [foo]
// 5: [foo bar]
// 6: [foo bar baz]
// ...
func (u *addressUpdater) update() ([]string, error) {
u.mu.Lock()
defer u.mu.Unlock()

result := u.addrs[0 : 1+u.n%len(u.addrs)]
u.n++
return result, nil
}

func (u *addressUpdater) calls() int {
u.mu.Lock()
defer u.mu.Unlock()

return u.n
}

func TestRedisClient(t *testing.T) {
tracer, err := basic.InitTracer([]string{"recorder=in-memory"})
if err != nil {
Expand Down
Loading