diff --git a/go.mod b/go.mod index 0bfdc299..b739a8f0 100644 --- a/go.mod +++ b/go.mod @@ -61,6 +61,7 @@ require ( github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.9 // indirect github.com/leodido/go-urn v1.4.0 // indirect + github.com/lib/pq v1.10.9 github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-runewidth v0.0.16 // indirect diff --git a/go.sum b/go.sum index 5548735e..696dbfeb 100644 --- a/go.sum +++ b/go.sum @@ -183,6 +183,11 @@ github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= +github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= +github.com/mattn/go-colorable v0.1.4/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4= github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= diff --git a/scripts/docker/docker-compose.yml b/scripts/docker/docker-compose.yml index 7a7450ba..587aa5d4 100644 --- a/scripts/docker/docker-compose.yml +++ b/scripts/docker/docker-compose.yml @@ -76,3 +76,15 @@ services: ports: - "8500:8500" command: "agent -dev -client=0.0.0.0" + + postgres0: + build: ./pg-dockerfile + container_name: postgres0 + environment: + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + POSTGRES_DB: testdb + ports: + - "5432:5432" + volumes: + - ./pg-init-scripts:/docker-entrypoint-initdb.d diff --git a/scripts/docker/pg-dockerfile/Dockerfile b/scripts/docker/pg-dockerfile/Dockerfile new file mode 100644 index 00000000..bf9c45d1 --- /dev/null +++ b/scripts/docker/pg-dockerfile/Dockerfile @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +FROM postgres:16 + +RUN apt-get update && apt-get install -y \ + postgresql-server-dev-16 \ + build-essential \ + libpq-dev \ + wget + +RUN wget https://github.com/citusdata/pg_cron/archive/refs/tags/v1.6.5.tar.gz \ + && tar -xvzf v1.6.5.tar.gz \ + && cd pg_cron-1.6.5 \ + && make && make install \ + && cd .. && rm -rf v1.6.5.tar.gz pg_cron-1.6.5 + +RUN echo "shared_preload_libraries = 'pg_cron'" >> /usr/share/postgresql/postgresql.conf.sample \ + && echo "cron.database_name = 'testdb'" >> /usr/share/postgresql/postgresql.conf.sample \ No newline at end of file diff --git a/scripts/docker/pg-init-scripts/init.sql b/scripts/docker/pg-init-scripts/init.sql new file mode 100644 index 00000000..db518086 --- /dev/null +++ b/scripts/docker/pg-init-scripts/init.sql @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +CREATE TABLE locks ( + name TEXT PRIMARY KEY, + leaderID TEXT NOT NULL +); + +CREATE TABLE kv ( + key TEXT PRIMARY KEY, + value BYTEA +); + +CREATE OR REPLACE FUNCTION notify_changes() +RETURNS TRIGGER AS $$ +BEGIN + IF TG_OP = 'INSERT' THEN + PERFORM cron.schedule('delete_' || NEW.name, '6 seconds', FORMAT('DELETE FROM locks WHERE name = %L', NEW.name)); + PERFORM pg_notify('lock_change', 'INSERT:' || NEW.leaderID::text); + END IF; + + IF TG_OP = 'DELETE' THEN + PERFORM cron.unschedule('delete_' || OLD.name); + PERFORM pg_notify('lock_change', 'DELETE:' || OLD.leaderID::text); + END IF; + + RETURN NULL; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER lock_change_trigger +AFTER INSERT OR DELETE ON locks +FOR EACH ROW EXECUTE FUNCTION notify_changes(); + +CREATE EXTENSION IF NOT EXISTS pg_cron; \ No newline at end of file diff --git a/store/engine/postgresql/postgresql.go b/store/engine/postgresql/postgresql.go new file mode 100644 index 00000000..8caadd5f --- /dev/null +++ b/store/engine/postgresql/postgresql.go @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package postgresql + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/apache/kvrocks-controller/consts" + "github.com/apache/kvrocks-controller/logger" + "github.com/apache/kvrocks-controller/store/engine" + "github.com/lib/pq" + "go.uber.org/zap" +) + +const ( + // Need to modify the cron schedule timeout accordingly in init.sql before changing the lockTTL + lockTTL = 6 * time.Second + listenerMinReconnectInterval = 10 * time.Second + listenerMaxReconnectInterval = 1 * time.Minute + defaultElectPath = "/kvrocks/controller/leader" +) + +type Config struct { + Addrs []string `yaml:"addrs"` + Username string `yaml:"username"` + Password string `yaml:"password"` + DBName string `yaml:"db_name"` + NotifyChannel string `yaml:"notify_channel"` + ElectPath string `yaml:"elect_path"` +} + +type Postgresql struct { + db *sql.DB + listener *pq.Listener + + leaderMu sync.Mutex + leaderID string + myID string + electPath string + isReady atomic.Bool + + quitCh chan struct{} + wg sync.WaitGroup + lockReleaseCh chan bool + leaderChangeCh chan bool +} + +func New(id string, cfg *Config) (*Postgresql, error) { + if len(id) == 0 { + return nil, errors.New("id must NOT be a empty string") + } + + connStr := fmt.Sprintf("postgres://%s:%s@%s/%s?sslmode=disable", cfg.Username, cfg.Password, cfg.Addrs[0], cfg.DBName) + db, err := sql.Open("postgres", connStr) + if err != nil { + return nil, err + } + + listener := pq.NewListener(connStr, listenerMinReconnectInterval, listenerMaxReconnectInterval, nil) + err = listener.Listen(cfg.NotifyChannel) + if err != nil { + return nil, err + } + + electPath := defaultElectPath + if cfg.ElectPath != "" { + electPath = defaultElectPath + } + + p := &Postgresql{ + myID: id, + electPath: electPath, + db: db, + listener: listener, + quitCh: make(chan struct{}), + lockReleaseCh: make(chan bool), + leaderChangeCh: make(chan bool), + } + err = p.initLeaderId() + if err != nil { + return nil, err + } + p.isReady.Store(false) + p.wg.Add(2) + go p.electLoop() + go p.observeLeaderEvent() + return p, nil +} + +func (p *Postgresql) ID() string { + return p.myID +} + +func (p *Postgresql) Leader() string { + p.leaderMu.Lock() + defer p.leaderMu.Unlock() + return p.leaderID +} + +func (p *Postgresql) LeaderChange() <-chan bool { + return p.leaderChangeCh +} + +func (p *Postgresql) IsReady(ctx context.Context) bool { + for { + select { + case <-p.quitCh: + return false + case <-time.After(100 * time.Millisecond): + if p.isReady.Load() { + return true + } + case <-ctx.Done(): + return p.isReady.Load() + } + } +} + +func (p *Postgresql) Get(ctx context.Context, key string) ([]byte, error) { + var value []byte + query := "SELECT value FROM kv WHERE key = $1" + + row := p.db.QueryRow(query, key) + err := row.Scan(&value) + if errors.Is(err, sql.ErrNoRows) { + return nil, consts.ErrNotFound + } + if err != nil { + return nil, err + } + return value, nil +} + +func (p *Postgresql) Exists(ctx context.Context, key string) (bool, error) { + _, err := p.Get(ctx, key) + if err != nil { + if errors.Is(err, consts.ErrNotFound) { + return false, nil + } + return false, err + } + return true, nil +} + +func (p *Postgresql) Set(ctx context.Context, key string, value []byte) error { + query := "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value" + _, err := p.db.Exec(query, key, value) + return err +} + +func (p *Postgresql) Delete(ctx context.Context, key string) error { + query := "DELETE FROM kv WHERE key = $1" + _, err := p.db.Exec(query, key) + return err +} + +func (p *Postgresql) List(ctx context.Context, prefix string) ([]engine.Entry, error) { + prefixWithWildcard := prefix + "%" + query := "SELECT key, value from kv WHERE key LIKE $1" + rows, err := p.db.Query(query, prefixWithWildcard) + if err != nil { + return nil, err + } + defer rows.Close() + + prefixLen := len(prefix) + entries := make([]engine.Entry, 0) + for rows.Next() { + var key string + var value []byte + + err := rows.Scan(&key, &value) + if err != nil { + return nil, err + } + + if key == prefix { + continue + } + + key = strings.TrimLeft(key[prefixLen+1:], "/") + if strings.ContainsRune(key, '/') { + continue + } + entries = append(entries, engine.Entry{ + Key: key, + Value: value, + }) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return entries, nil +} + +func (p *Postgresql) electLoop() { + defer p.wg.Done() + for { + select { + case <-p.quitCh: + return + default: + } + + query := "INSERT INTO locks (name, leaderID) VALUES ($1, $2) ON CONFLICT DO NOTHING" + _, err := p.db.Exec(query, p.electPath, p.myID) + if err != nil { + time.Sleep(lockTTL / 3) + continue + } + + select { + case <-p.lockReleaseCh: + continue + case <-p.quitCh: + return + } + } +} + +func (p *Postgresql) observeLeaderEvent() { + defer p.wg.Done() + + for { + select { + case <-p.quitCh: + return + case notification := <-p.listener.Notify: + p.isReady.Store(true) + + data := strings.SplitN(notification.Extra, ":", 2) + if len(data) != 2 { + logger.Get().With( + zap.Error(fmt.Errorf("failed to parse notification data: expected two parts separated by a colon")), + ).Error("Failed to parse notification data") + } + + operation := data[0] + leaderID := data[1] + + if operation == "INSERT" { + p.leaderMu.Lock() + p.leaderID = leaderID + p.leaderMu.Unlock() + p.leaderChangeCh <- true + } else { + p.lockReleaseCh <- true + } + } + } +} + +func (p *Postgresql) initLeaderId() error { + var leaderId string + query := "SELECT leaderID FROM locks WHERE name = $1" + row := p.db.QueryRow(query, p.electPath) + err := row.Scan(&leaderId) + if errors.Is(err, sql.ErrNoRows) { + p.leaderID = "" + return nil + } + if err != nil { + return err + } + p.leaderID = leaderId + return nil +} + +func (p *Postgresql) Close() error { + close(p.quitCh) + p.wg.Wait() + p.listener.Close() + return p.db.Close() +} diff --git a/store/engine/postgresql/postgresql_test.go b/store/engine/postgresql/postgresql_test.go new file mode 100644 index 00000000..57617f44 --- /dev/null +++ b/store/engine/postgresql/postgresql_test.go @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package postgresql + +import ( + "context" + "testing" + "time" + + "github.com/apache/kvrocks-controller/util" + + "github.com/stretchr/testify/require" +) + +const ( + addr = "127.0.0.1:5432" + notifyChannel = "lock_change" + username = "postgres" + password = "postgres" + dbName = "testdb" +) + +func TestBasicOperations(t *testing.T) { + id := util.RandString(40) + testElectPath := util.RandString(32) + persist, err := New(id, &Config{ + Username: username, + Password: password, + DBName: dbName, + NotifyChannel: notifyChannel, + ElectPath: testElectPath, + Addrs: []string{addr}, + }) + require.NoError(t, err) + defer persist.Close() + go func() { + for range persist.LeaderChange() { + // do nothing + } + }() + + ctx := context.Background() + keys := []string{"/a/b/c0", "/a/b/c1", "/a/b/c2"} + value := []byte("v") + for _, key := range keys { + require.NoError(t, persist.Set(ctx, key, value)) + gotValue, err := persist.Get(ctx, key) + require.NoError(t, err) + require.Equal(t, value, gotValue) + } + entries, err := persist.List(ctx, "/a/b") + require.NoError(t, err) + require.Equal(t, len(keys), len(entries)) + for _, key := range keys { + require.NoError(t, persist.Delete(ctx, key)) + } +} + +func TestElect(t *testing.T) { + endpoints := []string{addr} + + testElectPath := util.RandString(32) + id0 := util.RandString(40) + node0, err := New(id0, &Config{ + Username: username, + Password: password, + DBName: dbName, + NotifyChannel: notifyChannel, + ElectPath: testElectPath, + Addrs: endpoints, + }) + require.NoError(t, err) + require.Eventuallyf(t, func() bool { + return node0.Leader() == node0.myID + }, 10*time.Second, 100*time.Millisecond, "node0 should be the leader") + + id1 := util.RandString(40) + node1, err := New(id1, &Config{ + Username: username, + Password: password, + DBName: dbName, + NotifyChannel: notifyChannel, + ElectPath: testElectPath, + Addrs: endpoints, + }) + require.NoError(t, err) + require.Eventuallyf(t, func() bool { + return node1.Leader() == node0.myID + }, 10*time.Second, 100*time.Millisecond, "node1's leader should be the node0") + + go func() { + for { + select { + case <-node0.LeaderChange(): + // do nothing + case <-node1.LeaderChange(): + // do nothing + } + } + }() + + require.NoError(t, node0.Close()) + + require.Eventuallyf(t, func() bool { + return node1.Leader() == node1.myID + }, 15*time.Second, 100*time.Millisecond, "node1 should be the leader") +}