From a1bb3e7f18ec0ea23d5cc64df8c35e90da46b88b Mon Sep 17 00:00:00 2001 From: hudeng Date: Thu, 22 Sep 2022 16:08:27 +0800 Subject: [PATCH 1/2] feat: database backend add ssdb support Change-Id: I054c5fc9b02f613601781de8613d684faa0ea7f2 --- AUTHORS | 1 + context/context.go | 32 ++++- database/ssdb/batch.go | 129 ++++++++++++++++++ database/ssdb/database.go | 62 +++++++++ database/ssdb/database_test.go | 233 +++++++++++++++++++++++++++++++++ database/ssdb/log.go | 12 ++ database/ssdb/storage.go | 183 ++++++++++++++++++++++++++ database/ssdb/transaction.go | 188 ++++++++++++++++++++++++++ go.mod | 3 + go.sum | 6 +- utils/config.go | 8 ++ utils/config_test.go | 7 +- 12 files changed, 860 insertions(+), 4 deletions(-) create mode 100644 database/ssdb/batch.go create mode 100644 database/ssdb/database.go create mode 100644 database/ssdb/database_test.go create mode 100644 database/ssdb/log.go create mode 100644 database/ssdb/storage.go create mode 100644 database/ssdb/transaction.go diff --git a/AUTHORS b/AUTHORS index e97af144bb..8ec83fff2f 100644 --- a/AUTHORS +++ b/AUTHORS @@ -52,3 +52,4 @@ List of contributors, in chronological order: * Steven Stone (https://github.com/smstone) * Josh Bayfield (https://github.com/jbayfield) * Boxjan (https://github.com/boxjan) +* goldendeng (https://github.com/hudeng-go) diff --git a/context/context.go b/context/context.go index d80528a544..647fefdfd6 100644 --- a/context/context.go +++ b/context/context.go @@ -3,13 +3,16 @@ package context import ( gocontext "context" + "errors" "fmt" "math/rand" + "net/url" "os" "os/signal" "path/filepath" "runtime" "runtime/pprof" + "strconv" "strings" "sync" "time" @@ -19,6 +22,7 @@ import ( "github.com/aptly-dev/aptly/console" "github.com/aptly-dev/aptly/database" "github.com/aptly-dev/aptly/database/goleveldb" + "github.com/aptly-dev/aptly/database/ssdb" "github.com/aptly-dev/aptly/deb" "github.com/aptly-dev/aptly/files" "github.com/aptly-dev/aptly/http" @@ -27,6 +31,7 @@ import ( "github.com/aptly-dev/aptly/swift" "github.com/aptly-dev/aptly/task" "github.com/aptly-dev/aptly/utils" + "github.com/seefan/gossdb/v2/conf" "github.com/smira/commander" "github.com/smira/flag" ) @@ -287,7 +292,32 @@ func (context *AptlyContext) _database() (database.Storage, error) { if context.database == nil { var err error - context.database, err = goleveldb.NewDB(context.dbPath()) + if context.config().DatabaseBackend.Type == "leveldb" { + if context.config().DatabaseBackend.DbPath != "" { + dbPath := filepath.Join(context.config().RootDir, context.config().DatabaseBackend.DbPath) + context.database, err = goleveldb.NewDB(dbPath) + } else { + return nil, errors.New("leveldb databaseBackend config invalid") + } + } else if context.config().DatabaseBackend.Type == "ssdb" { + var cfg conf.Config + u, e := url.Parse(context.config().DatabaseBackend.URL) + + if e != nil { + return nil, e + } + cfg.Port, e = strconv.Atoi(u.Port()) + cfg.Host = strings.Split(u.Host, ":")[0] + if e != nil { + return nil, e + } + password, _ := u.User.Password() + cfg.Password = password + context.database, err = ssdb.NewOpenDB(&cfg) + } else { + context.database, err = goleveldb.NewDB(context.dbPath()) + } + if err != nil { return nil, fmt.Errorf("can't instantiate database: %s", err) } diff --git a/database/ssdb/batch.go b/database/ssdb/batch.go new file mode 100644 index 0000000000..84f4a37a06 --- /dev/null +++ b/database/ssdb/batch.go @@ -0,0 +1,129 @@ +package ssdb + +import ( + "fmt" + + "github.com/aptly-dev/aptly/database" + "github.com/seefan/gossdb/v2/conf" + "github.com/seefan/gossdb/v2/pool" +) + +const ( + delOpt = "del" +) + +type bWriteData struct { + key []byte + value []byte + opts string + err error +} + +type Batch struct { + cfg *conf.Config + // key-value chan + w chan bWriteData + p map[string]interface{} + d []string + db *pool.Client +} + +// func internalOpenBatch... +func internalOpenBatch(t database.Storage) *Batch { + b := &Batch{ + w: make(chan bWriteData), + p: make(map[string]interface{}), + } + b.run() + + return b +} + +func (b *Batch) run() { + go func() { + for { + select { + case w, ok := <-b.w: + { + if !ok { + ssdbLog("ssdb batch write chan closed") + return + } + + if w.opts == "write" { + ssdbLog("ssdb batch write") + var err error + if len(b.p) > 0 && len(b.d) == 0 { + err = b.db.MultiSet(b.p) + ssdbLog("ssdb batch set errinfo: ", err) + } else if len(b.d) > 0 && len(b.p) == 0 { + err = b.db.MultiDel(b.d...) + ssdbLog("ssdb batch del errinfo: ", err) + } else if len(b.p) == 0 && len(b.d) == 0 { + err = nil + } else { + err = fmt.Errorf("ssdb batch does not support both put and delete operations") + } + ssdbLog("ssdb batch write errinfo: ", err) + b.w <- bWriteData{ + err: err, + } + ssdbLog("ssdb batch write end") + } else { + ssdbLog("ssdb batch", w.opts) + if w.opts == "put" { + b.p[string(w.key)] = w.value + } else if w.opts == delOpt { + b.d = append(b.d, string(w.key)) + } + } + } + } + } + }() +} + +func (b *Batch) stop() { + ssdbLog("ssdb batch stop") + close(b.w) +} + +func (b *Batch) Put(key, value []byte) (err error) { + // err = b.db.Set(string(key), string(value)) + w := bWriteData{ + key: key, + value: value, + opts: "put", + } + + b.w <- w + return nil +} + +func (b *Batch) Delete(key []byte) (err error) { + /* err = b.db.Del(string(key)) + return */ + w := bWriteData{ + key: key, + opts: delOpt, + } + + b.w <- w + return nil +} + +func (b *Batch) Write() (err error) { + defer b.stop() + w := bWriteData{ + opts: "write", + } + + b.w <- w + result := <-b.w + return result.err +} + +// batch should implement database.Batch +var ( + _ database.Batch = &Batch{} +) diff --git a/database/ssdb/database.go b/database/ssdb/database.go new file mode 100644 index 0000000000..1710333f2d --- /dev/null +++ b/database/ssdb/database.go @@ -0,0 +1,62 @@ +package ssdb + +import ( + "os" + "strconv" + + "github.com/aptly-dev/aptly/database" + "github.com/seefan/gossdb/v2" + "github.com/seefan/gossdb/v2/conf" + "github.com/seefan/gossdb/v2/pool" +) + +var defaultBufSize = 102400 +var defaultPoolSize = 1 + +func internalOpen(cfg *conf.Config) (*pool.Client, error) { + ssdbLog("internalOpen") + + cfg.ReadBufferSize = defaultBufSize + cfg.WriteBufferSize = defaultBufSize + cfg.MaxPoolSize = defaultPoolSize + cfg.PoolSize = defaultPoolSize + cfg.MinPoolSize = defaultPoolSize + cfg.MaxWaitSize = 100 * defaultPoolSize + cfg.RetryEnabled = true + + //override by env + if os.Getenv("SSDB_READBUFFERSIZE") != "" { + readBufSize, err := strconv.Atoi(os.Getenv("SSDB_READBUFFERSIZE")) + if err != nil { + cfg.ReadBufferSize = readBufSize + } + } + + if os.Getenv("SSDB_WRITEBUFFERSIZE") != "" { + writeBufSize, err := strconv.Atoi(os.Getenv("SSDB_WRITEBUFFERSIZE")) + if err != nil { + cfg.WriteBufferSize = writeBufSize + } + } + + var cfgs = []*conf.Config{cfg} + err := gossdb.Start(cfgs...) + if err != nil { + return nil, err + } + + return gossdb.NewClient() +} + +func NewDB(cfg *conf.Config) (database.Storage, error) { + return &Storage{cfg: cfg}, nil +} + +func NewOpenDB(cfg *conf.Config) (database.Storage, error) { + db, err := NewDB(cfg) + if err != nil { + return nil, err + } + + return db, db.Open() +} diff --git a/database/ssdb/database_test.go b/database/ssdb/database_test.go new file mode 100644 index 0000000000..d6257015b0 --- /dev/null +++ b/database/ssdb/database_test.go @@ -0,0 +1,233 @@ +package ssdb_test + +import ( + "fmt" + "testing" + + "github.com/aptly-dev/aptly/database" + "github.com/aptly-dev/aptly/database/ssdb" + "github.com/seefan/gossdb/v2/conf" + . "gopkg.in/check.v1" +) + +// Launch gocheck tests +func Test(t *testing.T) { + TestingT(t) +} + +type SSDBSuite struct { + cfg *conf.Config + db database.Storage +} + +var _ = Suite(&SSDBSuite{cfg: &conf.Config{ + Host: "127.0.0.1", + Port: 8888, +}}) + +func (s *SSDBSuite) SetUpTest(c *C) { + var err error + s.db, err = ssdb.NewOpenDB(s.cfg) + c.Assert(err, IsNil) +} + +func (s *SSDBSuite) TestSetUpTest(c *C) { + var err error + s.db, err = ssdb.NewOpenDB(s.cfg) + c.Assert(err, IsNil) +} + +func (s *SSDBSuite) TestGetPut(c *C) { + var ( + key = []byte("key") + value = []byte("value") + ) + var err error + + err = s.db.Put(key, value) + c.Assert(err, IsNil) + + result, err := s.db.Get(key) + c.Assert(err, IsNil) + c.Assert(result, DeepEquals, value) +} + +func (s *SSDBSuite) TestTemporaryDelete(c *C) { + fmt.Println("TestTemporaryDelete") + var ( + key = []byte("key") + value = []byte("value") + ) + + temp, err := s.db.CreateTemporary() + c.Assert(err, IsNil) + + c.Check(temp.HasPrefix([]byte(nil)), Equals, false) + + err = temp.Put(key, value) + c.Assert(err, IsNil) + c.Check(temp.HasPrefix([]byte(nil)), Equals, true) + + c.Assert(temp.Close(), IsNil) + c.Assert(temp.Drop(), IsNil) +} + +func (s *SSDBSuite) TestDelete(c *C) { + var ( + key = []byte("key") + value = []byte("value") + ) + + err := s.db.Put(key, value) + c.Assert(err, IsNil) + + _, err = s.db.Get(key) + c.Assert(err, IsNil) + + err = s.db.Delete(key) + c.Assert(err, IsNil) + +} + +func (s *SSDBSuite) TestByPrefix(c *C) { + //c.Check(s.db.FetchByPrefix([]byte{0x80}), DeepEquals, [][]byte{}) + + s.db.Put([]byte{0x80, 0x01}, []byte{0x01}) + s.db.Put([]byte{0x80, 0x03}, []byte{0x03}) + s.db.Put([]byte{0x80, 0x02}, []byte{0x02}) + c.Check(len(s.db.FetchByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}})) + c.Check(len(s.db.KeysByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})) + + s.db.Put([]byte{0x90, 0x01}, []byte{0x04}) + c.Check(len(s.db.FetchByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}})) + c.Check(len(s.db.KeysByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})) + + s.db.Put([]byte{0x00, 0x01}, []byte{0x05}) + c.Check(len(s.db.FetchByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}})) + c.Check(len(s.db.KeysByPrefix([]byte{0x80})), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})) + + keys := [][]byte{} + values := [][]byte{} + + c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error { + keys = append(keys, append([]byte(nil), k...)) + values = append(values, append([]byte(nil), v...)) + return nil + }), IsNil) + + c.Check(len(values), DeepEquals, len([][]byte{{0x01}, {0x02}, {0x03}})) + c.Check(len(keys), DeepEquals, len([][]byte{{0x80, 0x01}, {0x80, 0x02}, {0x80, 0x03}})) + + c.Check(s.db.ProcessByPrefix([]byte{0x80}, func(k, v []byte) error { + return database.ErrNotFound + }), Equals, database.ErrNotFound) + + c.Check(s.db.ProcessByPrefix([]byte{0xa0}, func(k, v []byte) error { + return database.ErrNotFound + }), IsNil) + + c.Check(s.db.FetchByPrefix([]byte{0xa0}), DeepEquals, [][]byte{}) + c.Check(s.db.KeysByPrefix([]byte{0xa0}), DeepEquals, [][]byte{}) +} + +func (s *SSDBSuite) TestHasPrefix(c *C) { + s.db.Put([]byte{0x80, 0x01}, []byte{0x01}) + + //c.Check(s.db.HasPrefix([]byte("")), Equals, true) + c.Check(s.db.HasPrefix([]byte{0x80}), Equals, true) + c.Check(s.db.HasPrefix([]byte{0x79}), Equals, false) +} + +func (s *SSDBSuite) TestTransactionCommit(c *C) { + var ( + key = []byte("key") + key2 = []byte("key2") + value = []byte("value") + value2 = []byte("value2") + ) + s.db.Delete(key) + s.db.Delete(key2) + transaction, err := s.db.OpenTransaction() + c.Assert(err, IsNil) + defer transaction.Discard() + + err = s.db.Put(key, value) + c.Assert(err, IsNil) + + v, err := s.db.Get(key) + c.Assert(err, IsNil) + c.Check(v, DeepEquals, value) + + err = transaction.Put(key2, value2) + c.Assert(err, IsNil) + v, err = transaction.Get(key2) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value2) + _, err = s.db.Get(key2) + c.Assert(err, ErrorMatches, "key not found") + + err = transaction.Delete(key) + c.Assert(err, IsNil) + _, err = transaction.Get(key) + c.Assert(err, ErrorMatches, "key not found") + v, err = s.db.Get(key) + c.Assert(err, IsNil) + c.Check(v, DeepEquals, value) + + err = transaction.Commit() + c.Check(err, IsNil) + + v, err = s.db.Get(key2) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value2) + + _, err = s.db.Get(key) + c.Assert(err, ErrorMatches, "key not found") +} + +func (s *SSDBSuite) TestBatch(c *C) { + var ( + key = []byte("bkey") + key2 = []byte("bkey2") + value = []byte("bvalue") + value2 = []byte("bvalue2") + ) + + err := s.db.Put(key, value) + c.Check(err, IsNil) + + batch := s.db.CreateBatch() + batch.Put(key2, value2) + v, err := s.db.Get(key) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value) + _, err = s.db.Get(key2) + c.Check(err, ErrorMatches, "key not found") + + err = batch.Write() + c.Check(err, IsNil) + + v, err = s.db.Get(key2) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value2) + + batch = s.db.CreateBatch() + batch.Delete(key) + batch.Delete(key2) + c.Check(err, IsNil) + v, err = s.db.Get(key) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value) + c.Check(err, IsNil) + v, err = s.db.Get(key2) + c.Check(err, IsNil) + c.Check(v, DeepEquals, value2) + + err = batch.Write() + c.Check(err, IsNil) + + _, err = s.db.Get(key2) + c.Check(err, ErrorMatches, "key not found") + _, err = s.db.Get(key) + c.Check(err, ErrorMatches, "key not found") +} diff --git a/database/ssdb/log.go b/database/ssdb/log.go new file mode 100644 index 0000000000..ded044275f --- /dev/null +++ b/database/ssdb/log.go @@ -0,0 +1,12 @@ +package ssdb + +import ( + "fmt" + "os" +) + +func ssdbLog(a ...interface{}) { + if os.Getenv("SSDB_DEBUG") != "" { + fmt.Println(a...) + } +} diff --git a/database/ssdb/storage.go b/database/ssdb/storage.go new file mode 100644 index 0000000000..76b8fb3a63 --- /dev/null +++ b/database/ssdb/storage.go @@ -0,0 +1,183 @@ +package ssdb + +import ( + "os" + + "github.com/aptly-dev/aptly/database" + "github.com/aptly-dev/aptly/database/goleveldb" + "github.com/seefan/gossdb/v2" + "github.com/seefan/gossdb/v2/conf" + "github.com/seefan/gossdb/v2/pool" +) + +type Storage struct { + cfg *conf.Config + db *pool.Client +} + +// CreateTemporary creates new DB of the same type in temp dir +func (s *Storage) CreateTemporary() (database.Storage, error) { + // use leveldb as temp db + tmpPath := os.Getenv("SSDB_TMPDB_PATH") + if tmpPath == "" { + tmpPath = "/tmp/ssdb_tmpdb_path" + } + gdb, err := goleveldb.NewDB(tmpPath) + if err != nil { + return nil, err + } + + return gdb.CreateTemporary() +} + +// Get key value from ssdb +func (s *Storage) Get(key []byte) (value []byte, err error) { + // ssdbLog("ssdb origin db get key:", string(key)) + getResp, err := s.db.Get(string(key)) + if err != nil { + return + } + + value = getResp.Bytes() + + if len(value) == 0 { + err = database.ErrNotFound + return + } + return +} + +// Put saves key to ssdb, if key has the same value in DB already, it is not saved +func (s *Storage) Put(key []byte, value []byte) (err error) { + //ssdbLog("ssdb origin db put key:", string(key), " value: ", string(value)) + err = s.db.Set(string(key), value) + if err != nil { + return + } + return +} + +// Delete removes key from ssdb +func (s *Storage) Delete(key []byte) (err error) { + //ssdbLog("ssdb origin db del key:", string(key)) + err = s.db.Del(string(key)) + if err != nil { + return + } + return +} + +// KeysByPrefix returns all keys that start with prefix +func (s *Storage) KeysByPrefix(prefix []byte) [][]byte { + result := make([][]byte, 0) + getResp, err := s.db.Keys(string(prefix), string(prefix)+"}", -1) + if err != nil { + return nil + } + for _, ev := range getResp { + key := []byte(ev) + keyc := make([]byte, len(key)) + copy(keyc, key) + result = append(result, key) + } + return result +} + +// FetchByPrefix returns all values with keys that start with prefix +func (s *Storage) FetchByPrefix(prefix []byte) [][]byte { + result := make([][]byte, 0) + getResp, err := s.db.Scan(string(prefix), string(prefix)+"}", -1) + if err != nil { + return nil + } + for _, ev := range getResp { + value := ev.Bytes() + valuec := make([]byte, len(value)) + copy(valuec, value) + result = append(result, valuec) + } + return result +} + +// HasPrefix checks whether it can find any key with given prefix and returns true if one exists +func (s *Storage) HasPrefix(prefix []byte) bool { + //ssdbLog("HasPrefix", string(prefix), string(prefix)+"}") + getResp, err := s.db.Keys(string(prefix), string(prefix)+"}", -1) + if err != nil { + return false + } + //ssdbLog("HasPrefix", len(getResp)) + if len(getResp) > 0 { + return true + } + return false +} + +// ProcessByPrefix iterates through all entries where key starts with prefix and calls +// StorageProcessor on key value pair +func (s *Storage) ProcessByPrefix(prefix []byte, proc database.StorageProcessor) error { + getResp, err := s.db.Scan(string(prefix), string(prefix)+"}", -1) + if err != nil { + return err + } + + for k, v := range getResp { + err := proc([]byte(k), v.Bytes()) + if err != nil { + return err + } + } + return nil +} + +// Close finishes ssdb connect +func (s *Storage) Close() error { + ssdbLog("ssdb close") + if s.db != nil { + s.db.Close() + s.db = nil + } + gossdb.Shutdown() + return nil +} + +// Reopen tries to open (re-open) the database +func (s *Storage) Open() error { + ssdbLog("ssdb open") + if s.db != nil && s.db.IsOpen() { + ssdbLog("ssdb opened") + return nil + } + + var err error + s.db, err = internalOpen(s.cfg) + return err +} + +// CreateBatch creates a Batch object +func (s *Storage) CreateBatch() database.Batch { + Batch := internalOpenBatch(s) + Batch.cfg = s.cfg + Batch.db = s.db + return Batch +} + +// OpenTransaction creates new transaction. +func (s *Storage) OpenTransaction() (database.Transaction, error) { + return internalOpenTransaction(s) +} + +// CompactDB compacts database by merging layers +func (s *Storage) CompactDB() error { + return nil +} + +// Drop removes all the ssdb files (DANGEROUS!) +func (s *Storage) Drop() error { + return nil +} + +// Check interface +var ( + _ database.Storage = &Storage{} +) diff --git a/database/ssdb/transaction.go b/database/ssdb/transaction.go new file mode 100644 index 0000000000..a6518f3861 --- /dev/null +++ b/database/ssdb/transaction.go @@ -0,0 +1,188 @@ +package ssdb + +import ( + "fmt" + + "github.com/aptly-dev/aptly/database" +) + +type trWriteData struct { + key []byte + value []byte + opts string + err error +} + +type trReadData struct { + kv []byte + err error +} + +type transaction struct { + // for key-value-operation chan + w chan trWriteData + // key read chan + r chan trReadData + q map[string]trWriteData + t database.Storage +} + +// func internalOpenTransaction... +func internalOpenTransaction(t database.Storage) (*transaction, error) { + tr := &transaction{ + w: make(chan trWriteData), + r: make(chan trReadData), + q: make(map[string]trWriteData), + t: t, + } + + return tr, tr.run() +} + +// func run... +func (t *transaction) run() error { + go func() { + for { + select { + case w, ok := <-t.w: + { + if !ok { + ssdbLog("ssdb transaction write chan closed") + return + } + + if w.opts == "commit" { + ssdbLog("ssdb transaction commit") + var errs []error + for _, vo := range t.q { + if vo.opts == "put" { + err := t.t.Put(vo.key, vo.value) + if err != nil { + //ssdbLog(err) + errs = append(errs, err) + } + } + + if vo.opts == delOpt { + err := t.t.Delete(vo.key) + if err != nil { + errs = append(errs, err) + } + } + } + if len(errs) == 0 { + t.w <- trWriteData{ + err: nil, + } + } else { + t.w <- trWriteData{ + err: fmt.Errorf("ssdb transaction write errs: %v", errs), + } + } + ssdbLog("ssdb transaction commit end") + } else { + ssdbLog("ssdb transaction", w.opts) + //ssdbLog("ssdb r transaction", w.opts, "key: ", string(w.key), "value: ", string(w.value)) + t.q[string(w.key)] = w + } + } + case r, ok := <-t.r: + { + if !ok { + ssdbLog("ssdb transaction read chan closed") + return + } + + if rData, ok := t.q[string(r.kv)]; ok { + if rData.opts == delOpt { + // del return not found error + t.r <- trReadData{ + kv: nil, + err: database.ErrNotFound, + } + } else { + t.r <- trReadData{ + kv: rData.value, + err: nil, + } + } + } else { + v, err := t.t.Get(r.kv) + t.r <- trReadData{ + kv: v, + err: err, + } + } + } + } + } + }() + + return nil +} + +// Get implements database.Reader interface. +func (t *transaction) Get(key []byte) ([]byte, error) { + keyc := make([]byte, len(key)) + copy(keyc, key) + r := trReadData{ + kv: keyc, + err: nil, + } + t.r <- r + result := <-t.r + return result.kv, result.err +} + +// Put implements database.Writer interface. +func (t *transaction) Put(key, value []byte) error { + //ssdbLog("golf*********************ssdb put") + //ssdbLog("ssdb transaction db put key:", string(key), " value: ", string(value)) + keyc := make([]byte, len(key)) + copy(keyc, key) + valuec := make([]byte, len(value)) + copy(valuec, value) + w := trWriteData{ + key: keyc, + value: valuec, + opts: "put", + } + + t.w <- w + return nil +} + +// Delete implements database.Writer interface. +func (t *transaction) Delete(key []byte) error { + //return t.t.Delete(key) + //ssdbLog("golf*********************ssdb del") + keyc := make([]byte, len(key)) + copy(keyc, key) + w := trWriteData{ + key: keyc, + opts: delOpt, + } + + t.w <- w + return nil +} + +func (t *transaction) Commit() error { + w := trWriteData{ + opts: "commit", + } + + t.w <- w + result := <-t.w + return result.err +} + +// Discard is safe to call after Commit(), it would be no-op +func (t *transaction) Discard() { + ssdbLog("ssdb transaction stop") + close(t.r) + close(t.w) +} + +// transaction should implement database.Transaction +var _ database.Transaction = &transaction{} diff --git a/go.mod b/go.mod index e552313488..81ee316ec8 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/aptly-dev/aptly go 1.16 +replace github.com/seefan/gossdb/v2 => github.com/hudeng-go/gossdb/v2 v2.0.1-0.20220523082930-c7fda08f33e2 + require ( github.com/AlekSi/pointer v1.0.0 github.com/Azure/azure-storage-blob-go v0.15.0 @@ -28,6 +30,7 @@ require ( github.com/pborman/uuid v0.0.0-20180122190007-c65b2f87fee3 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.12.1 + github.com/seefan/gossdb/v2 v2.0.0 github.com/smartystreets/gunit v1.0.4 // indirect github.com/smira/commander v0.0.0-20140515201010-f408b00e68d5 github.com/smira/flag v0.0.0-20170926215700-695ea5e84e76 diff --git a/go.sum b/go.sum index 6c9a66b67f..d7d07d467f 100644 --- a/go.sum +++ b/go.sum @@ -43,7 +43,6 @@ github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZ github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M= github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw= github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74= -github.com/Azure/go-autorest/autorest/mocks v0.4.1 h1:K0laFcLE6VLTOwNgSxaGbUcLPuGXlNkbVvq4cW4nIHk= github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k= github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg= github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8= @@ -91,7 +90,6 @@ github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= -github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= @@ -182,6 +180,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/hudeng-go/gossdb/v2 v2.0.1-0.20220523082930-c7fda08f33e2 h1:kYWIESmn3yHaGVyYcIGxFdVxhFNlAkol02xnRjQU3yU= +github.com/hudeng-go/gossdb/v2 v2.0.1-0.20220523082930-c7fda08f33e2/go.mod h1:YBb10QYLQSC0/14z1Y5jam9u7qKhhzTsxwdjTvzFEW0= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/jlaffaye/ftp v0.0.0-20180404123514-2403248fa8cc h1:lWFup/SOhwcpvRJIFqx/WQis5U4SrOSyWfSqvfdF09w= github.com/jlaffaye/ftp v0.0.0-20180404123514-2403248fa8cc/go.mod h1:lli8NYPQOFy3O++YmYbqVgOcQ1JPCwdOy+5zSjKJ9qY= @@ -288,6 +288,8 @@ github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFR github.com/rogpeppe/go-internal v1.6.1/go.mod h1:xXDCJY+GAPziupqXw64V24skbSoqbTEfhy4qGm1nDQc= github.com/rogpeppe/go-internal v1.8.0 h1:FCbCCtXNOY3UtUuHUYaghJg4y7Fd14rXifAYUAtL9R8= github.com/rogpeppe/go-internal v1.8.0/go.mod h1:WmiCO8CzOY8rg0OYDC4/i/2WRWAB6poM+XZ2dLUbcbE= +github.com/seefan/goerr v1.1.2 h1:rLUrQeJY1FRYd2WIsZDr7mr0F58gKPSnCvtBjj00b3Q= +github.com/seefan/goerr v1.1.2/go.mod h1:gipDsSn2T2Jwf0q9bl6K0CGyhvfNZiI8/Bi0MfsS258= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= diff --git a/utils/config.go b/utils/config.go index e2d2d93edb..454aa8ff8f 100644 --- a/utils/config.go +++ b/utils/config.go @@ -35,6 +35,14 @@ type ConfigStructure struct { // nolint: maligned AzurePublishRoots map[string]AzurePublishRoot `json:"AzurePublishEndpoints"` AsyncAPI bool `json:"AsyncAPI"` EnableMetricsEndpoint bool `json:"enableMetricsEndpoint"` + DatabaseBackend DBConfig `json:"databaseBackend"` +} + +// DBConfig +type DBConfig struct { + Type string `json:"type"` + URL string `json:"url"` + DbPath string `json:"dbPath"` } // FileSystemPublishRoot describes single filesystem publishing entry point diff --git a/utils/config_test.go b/utils/config_test.go index 4e10fab785..6912eec336 100644 --- a/utils/config_test.go +++ b/utils/config_test.go @@ -131,7 +131,12 @@ func (s *ConfigSuite) TestSaveConfig(c *C) { " }\n"+ " },\n"+ " \"AsyncAPI\": false,\n"+ - " \"enableMetricsEndpoint\": false\n"+ + " \"enableMetricsEndpoint\": false,\n"+ + " \"databaseBackend\": {\n"+ + " \"type\": \"\",\n"+ + " \"url\": \"\",\n"+ + " \"dbPath\": \"\"\n"+ + " }\n"+ "}") } From 9a668fd5dfff80510f07fcce61fece8e4fef6162 Mon Sep 17 00:00:00 2001 From: hudeng Date: Wed, 12 Oct 2022 09:54:22 +0800 Subject: [PATCH 2/2] fix: Add ssdb environment preparation Change-Id: I8534e66786021ce4384c92a2a8d14aa50839a4da --- database/ssdb/database_test.go | 41 +++++++++++++++++++++++++ system/t02_config/ConfigShowTest_gold | 7 ++++- system/t02_config/CreateConfigTest_gold | 7 ++++- 3 files changed, 53 insertions(+), 2 deletions(-) diff --git a/database/ssdb/database_test.go b/database/ssdb/database_test.go index d6257015b0..139432bfd9 100644 --- a/database/ssdb/database_test.go +++ b/database/ssdb/database_test.go @@ -2,6 +2,9 @@ package ssdb_test import ( "fmt" + "io/ioutil" + "os" + "os/exec" "testing" "github.com/aptly-dev/aptly/database" @@ -15,6 +18,44 @@ func Test(t *testing.T) { TestingT(t) } +func setUpSsdb() error { + setUpStr := ` + #!/bin/bash + if [ ! -e /tmp/ssdb-master/ssdb-master ]; then + mkdir -p /tmp/ssdb-master + wget --no-check-certificate https://github.com/ideawu/ssdb/archive/master.zip -O /tmp/ssdb-master/master.zip + cd /tmp/ssdb-master && unzip master && cd ssdb-master && make all + fi + cd /tmp/ssdb-master/ssdb-master && ./ssdb-server -d ssdb.conf -s restart + sleep 2` + + tmpShell, err := ioutil.TempFile("/tmp", "ssdbSetup") + if err != nil { + return err + } + defer os.Remove(tmpShell.Name()) + + _, err = tmpShell.WriteString(setUpStr) + if err != nil { + return err + } + + cmd := exec.Command("/bin/bash", tmpShell.Name()) + fmt.Println(cmd.String()) + output, err := cmd.Output() + fmt.Println(string(output)) + if err != nil { + return err + } + + return nil +} + +func TestMain(m *testing.M) { + setUpSsdb() + m.Run() +} + type SSDBSuite struct { cfg *conf.Config db database.Storage diff --git a/system/t02_config/ConfigShowTest_gold b/system/t02_config/ConfigShowTest_gold index 57b42b6aab..05ecd23918 100644 --- a/system/t02_config/ConfigShowTest_gold +++ b/system/t02_config/ConfigShowTest_gold @@ -25,5 +25,10 @@ "SwiftPublishEndpoints": {}, "AzurePublishEndpoints": {}, "AsyncAPI": false, - "enableMetricsEndpoint": true + "enableMetricsEndpoint": true, + "databaseBackend": { + "type": "", + "url": "", + "dbPath": "" + } } diff --git a/system/t02_config/CreateConfigTest_gold b/system/t02_config/CreateConfigTest_gold index 7d1a23e161..a43a559e21 100644 --- a/system/t02_config/CreateConfigTest_gold +++ b/system/t02_config/CreateConfigTest_gold @@ -25,5 +25,10 @@ "SwiftPublishEndpoints": {}, "AzurePublishEndpoints": {}, "AsyncAPI": false, - "enableMetricsEndpoint": false + "enableMetricsEndpoint": false, + "databaseBackend": { + "type": "", + "url": "", + "dbPath": "" + } } \ No newline at end of file