Skip to content

Commit a1bb3e7

Browse files
committed
feat: database backend add ssdb support
Change-Id: I054c5fc9b02f613601781de8613d684faa0ea7f2
1 parent e90ac67 commit a1bb3e7

File tree

12 files changed

+860
-4
lines changed

12 files changed

+860
-4
lines changed

AUTHORS

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,3 +52,4 @@ List of contributors, in chronological order:
5252
* Steven Stone (https://github.com/smstone)
5353
* Josh Bayfield (https://github.com/jbayfield)
5454
* Boxjan (https://github.com/boxjan)
55+
* goldendeng (https://github.com/hudeng-go)

context/context.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,16 @@ package context
33

44
import (
55
gocontext "context"
6+
"errors"
67
"fmt"
78
"math/rand"
9+
"net/url"
810
"os"
911
"os/signal"
1012
"path/filepath"
1113
"runtime"
1214
"runtime/pprof"
15+
"strconv"
1316
"strings"
1417
"sync"
1518
"time"
@@ -19,6 +22,7 @@ import (
1922
"github.com/aptly-dev/aptly/console"
2023
"github.com/aptly-dev/aptly/database"
2124
"github.com/aptly-dev/aptly/database/goleveldb"
25+
"github.com/aptly-dev/aptly/database/ssdb"
2226
"github.com/aptly-dev/aptly/deb"
2327
"github.com/aptly-dev/aptly/files"
2428
"github.com/aptly-dev/aptly/http"
@@ -27,6 +31,7 @@ import (
2731
"github.com/aptly-dev/aptly/swift"
2832
"github.com/aptly-dev/aptly/task"
2933
"github.com/aptly-dev/aptly/utils"
34+
"github.com/seefan/gossdb/v2/conf"
3035
"github.com/smira/commander"
3136
"github.com/smira/flag"
3237
)
@@ -287,7 +292,32 @@ func (context *AptlyContext) _database() (database.Storage, error) {
287292
if context.database == nil {
288293
var err error
289294

290-
context.database, err = goleveldb.NewDB(context.dbPath())
295+
if context.config().DatabaseBackend.Type == "leveldb" {
296+
if context.config().DatabaseBackend.DbPath != "" {
297+
dbPath := filepath.Join(context.config().RootDir, context.config().DatabaseBackend.DbPath)
298+
context.database, err = goleveldb.NewDB(dbPath)
299+
} else {
300+
return nil, errors.New("leveldb databaseBackend config invalid")
301+
}
302+
} else if context.config().DatabaseBackend.Type == "ssdb" {
303+
var cfg conf.Config
304+
u, e := url.Parse(context.config().DatabaseBackend.URL)
305+
306+
if e != nil {
307+
return nil, e
308+
}
309+
cfg.Port, e = strconv.Atoi(u.Port())
310+
cfg.Host = strings.Split(u.Host, ":")[0]
311+
if e != nil {
312+
return nil, e
313+
}
314+
password, _ := u.User.Password()
315+
cfg.Password = password
316+
context.database, err = ssdb.NewOpenDB(&cfg)
317+
} else {
318+
context.database, err = goleveldb.NewDB(context.dbPath())
319+
}
320+
291321
if err != nil {
292322
return nil, fmt.Errorf("can't instantiate database: %s", err)
293323
}

database/ssdb/batch.go

Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package ssdb
2+
3+
import (
4+
"fmt"
5+
6+
"github.com/aptly-dev/aptly/database"
7+
"github.com/seefan/gossdb/v2/conf"
8+
"github.com/seefan/gossdb/v2/pool"
9+
)
10+
11+
const (
12+
delOpt = "del"
13+
)
14+
15+
type bWriteData struct {
16+
key []byte
17+
value []byte
18+
opts string
19+
err error
20+
}
21+
22+
type Batch struct {
23+
cfg *conf.Config
24+
// key-value chan
25+
w chan bWriteData
26+
p map[string]interface{}
27+
d []string
28+
db *pool.Client
29+
}
30+
31+
// func internalOpenBatch...
32+
func internalOpenBatch(t database.Storage) *Batch {
33+
b := &Batch{
34+
w: make(chan bWriteData),
35+
p: make(map[string]interface{}),
36+
}
37+
b.run()
38+
39+
return b
40+
}
41+
42+
func (b *Batch) run() {
43+
go func() {
44+
for {
45+
select {
46+
case w, ok := <-b.w:
47+
{
48+
if !ok {
49+
ssdbLog("ssdb batch write chan closed")
50+
return
51+
}
52+
53+
if w.opts == "write" {
54+
ssdbLog("ssdb batch write")
55+
var err error
56+
if len(b.p) > 0 && len(b.d) == 0 {
57+
err = b.db.MultiSet(b.p)
58+
ssdbLog("ssdb batch set errinfo: ", err)
59+
} else if len(b.d) > 0 && len(b.p) == 0 {
60+
err = b.db.MultiDel(b.d...)
61+
ssdbLog("ssdb batch del errinfo: ", err)
62+
} else if len(b.p) == 0 && len(b.d) == 0 {
63+
err = nil
64+
} else {
65+
err = fmt.Errorf("ssdb batch does not support both put and delete operations")
66+
}
67+
ssdbLog("ssdb batch write errinfo: ", err)
68+
b.w <- bWriteData{
69+
err: err,
70+
}
71+
ssdbLog("ssdb batch write end")
72+
} else {
73+
ssdbLog("ssdb batch", w.opts)
74+
if w.opts == "put" {
75+
b.p[string(w.key)] = w.value
76+
} else if w.opts == delOpt {
77+
b.d = append(b.d, string(w.key))
78+
}
79+
}
80+
}
81+
}
82+
}
83+
}()
84+
}
85+
86+
func (b *Batch) stop() {
87+
ssdbLog("ssdb batch stop")
88+
close(b.w)
89+
}
90+
91+
func (b *Batch) Put(key, value []byte) (err error) {
92+
// err = b.db.Set(string(key), string(value))
93+
w := bWriteData{
94+
key: key,
95+
value: value,
96+
opts: "put",
97+
}
98+
99+
b.w <- w
100+
return nil
101+
}
102+
103+
func (b *Batch) Delete(key []byte) (err error) {
104+
/* err = b.db.Del(string(key))
105+
return */
106+
w := bWriteData{
107+
key: key,
108+
opts: delOpt,
109+
}
110+
111+
b.w <- w
112+
return nil
113+
}
114+
115+
func (b *Batch) Write() (err error) {
116+
defer b.stop()
117+
w := bWriteData{
118+
opts: "write",
119+
}
120+
121+
b.w <- w
122+
result := <-b.w
123+
return result.err
124+
}
125+
126+
// batch should implement database.Batch
127+
var (
128+
_ database.Batch = &Batch{}
129+
)

database/ssdb/database.go

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package ssdb
2+
3+
import (
4+
"os"
5+
"strconv"
6+
7+
"github.com/aptly-dev/aptly/database"
8+
"github.com/seefan/gossdb/v2"
9+
"github.com/seefan/gossdb/v2/conf"
10+
"github.com/seefan/gossdb/v2/pool"
11+
)
12+
13+
var defaultBufSize = 102400
14+
var defaultPoolSize = 1
15+
16+
func internalOpen(cfg *conf.Config) (*pool.Client, error) {
17+
ssdbLog("internalOpen")
18+
19+
cfg.ReadBufferSize = defaultBufSize
20+
cfg.WriteBufferSize = defaultBufSize
21+
cfg.MaxPoolSize = defaultPoolSize
22+
cfg.PoolSize = defaultPoolSize
23+
cfg.MinPoolSize = defaultPoolSize
24+
cfg.MaxWaitSize = 100 * defaultPoolSize
25+
cfg.RetryEnabled = true
26+
27+
//override by env
28+
if os.Getenv("SSDB_READBUFFERSIZE") != "" {
29+
readBufSize, err := strconv.Atoi(os.Getenv("SSDB_READBUFFERSIZE"))
30+
if err != nil {
31+
cfg.ReadBufferSize = readBufSize
32+
}
33+
}
34+
35+
if os.Getenv("SSDB_WRITEBUFFERSIZE") != "" {
36+
writeBufSize, err := strconv.Atoi(os.Getenv("SSDB_WRITEBUFFERSIZE"))
37+
if err != nil {
38+
cfg.WriteBufferSize = writeBufSize
39+
}
40+
}
41+
42+
var cfgs = []*conf.Config{cfg}
43+
err := gossdb.Start(cfgs...)
44+
if err != nil {
45+
return nil, err
46+
}
47+
48+
return gossdb.NewClient()
49+
}
50+
51+
func NewDB(cfg *conf.Config) (database.Storage, error) {
52+
return &Storage{cfg: cfg}, nil
53+
}
54+
55+
func NewOpenDB(cfg *conf.Config) (database.Storage, error) {
56+
db, err := NewDB(cfg)
57+
if err != nil {
58+
return nil, err
59+
}
60+
61+
return db, db.Open()
62+
}

0 commit comments

Comments
 (0)