diff --git a/config/config-consul.yaml b/config/config-consul.yaml index b457cc50..28f5e337 100644 --- a/config/config-consul.yaml +++ b/config/config-consul.yaml @@ -16,8 +16,7 @@ # under the License. # -addr: "127.0.0.1:8500" - +addr: "127.0.0.1:9379" # Which store engine should be used by controller # options: etcd, zookeeper, raft, consul @@ -40,7 +39,6 @@ controller: failover: ping_interval_seconds: 3 min_alive_size: 5 - # Uncomment this part to save logs to filename instead of stdout #log: # level: info @@ -48,4 +46,4 @@ controller: # max_backups: 10 # max_age: 7 # max_size: 100 -# compress: false \ No newline at end of file +# compress: false diff --git a/store/engine/consul/consul.go b/store/engine/consul/consul.go index f1223ae3..17262e16 100644 --- a/store/engine/consul/consul.go +++ b/store/engine/consul/consul.go @@ -162,6 +162,7 @@ func (c *Consul) IsReady(ctx context.Context) bool { } func (c *Consul) Get(ctx context.Context, key string) ([]byte, error) { + key = sanitizeKey(key) rsp, _, err := c.client.KV().Get(key, nil) if err != nil { return nil, err @@ -173,6 +174,7 @@ func (c *Consul) Get(ctx context.Context, key string) ([]byte, error) { } func (c *Consul) Exists(ctx context.Context, key string) (bool, error) { + key = sanitizeKey(key) _, err := c.Get(ctx, key) if err != nil { if errors.Is(err, consts.ErrNotFound) { @@ -184,6 +186,7 @@ func (c *Consul) Exists(ctx context.Context, key string) (bool, error) { } func (c *Consul) Set(ctx context.Context, key string, value []byte) error { + key = sanitizeKey(key) kvPair := &api.KVPair{ Key: key, Value: value, @@ -193,11 +196,13 @@ func (c *Consul) Set(ctx context.Context, key string, value []byte) error { } func (c *Consul) Delete(ctx context.Context, key string) error { + key = sanitizeKey(key) _, err := c.client.KV().Delete(key, nil) return err } func (c *Consul) List(ctx context.Context, prefix string) ([]engine.Entry, error) { + prefix = sanitizeKey(prefix) rsp, _, err := c.client.KV().List(prefix, nil) if err != nil { return nil, err @@ -209,7 +214,7 @@ func (c *Consul) List(ctx context.Context, prefix string) ([]engine.Entry, error if string(kv.Key) == prefix { continue } - key := strings.TrimLeft(string(kv.Key[prefixLen+1]), "/") + key := strings.TrimLeft(string(kv.Key[prefixLen+1:]), "/") if strings.ContainsRune(key, '/') { continue } @@ -236,7 +241,6 @@ func (c *Consul) electLoop() { TTL: fmt.Sprintf("%v", sessionTTL), LockDelay: lockDelay, }, nil) - if err != nil { logger.Get().With( zap.Error(err), @@ -312,3 +316,10 @@ func (c *Consul) Close() error { c.client = nil return nil } + +func sanitizeKey(key string) string { + if len(key) > 0 && key[0] == '/' { + key = strings.TrimPrefix(key, "/") + } + return key +} diff --git a/store/engine/consul/consul_test.go b/store/engine/consul/consul_test.go index 8a5b67c2..9fb0f6fc 100644 --- a/store/engine/consul/consul_test.go +++ b/store/engine/consul/consul_test.go @@ -47,7 +47,7 @@ func TestBasicOperations(t *testing.T) { }() ctx := context.Background() - keys := []string{"a/b/c0", "a/b/c1", "a/b/c2"} + keys := []string{"/a/b/c0", "/a/b/c1", "/a/b/c2"} value := []byte("v") for _, key := range keys { require.NoError(t, persist.Set(ctx, key, value)) @@ -55,7 +55,7 @@ func TestBasicOperations(t *testing.T) { require.NoError(t, err) require.Equal(t, value, gotValue) } - entries, err := persist.List(ctx, "a/b") + entries, err := persist.List(ctx, "/a/b") require.NoError(t, err) require.Equal(t, len(keys), len(entries)) for _, key := range keys {