@@ -19,16 +19,19 @@ package ethdb
19
19
import (
20
20
"strconv"
21
21
"strings"
22
+ "sync"
22
23
"time"
23
24
24
25
"github.com/ethereum/go-ethereum/compression/rle"
25
26
"github.com/ethereum/go-ethereum/logger"
26
27
"github.com/ethereum/go-ethereum/logger/glog"
27
- "github.com/rcrowley /go-metrics"
28
+ "github.com/ethereum /go-ethereum/ metrics"
28
29
"github.com/syndtr/goleveldb/leveldb"
29
30
"github.com/syndtr/goleveldb/leveldb/errors"
30
31
"github.com/syndtr/goleveldb/leveldb/iterator"
31
32
"github.com/syndtr/goleveldb/leveldb/opt"
33
+
34
+ gometrics "github.com/rcrowley/go-metrics"
32
35
)
33
36
34
37
var OpenFileLimit = 64
@@ -37,15 +40,18 @@ type LDBDatabase struct {
37
40
fn string // filename for reporting
38
41
db * leveldb.DB // LevelDB instance
39
42
40
- GetTimer metrics.Timer // Timer for measuring the database get request counts and latencies
41
- PutTimer metrics.Timer // Timer for measuring the database put request counts and latencies
42
- DelTimer metrics.Timer // Timer for measuring the database delete request counts and latencies
43
- MissMeter metrics.Meter // Meter for measuring the missed database get requests
44
- ReadMeter metrics.Meter // Meter for measuring the database get request data usage
45
- WriteMeter metrics.Meter // Meter for measuring the database put request data usage
46
- CompTimeMeter metrics.Meter // Meter for measuring the total time spent in database compaction
47
- CompReadMeter metrics.Meter // Meter for measuring the data read during compaction
48
- CompWriteMeter metrics.Meter // Meter for measuring the data written during compaction
43
+ getTimer gometrics.Timer // Timer for measuring the database get request counts and latencies
44
+ putTimer gometrics.Timer // Timer for measuring the database put request counts and latencies
45
+ delTimer gometrics.Timer // Timer for measuring the database delete request counts and latencies
46
+ missMeter gometrics.Meter // Meter for measuring the missed database get requests
47
+ readMeter gometrics.Meter // Meter for measuring the database get request data usage
48
+ writeMeter gometrics.Meter // Meter for measuring the database put request data usage
49
+ compTimeMeter gometrics.Meter // Meter for measuring the total time spent in database compaction
50
+ compReadMeter gometrics.Meter // Meter for measuring the data read during compaction
51
+ compWriteMeter gometrics.Meter // Meter for measuring the data written during compaction
52
+
53
+ quitLock sync.Mutex // Mutex protecting the quit channel access
54
+ quitChan chan chan error // Quit channel to stop the metrics collection before closing the database
49
55
}
50
56
51
57
// NewLDBDatabase returns a LevelDB wrapped object. LDBDatabase does not persist data by
@@ -54,64 +60,61 @@ type LDBDatabase struct {
54
60
func NewLDBDatabase (file string ) (* LDBDatabase , error ) {
55
61
// Open the db
56
62
db , err := leveldb .OpenFile (file , & opt.Options {OpenFilesCacheCapacity : OpenFileLimit })
57
- // check for curruption and attempt to recover
63
+ // check for corruption and attempt to recover
58
64
if _ , iscorrupted := err .(* errors.ErrCorrupted ); iscorrupted {
59
65
db , err = leveldb .RecoverFile (file , nil )
60
66
}
61
67
// (re) check for errors and abort if opening of the db failed
62
68
if err != nil {
63
69
return nil , err
64
70
}
65
- database := & LDBDatabase {
71
+ return & LDBDatabase {
66
72
fn : file ,
67
73
db : db ,
68
- }
69
- go database .meter (3 * time .Second )
70
-
71
- return database , nil
74
+ }, nil
72
75
}
73
76
74
77
// Put puts the given key / value to the queue
75
78
func (self * LDBDatabase ) Put (key []byte , value []byte ) error {
76
79
// Measure the database put latency, if requested
77
- if self .PutTimer != nil {
78
- defer self .PutTimer .UpdateSince (time .Now ())
80
+ if self .putTimer != nil {
81
+ defer self .putTimer .UpdateSince (time .Now ())
79
82
}
80
83
// Generate the data to write to disk, update the meter and write
81
84
dat := rle .Compress (value )
82
85
83
- if self .WriteMeter != nil {
84
- self .WriteMeter .Mark (int64 (len (dat )))
86
+ if self .writeMeter != nil {
87
+ self .writeMeter .Mark (int64 (len (dat )))
85
88
}
86
89
return self .db .Put (key , dat , nil )
87
90
}
88
91
89
92
// Get returns the given key if it's present.
90
93
func (self * LDBDatabase ) Get (key []byte ) ([]byte , error ) {
91
94
// Measure the database get latency, if requested
92
- if self .GetTimer != nil {
93
- defer self .GetTimer .UpdateSince (time .Now ())
95
+ if self .getTimer != nil {
96
+ defer self .getTimer .UpdateSince (time .Now ())
94
97
}
95
98
// Retrieve the key and increment the miss counter if not found
96
99
dat , err := self .db .Get (key , nil )
97
100
if err != nil {
98
- if self .MissMeter != nil {
99
- self .MissMeter .Mark (1 )
101
+ if self .missMeter != nil {
102
+ self .missMeter .Mark (1 )
100
103
}
101
104
return nil , err
102
105
}
103
106
// Otherwise update the actually retrieved amount of data
104
- if self .ReadMeter != nil {
105
- self .ReadMeter .Mark (int64 (len (dat )))
107
+ if self .readMeter != nil {
108
+ self .readMeter .Mark (int64 (len (dat )))
106
109
}
107
110
return rle .Decompress (dat )
108
111
}
109
112
110
113
// Delete deletes the key from the queue and database
111
114
func (self * LDBDatabase ) Delete (key []byte ) error {
112
115
// Measure the database delete latency, if requested
113
- if self .DelTimer != nil {
114
- defer self .DelTimer .UpdateSince (time .Now ())
116
+ if self .delTimer != nil {
117
+ defer self .delTimer .UpdateSince (time .Now ())
115
118
}
116
119
// Execute the actual operation
117
120
return self .db .Delete (key , nil )
@@ -127,8 +130,20 @@ func (self *LDBDatabase) Flush() error {
127
130
}
128
131
129
132
func (self * LDBDatabase ) Close () {
133
+ // Stop the metrics collection to avoid internal database races
134
+ self .quitLock .Lock ()
135
+ defer self .quitLock .Unlock ()
136
+
137
+ if self .quitChan != nil {
138
+ errc := make (chan error )
139
+ self .quitChan <- errc
140
+ if err := <- errc ; err != nil {
141
+ glog .V (logger .Error ).Infof ("metrics failure in '%s': %v\n " , self .fn , err )
142
+ }
143
+ }
144
+ // Flush and close the database
130
145
if err := self .Flush (); err != nil {
131
- glog .V (logger .Error ).Infof ("error: flush '%s': %v\n " , self .fn , err )
146
+ glog .V (logger .Error ).Infof ("flushing '%s' failed : %v\n " , self .fn , err )
132
147
}
133
148
self .db .Close ()
134
149
glog .V (logger .Error ).Infoln ("flushed and closed db:" , self .fn )
@@ -138,6 +153,27 @@ func (self *LDBDatabase) LDB() *leveldb.DB {
138
153
return self .db
139
154
}
140
155
156
+ // Meter configures the database metrics collectors and
157
+ func (self * LDBDatabase ) Meter (prefix string ) {
158
+ // Initialize all the metrics collector at the requested prefix
159
+ self .getTimer = metrics .NewTimer (prefix + "user/gets" )
160
+ self .putTimer = metrics .NewTimer (prefix + "user/puts" )
161
+ self .delTimer = metrics .NewTimer (prefix + "user/dels" )
162
+ self .missMeter = metrics .NewMeter (prefix + "user/misses" )
163
+ self .readMeter = metrics .NewMeter (prefix + "user/reads" )
164
+ self .writeMeter = metrics .NewMeter (prefix + "user/writes" )
165
+ self .compTimeMeter = metrics .NewMeter (prefix + "compact/time" )
166
+ self .compReadMeter = metrics .NewMeter (prefix + "compact/input" )
167
+ self .compWriteMeter = metrics .NewMeter (prefix + "compact/output" )
168
+
169
+ // Create a quit channel for the periodic collector and run it
170
+ self .quitLock .Lock ()
171
+ self .quitChan = make (chan chan error )
172
+ self .quitLock .Unlock ()
173
+
174
+ go self .meter (3 * time .Second )
175
+ }
176
+
141
177
// meter periodically retrieves internal leveldb counters and reports them to
142
178
// the metrics subsystem.
143
179
//
@@ -193,16 +229,24 @@ func (self *LDBDatabase) meter(refresh time.Duration) {
193
229
}
194
230
}
195
231
// Update all the requested meters
196
- if self .CompTimeMeter != nil {
197
- self .CompTimeMeter .Mark (int64 ((counters [i % 2 ][0 ] - counters [(i - 1 )% 2 ][0 ]) * 1000 * 1000 * 1000 ))
232
+ if self .compTimeMeter != nil {
233
+ self .compTimeMeter .Mark (int64 ((counters [i % 2 ][0 ] - counters [(i - 1 )% 2 ][0 ]) * 1000 * 1000 * 1000 ))
198
234
}
199
- if self .CompReadMeter != nil {
200
- self .CompReadMeter .Mark (int64 ((counters [i % 2 ][1 ] - counters [(i - 1 )% 2 ][1 ]) * 1024 * 1024 ))
235
+ if self .compReadMeter != nil {
236
+ self .compReadMeter .Mark (int64 ((counters [i % 2 ][1 ] - counters [(i - 1 )% 2 ][1 ]) * 1024 * 1024 ))
201
237
}
202
- if self .CompWriteMeter != nil {
203
- self .CompWriteMeter .Mark (int64 ((counters [i % 2 ][2 ] - counters [(i - 1 )% 2 ][2 ]) * 1024 * 1024 ))
238
+ if self .compWriteMeter != nil {
239
+ self .compWriteMeter .Mark (int64 ((counters [i % 2 ][2 ] - counters [(i - 1 )% 2 ][2 ]) * 1024 * 1024 ))
204
240
}
205
241
// Sleep a bit, then repeat the stats collection
206
- time .Sleep (refresh )
242
+ select {
243
+ case errc := <- self .quitChan :
244
+ // Quit requesting, stop hammering the database
245
+ errc <- nil
246
+ return
247
+
248
+ case <- time .After (refresh ):
249
+ // Timeout, gather a new set of stats
250
+ }
207
251
}
208
252
}
0 commit comments