Skip to content

Commit 88b1db7

Browse files
holimanfjl
authored andcommitted
accounts/keystore: scan key directory without locks held (#15171)
The accountCache contains a file cache, and remembers from scan to scan what files were present earlier. Thus, whenever there's a change, the scan phase only bothers processing new and removed files.
1 parent 7a045af commit 88b1db7

File tree

4 files changed

+297
-102
lines changed

4 files changed

+297
-102
lines changed

accounts/keystore/account_cache.go

Lines changed: 122 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"github.com/ethereum/go-ethereum/accounts"
3232
"github.com/ethereum/go-ethereum/common"
3333
"github.com/ethereum/go-ethereum/log"
34+
"gopkg.in/fatih/set.v0"
3435
)
3536

3637
// Minimum amount of time between cache reloads. This limit applies if the platform does
@@ -71,13 +72,22 @@ type accountCache struct {
7172
byAddr map[common.Address][]accounts.Account
7273
throttle *time.Timer
7374
notify chan struct{}
75+
fileC fileCache
76+
}
77+
78+
// fileCache is a cache of files seen during scan of keystore
79+
type fileCache struct {
80+
all *set.SetNonTS // list of all files
81+
mtime time.Time // latest mtime seen
82+
mu sync.RWMutex
7483
}
7584

7685
func newAccountCache(keydir string) (*accountCache, chan struct{}) {
7786
ac := &accountCache{
7887
keydir: keydir,
7988
byAddr: make(map[common.Address][]accounts.Account),
8089
notify: make(chan struct{}, 1),
90+
fileC: fileCache{all: set.NewNonTS()},
8191
}
8292
ac.watcher = newWatcher(ac)
8393
return ac, ac.notify
@@ -127,6 +137,23 @@ func (ac *accountCache) delete(removed accounts.Account) {
127137
}
128138
}
129139

