Skip to content

Commit a8ed239

Browse files
authored
Merge pull request #39 from rande/improve_debug_ouput
Improve debug ouput
2 parents 8c4b3cd + 992e91c commit a8ed239

File tree

16 files changed

+698
-169
lines changed

16 files changed

+698
-169
lines changed

app.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,15 @@ func GetApp(conf *Config, l *goapp.Lifecycle) (*goapp.App, error) {
4141
return conf
4242
})
4343

44+
app.Set("bolt.compacter", func(app *goapp.App) interface{} {
45+
logger := app.Get("logger").(*log.Logger)
46+
47+
return &BoltCompacter{
48+
Logger: logger,
49+
TxMaxSize: 65536,
50+
}
51+
})
52+
4453
app.Set("mux", func(app *goapp.App) interface{} {
4554
m := goji.NewMux()
4655

bolt.go

Lines changed: 239 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,239 @@
1+
package pkgmirror
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"os"
7+
"time"
8+
9+
log "github.com/Sirupsen/logrus"
10+
"github.com/boltdb/bolt"
11+
)
12+
13+
func OpenDatabaseWithBucket(basePath string, bucket []byte) (db *bolt.DB, err error) {
14+
if err = os.MkdirAll(basePath, 0755); err != nil {
15+
return
16+
}
17+
18+
path := fmt.Sprintf("%s/%s.db", basePath, bucket)
19+
20+
db, err = bolt.Open(path, 0600, &bolt.Options{
21+
Timeout: 1 * time.Second,
22+
ReadOnly: false,
23+
})
24+
25+
if err != nil {
26+
return
27+
}
28+
29+
err = db.Update(func(tx *bolt.Tx) error {
30+
_, err := tx.CreateBucketIfNotExists(bucket)
31+
32+
return err
33+
})
34+
35+
return
36+
}
37+
38+
// adapted from : https://github.com/boltdb/bolt/blob/master/cmd/bolt/main.go
39+
40+
var (
41+
42+
// ErrPathRequired is returned when the path to a Bolt database is not specified.
43+
ErrPathRequired = errors.New("path required")
44+
45+
// ErrFileNotFound is returned when a Bolt database does not exist.
46+
ErrFileNotFound = errors.New("file not found")
47+
48+
ErrUnableToCloseDatabase = errors.New("Unable to close the database")
49+
)
50+
51+
type BoltCompacter struct {
52+
TxMaxSize int64
53+
Logger *log.Logger
54+
}
55+
56+
// Run executes the command.
57+
func (bc *BoltCompacter) Compact(srcPath string) (err error) {
58+
now := time.Now()
59+
bckPath := fmt.Sprintf("%s.%d-%d-%d-%d.backup", srcPath, now.Year(), now.Month(), now.Day(), now.Unix())
60+
dstPath := fmt.Sprintf("%s.%d-%d-%d-%d.compacted", srcPath, now.Year(), now.Month(), now.Day(), now.Unix())
61+
62+
logger := bc.Logger.WithFields(log.Fields{
63+
"method": "compacter",
64+
"srcPath": srcPath,
65+
"dstPath": dstPath,
66+
})
67+
68+
// Require database paths.
69+
if srcPath == "" {
70+
logger.Error("srcPath is not defined")
71+
72+
return ErrPathRequired
73+
}
74+
75+
// Ensure source file exists.
76+
fi, err := os.Stat(srcPath)
77+
if os.IsNotExist(err) {
78+
logger.Error("srcPath does not exist")
79+
80+
return ErrFileNotFound
81+
} else if err != nil {
82+
return err
83+
}
84+
initialSize := fi.Size()
85+
86+
// Open source database.
87+
src, err := bolt.Open(srcPath, 0444, nil)
88+
if err != nil {
89+
logger.Error("unable to open the src database")
90+
return err
91+
}
92+
defer src.Close()
93+
94+
// Open destination database.
95+
dst, err := bolt.Open(dstPath, fi.Mode(), nil)
96+
if err != nil {
97+
logger.Error("unable to open the dst database")
98+
return err
99+
}
100+
defer dst.Close()
101+
102+
// Run compaction.
103+
if err := bc.compact(dst, src); err != nil {
104+
logger.Error("unable to compact the database")
105+
return err
106+
}
107+
108+
// Report stats on new size.
109+
fi, err = os.Stat(dstPath)
110+
if err != nil {
111+
logger.Error("unable to get the stat on the compacted database")
112+
return err
113+
} else if fi.Size() == 0 {
114+
return fmt.Errorf("zero db size")
115+
}
116+
117+
logger.WithFields(log.Fields{
118+
"stat": fmt.Sprintf("%d -> %d bytes (gain=%.2fx)\n", initialSize, fi.Size(), float64(initialSize)/float64(fi.Size())),
119+
}).Info("Compact action ended!")
120+
121+
// mv srcPath => bckPath
122+
if err := os.Rename(srcPath, bckPath); err != nil {
123+
return err
124+
}
125+
126+
if err := os.Rename(dstPath, srcPath); err != nil {
127+
return err
128+
}
129+
130+
// delete backup
131+
os.Remove(bckPath)
132+
133+
return nil
134+
}
135+
136+
func (bc *BoltCompacter) compact(dst, src *bolt.DB) error {
137+
// commit regularly, or we'll run out of memory for large datasets if using one transaction.
138+
var size int64
139+
tx, err := dst.Begin(true)
140+
if err != nil {
141+
return err
142+
}
143+
defer tx.Rollback()
144+
145+
if err := bc.walk(src, func(keys [][]byte, k, v []byte, seq uint64) error {
146+
// On each key/value, check if we have exceeded tx size.
147+
sz := int64(len(k) + len(v))
148+
if size+sz > bc.TxMaxSize && bc.TxMaxSize != 0 {
149+
// Commit previous transaction.
150+
if err := tx.Commit(); err != nil {
151+
return err
152+
}
153+
154+
// Start new transaction.
155+
tx, err = dst.Begin(true)
156+
if err != nil {
157+
return err
158+
}
159+
size = 0
160+
}
161+
size += sz
162+
163+
// Create bucket on the root transaction if this is the first level.
164+
nk := len(keys)
165+
if nk == 0 {
166+
bkt, err := tx.CreateBucket(k)
167+
if err != nil {
168+
return err
169+
}
170+
if err := bkt.SetSequence(seq); err != nil {
171+
return err
172+
}
173+
return nil
174+
}
175+
176+
// Create buckets on subsequent levels, if necessary.
177+
b := tx.Bucket(keys[0])
178+
if nk > 1 {
179+
for _, k := range keys[1:] {
180+
b = b.Bucket(k)
181+
}
182+
}
183+
184+
// If there is no value then this is a bucket call.
185+
if v == nil {
186+
bkt, err := b.CreateBucket(k)
187+
if err != nil {
188+
return err
189+
}
190+
if err := bkt.SetSequence(seq); err != nil {
191+
return err
192+
}
193+
return nil
194+
}
195+
196+
// Otherwise treat it as a key/value pair.
197+
return b.Put(k, v)
198+
}); err != nil {
199+
return err
200+
}
201+
202+
return tx.Commit()
203+
}
204+
205+
// walkFunc is the type of the function called for keys (buckets and "normal"
206+
// values) discovered by Walk. keys is the list of keys to descend to the bucket
207+
// owning the discovered key/value pair k/v.
208+
type walkFunc func(keys [][]byte, k, v []byte, seq uint64) error
209+
210+
// walk walks recursively the bolt database db, calling walkFn for each key it finds.
211+
func (bc *BoltCompacter) walk(db *bolt.DB, walkFn walkFunc) error {
212+
return db.View(func(tx *bolt.Tx) error {
213+
return tx.ForEach(func(name []byte, b *bolt.Bucket) error {
214+
return bc.walkBucket(b, nil, name, nil, b.Sequence(), walkFn)
215+
})
216+
})
217+
}
218+
219+
func (bc *BoltCompacter) walkBucket(b *bolt.Bucket, keypath [][]byte, k, v []byte, seq uint64, fn walkFunc) error {
220+
// Execute callback.
221+
if err := fn(keypath, k, v, seq); err != nil {
222+
return err
223+
}
224+
225+
// If this is not a bucket then stop.
226+
if v != nil {
227+
return nil
228+
}
229+
230+
// Iterate over each child key/value.
231+
keypath = append(keypath, k)
232+
return b.ForEach(func(k, v []byte) error {
233+
if v == nil {
234+
bkt := b.Bucket(k)
235+
return bc.walkBucket(bkt, keypath, k, nil, bkt.Sequence(), fn)
236+
}
237+
return bc.walkBucket(b, keypath, k, v, b.Sequence(), fn)
238+
})
239+
}

docs/usage.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ If the ``Clone`` settings is not set, you need to manually add git repository:
9090

9191
git clone --mirror [email protected]:rande/gonode.git ./data/git/github.com/rande/gonode.git
9292

93-
93+
94+
> In order to clone, make sure you have a proper ssh key setup and remote ssh fingerprint accepted on the server.
95+
9496
Bower
9597
-----
9698

errors.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,12 @@ import (
1111

1212
var (
1313
SyncInProgressError = errors.New("A synchronization is already running")
14+
DatabaseLockedError = errors.New("The database is locked")
1415
EmptyKeyError = errors.New("No value available")
1516
ResourceNotFoundError = errors.New("Resource not found")
1617
EmptyDataError = errors.New("Empty data")
1718
SameKeyError = errors.New("Same key")
1819
HttpError = errors.New("Http error")
1920
InvalidPackageError = errors.New("Invalid package error")
21+
InvalidReferenceError = errors.New("Invalid reference")
2022
)

mirror/bower/bower.go

Lines changed: 47 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,38 +36,73 @@ func NewBowerService() *BowerService {
3636
}
3737

3838
type BowerService struct {
39-
DB *bolt.DB
40-
Config *BowerConfig
41-
Logger *log.Entry
42-
lock bool
43-
StateChan chan pkgmirror.State
39+
DB *bolt.DB
40+
Config *BowerConfig
41+
Logger *log.Entry
42+
lock bool
43+
StateChan chan pkgmirror.State
44+
BoltCompacter *pkgmirror.BoltCompacter
4445
}
4546

4647
func (bs *BowerService) Init(app *goapp.App) (err error) {
4748
bs.Logger.Info("Init")
4849

50+
return bs.openDatabase()
51+
}
52+
53+
func (bs *BowerService) openDatabase() (err error) {
4954
if bs.DB, err = pkgmirror.OpenDatabaseWithBucket(bs.Config.Path, bs.Config.Code); err != nil {
5055
bs.Logger.WithFields(log.Fields{
51-
"error": err,
52-
"path": bs.Config.Path,
53-
"bucket": string(bs.Config.Code),
54-
"action": "Init",
56+
log.ErrorKey: err,
57+
"path": bs.Config.Path,
58+
"bucket": string(bs.Config.Code),
59+
"action": "Init",
5560
}).Error("Unable to open the internal database")
61+
62+
return err
5663
}
5764

58-
return
65+
return nil
66+
}
67+
68+
func (bs *BowerService) optimize() error {
69+
bs.lock = true
70+
71+
path := bs.DB.Path()
72+
73+
bs.DB.Close()
74+
75+
if err := bs.BoltCompacter.Compact(path); err != nil {
76+
return err
77+
}
78+
79+
err := bs.openDatabase()
80+
81+
bs.lock = false
82+
83+
return err
5984
}
6085

6186
func (bs *BowerService) Serve(state *goapp.GoroutineState) error {
6287
bs.Logger.Info("Starting Bower Service")
6388

6489
syncEnd := make(chan bool)
6590

91+
iteration := 0
6692
sync := func() {
6793
bs.Logger.Info("Starting a new sync...")
6894

6995
bs.SyncPackages()
7096

97+
iteration++
98+
99+
// optimize every 10 iteration
100+
if iteration > 9 {
101+
bs.Logger.Info("Starting database optimization")
102+
bs.optimize()
103+
iteration = 0
104+
}
105+
71106
syncEnd <- true
72107
}
73108

@@ -123,8 +158,8 @@ func (bs *BowerService) SyncPackages() error {
123158

124159
if err := pkgmirror.LoadRemoteStruct(fmt.Sprintf("%s/packages", bs.Config.SourceServer), &pkgs); err != nil {
125160
logger.WithFields(log.Fields{
126-
"path": "packages",
127-
"error": err.Error(),
161+
"path": "packages",
162+
log.ErrorKey: err.Error(),
128163
}).Error("Error loading bower packages list")
129164

130165
return err // an error occurs avoid empty file

mirror/bower/bower_app.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ func ConfigureApp(config *pkgmirror.Config, l *goapp.Lifecycle) {
4040
"code": name,
4141
})
4242
s.StateChan = pkgmirror.GetStateChannel(fmt.Sprintf("pkgmirror.bower.%s", name), app.Get("pkgmirror.channel.state").(chan pkgmirror.State))
43+
s.BoltCompacter = app.Get("bolt.compacter").(*pkgmirror.BoltCompacter)
4344

4445
if err := s.Init(app); err != nil {
4546
panic(err)

0 commit comments

Comments
 (0)