140+
// deleteByFile removes an account referenced by the given path.
141+
func (ac *accountCache) deleteByFile(path string) {
142+
ac.mu.Lock()
143+
defer ac.mu.Unlock()
144+
i := sort.Search(len(ac.all), func(i int) bool { return ac.all[i].URL.Path >= path })
145+
146+
if i < len(ac.all) && ac.all[i].URL.Path == path {
147+
removed := ac.all[i]
148+
ac.all = append(ac.all[:i], ac.all[i+1:]...)
149+
if ba := removeAccount(ac.byAddr[removed.Address], removed); len(ba) == 0 {
150+
delete(ac.byAddr, removed.Address)
151+
} else {
152+
ac.byAddr[removed.Address] = ba
153+
}
154+
}
155+
}
156+
130157
func removeAccount(slice []accounts.Account, elem accounts.Account) []accounts.Account {
131158
for i := range slice {
132159
if slice[i] == elem {
@@ -167,15 +194,16 @@ func (ac *accountCache) find(a accounts.Account) (accounts.Account, error) {
167194
default:
168195
err := &AmbiguousAddrError{Addr: a.Address, Matches: make([]accounts.Account, len(matches))}
169196
copy(err.Matches, matches)
197+
sort.Sort(accountsByURL(err.Matches))
170198
return accounts.Account{}, err
171199
}
172200
}
173201

174202
func (ac *accountCache) maybeReload() {
175203
ac.mu.Lock()
176-
defer ac.mu.Unlock()
177204

178205
if ac.watcher.running {
206+
ac.mu.Unlock()
179207
return // A watcher is running and will keep the cache up-to-date.
180208
}
181209
if ac.throttle == nil {
@@ -184,12 +212,15 @@ func (ac *accountCache) maybeReload() {
184212
select {
185213
case <-ac.throttle.C:
186214
default:
215+
ac.mu.Unlock()
187216
return // The cache was reloaded recently.
188217
}
189218
}
219+
// No watcher running, start it.
190220
ac.watcher.start()
191-
ac.reload()
192221
ac.throttle.Reset(minReloadInterval)
222+
ac.mu.Unlock()
223+
ac.scanAccounts()
193224
}
194225

195226
func (ac *accountCache) close() {
@@ -205,70 +236,122 @@ func (ac *accountCache) close() {
205236
ac.mu.Unlock()
206237
}
207238

208-
// reload caches addresses of existing accounts.
209-
// Callers must hold ac.mu.
210-
func (ac *accountCache) reload() {
211-
accounts, err := ac.scan()
239+
// scanFiles performs a new scan on the given directory, compares against the already
240+
// cached filenames, and returns file sets: new, missing , modified
241+
func (fc *fileCache) scanFiles(keyDir string) (set.Interface, set.Interface, set.Interface, error) {
242+
t0 := time.Now()
243+
files, err := ioutil.ReadDir(keyDir)
244+
t1 := time.Now()
212245
if err != nil {
213-
log.Debug("Failed to reload keystore contents", "err", err)
246+
return nil, nil, nil, err
214247
}
215-
ac.all = accounts
216-
sort.Sort(ac.all)
217-
for k := range ac.byAddr {
218-
delete(ac.byAddr, k)
219-
}
220-
for _, a := range accounts {
221-
ac.byAddr[a.Address] = append(ac.byAddr[a.Address], a)
222-
}
223-
select {
224-
case ac.notify <- struct{}{}:
225-
default:
248+
fc.mu.RLock()
249+
prevMtime := fc.mtime
250+
fc.mu.RUnlock()
251+
252+
filesNow := set.NewNonTS()
253+
moddedFiles := set.NewNonTS()
254+
var newMtime time.Time
255+
for _, fi := range files {
256+
modTime := fi.ModTime()
257+
path := filepath.Join(keyDir, fi.Name())
258+
if skipKeyFile(fi) {
259+
log.Trace("Ignoring file on account scan", "path", path)
260+
continue
261+
}
262+
filesNow.Add(path)
263+
if modTime.After(prevMtime) {
264+
moddedFiles.Add(path)
265+
}
266+
if modTime.After(newMtime) {
267+
newMtime = modTime
268+
}
226269
}
227-
log.Debug("Reloaded keystore contents", "accounts", len(ac.all))
270+
t2 := time.Now()
271+
272+
fc.mu.Lock()
273+
// Missing = previous - current
274+
missing := set.Difference(fc.all, filesNow)
275+
// New = current - previous
276+
newFiles := set.Difference(filesNow, fc.all)
277+
// Modified = modified - new
278+
modified := set.Difference(moddedFiles, newFiles)
279+
fc.all = filesNow
280+
fc.mtime = newMtime
281+
fc.mu.Unlock()
282+
t3 := time.Now()
283+
log.Debug("FS scan times", "list", t1.Sub(t0), "set", t2.Sub(t1), "diff", t3.Sub(t2))
284+
return newFiles, missing, modified, nil
228285
}
229286

230-
func (ac *accountCache) scan() ([]accounts.Account, error) {
231-
files, err := ioutil.ReadDir(ac.keydir)
287+
// scanAccounts checks if any changes have occurred on the filesystem, and
288+
// updates the account cache accordingly
289+
func (ac *accountCache) scanAccounts() error {
290+
newFiles, missingFiles, modified, err := ac.fileC.scanFiles(ac.keydir)
291+
t1 := time.Now()
232292
if err != nil {
233-
return nil, err
293+
log.Debug("Failed to reload keystore contents", "err", err)
294+
return err
234295
}
235-
236296
var (
237297
buf = new(bufio.Reader)
238-
addrs []accounts.Account
239298
keyJSON struct {
240299
Address string `json:"address"`
241300
}
242301
)
243-
for _, fi := range files {
244-
path := filepath.Join(ac.keydir, fi.Name())
245-
if skipKeyFile(fi) {
246-
log.Trace("Ignoring file on account scan", "path", path)
247-
continue
248-
}
249-
logger := log.New("path", path)
250-
302+
readAccount := func(path string) *accounts.Account {
251303
fd, err := os.Open(path)
252304
if err != nil {
253-
logger.Trace("Failed to open keystore file", "err", err)
254-
continue
305+
log.Trace("Failed to open keystore file", "path", path, "err", err)
306+
return nil
255307
}
308+
defer fd.Close()
256309
buf.Reset(fd)
257310
// Parse the address.
258311
keyJSON.Address = ""
259312
err = json.NewDecoder(buf).Decode(&keyJSON)
260313
addr := common.HexToAddress(keyJSON.Address)
261314
switch {
262315
case err != nil:
263-
logger.Debug("Failed to decode keystore key", "err", err)
316+
log.Debug("Failed to decode keystore key", "path", path, "err", err)
264317
case (addr == common.Address{}):
265-
logger.Debug("Failed to decode keystore key", "err", "missing or zero address")
318+
log.Debug("Failed to decode keystore key", "path", path, "err", "missing or zero address")
266319
default:
267-
addrs = append(addrs, accounts.Account{Address: addr, URL: accounts.URL{Scheme: KeyStoreScheme, Path: path}})
320+
return &accounts.Account{Address: addr, URL: accounts.URL{Scheme: KeyStoreScheme, Path: path}}
268321
}
269-
fd.Close()
322+
return nil
270323
}
271-
return addrs, err
324+
325+
for _, p := range newFiles.List() {
326+
path, _ := p.(string)
327+
a := readAccount(path)
328+
if a != nil {
329+
ac.add(*a)
330+
}
331+
}
332+
for _, p := range missingFiles.List() {
333+
path, _ := p.(string)
334+
ac.deleteByFile(path)
335+
}
336+
337+
for _, p := range modified.List() {
338+
path, _ := p.(string)
339+
a := readAccount(path)
340+
ac.deleteByFile(path)
341+
if a != nil {
342+
ac.add(*a)
343+
}
344+
}
345+
346+
t2 := time.Now()
347+
348+
select {
349+
case ac.notify <- struct{}{}:
350+
default:
351+
}
352+
log.Trace("Handled keystore changes", "time", t2.Sub(t1))
353+
354+
return nil
272355
}
273356

274357
func skipKeyFile(fi os.FileInfo) bool {

accounts/keystore/account_cache_test.go

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package keystore
1818

1919
import (
2020
"fmt"
21+
"io/ioutil"
2122
"math/rand"
2223
"os"
2324
"path/filepath"
@@ -295,3 +296,101 @@ func TestCacheFind(t *testing.T) {
295296
}
296297
}
297298
}
299+
300+
func waitForAccounts(wantAccounts []accounts.Account, ks *KeyStore) error {
301+
var list []accounts.Account
302+
for d := 200 * time.Millisecond; d < 8*time.Second; d *= 2 {
303+
list = ks.Accounts()
304+
if reflect.DeepEqual(list, wantAccounts) {
305+
// ks should have also received change notifications
306+
select {
307+
case <-ks.changes:
308+
default:
309+
return fmt.Errorf("wasn't notified of new accounts")
310+
}
311+
return nil
312+
}
313+
time.Sleep(d)
314+
}
315+
return fmt.Errorf("\ngot %v\nwant %v", list, wantAccounts)
316+
}
317+
318+
// TestUpdatedKeyfileContents tests that updating the contents of a keystore file
319+
// is noticed by the watcher, and the account cache is updated accordingly
320+
func TestUpdatedKeyfileContents(t *testing.T) {
321+
t.Parallel()
322+
323+
// Create a temporary kesytore to test with
324+
rand.Seed(time.Now().UnixNano())
325+
dir := filepath.Join(os.TempDir(), fmt.Sprintf("eth-keystore-watch-test-%d-%d", os.Getpid(), rand.Int()))
326+
ks := NewKeyStore(dir, LightScryptN, LightScryptP)
327+
328+
list := ks.Accounts()
329+
if len(list) > 0 {
330+
t.Error("initial account list not empty:", list)
331+
}
332+
time.Sleep(100 * time.Millisecond)
333+
334+
// Create the directory and copy a key file into it.
335+
os.MkdirAll(dir, 0700)
336+
defer os.RemoveAll(dir)
337+
file := filepath.Join(dir, "aaa")
338+
339+
// Place one of our testfiles in there
340+
if err := cp.CopyFile(file, cachetestAccounts[0].URL.Path); err != nil {
341+
t.Fatal(err)
342+
}
343+
344+
// ks should see the account.
345+
wantAccounts := []accounts.Account{cachetestAccounts[0]}
346+
wantAccounts[0].URL = accounts.URL{Scheme: KeyStoreScheme, Path: file}
347+
if err := waitForAccounts(wantAccounts, ks); err != nil {
348+
t.Error(err)
349+
return
350+
}
351+
352+
// Now replace file contents
353+
if err := forceCopyFile(file, cachetestAccounts[1].URL.Path); err != nil {
354+
t.Fatal(err)
355+
return
356+
}
357+
wantAccounts = []accounts.Account{cachetestAccounts[1]}
358+
wantAccounts[0].URL = accounts.URL{Scheme: KeyStoreScheme, Path: file}
359+
if err := waitForAccounts(wantAccounts, ks); err != nil {
360+
t.Errorf("First replacement failed")
361+
t.Error(err)
362+
return
363+
}
364+
365+
// Now replace file contents again
366+
if err := forceCopyFile(file, cachetestAccounts[2].URL.Path); err != nil {
367+
t.Fatal(err)
368+
return
369+
}
370+
wantAccounts = []accounts.Account{cachetestAccounts[2]}
371+
wantAccounts[0].URL = accounts.URL{Scheme: KeyStoreScheme, Path: file}
372+
if err := waitForAccounts(wantAccounts, ks); err != nil {
373+
t.Errorf("Second replacement failed")
374+
t.Error(err)
375+
return
376+
}
377+
// Now replace file contents with crap
378+
if err := ioutil.WriteFile(file, []byte("foo"), 0644); err != nil {
379+
t.Fatal(err)
380+
return
381+
}
382+
if err := waitForAccounts([]accounts.Account{}, ks); err != nil {
383+
t.Errorf("Emptying account file failed")
384+
t.Error(err)
385+
return
386+
}
387+
}
388+
389+
// forceCopyFile is like cp.CopyFile, but doesn't complain if the destination exists.
390+
func forceCopyFile(dst, src string) error {
391+
data, err := ioutil.ReadFile(src)
392+
if err != nil {
393+
return err
394+
}
395+
return ioutil.WriteFile(dst, data, 0644)
396+
}

0 commit comments

Comments
 (0)