Skip to content

Commit 6951d30

Browse files
authored
cache: speedup scanning (#6549)
Signed-off-by: jiefenghuang <jiefeng@juicedata.io>
1 parent f5abfdf commit 6951d30

File tree

4 files changed

+42
-4
lines changed

4 files changed

+42
-4
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
github.com/baidubce/bce-sdk-go v0.9.221
2323
github.com/bytedance/mockey v1.2.14
2424
github.com/ceph/go-ceph v0.18.0
25+
github.com/charlievieth/fastwalk v1.0.14
2526
github.com/cloudsoda/go-smb2 v0.0.0-20250228001242-d4c70e6251cc
2627
github.com/colinmarc/hdfs/v2 v2.4.0
2728
github.com/davies/groupcache v0.0.0-20230821031435-e4e8362f58e1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,8 @@ github.com/ceph/go-ceph v0.18.0/go.mod h1:cflETVTBNAQM6jdr7hpNHHFHKYiJiWWcAeRDrR
163163
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
164164
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
165165
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
166+
github.com/charlievieth/fastwalk v1.0.14 h1:3Eh5uaFGwHZd8EGwTjJnSpBkfwfsak9h6ICgnWlhAyg=
167+
github.com/charlievieth/fastwalk v1.0.14/go.mod h1:diVcUreiU1aQ4/Wu3NbxxH4/KYdKpLDojrQ1Bb2KgNY=
166168
github.com/cheggaaa/pb v1.0.29 h1:FckUN5ngEk2LpvuG0fw1GEFx6LtyY2pWI/Z2QgCnEYo=
167169
github.com/cheggaaa/pb v1.0.29/go.mod h1:W40334L7FMC5JKWldsTWbdGjLo0RxUKK73K+TuPxX30=
168170
github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag=

pkg/chunk/disk_cache.go

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
"syscall"
3838
"time"
3939

40+
"github.com/charlievieth/fastwalk"
4041
"github.com/davies/groupcache/consistenthash"
4142
"github.com/dustin/go-humanize"
4243
"github.com/google/uuid"
@@ -917,7 +918,8 @@ func (cache *cacheStore) scanCached() {
917918

918919
cachePrefix := filepath.Join(cache.dir, cacheDir)
919920
logger.Debugf("Scan %s to find cached blocks", cachePrefix)
920-
_ = filepath.WalkDir(cachePrefix, func(path string, d fs.DirEntry, err error) error {
921+
err := fastwalk.Walk(nil, cachePrefix, func(path string, d fs.DirEntry, err error) error {
922+
// this func should be concurrent safe
921923
if err != nil {
922924
return nil
923925
}
@@ -953,6 +955,9 @@ func (cache *cacheStore) scanCached() {
953955
}
954956
return nil
955957
})
958+
if err != nil {
959+
logger.Errorf("Scan cached files in %s failed: %s", cachePrefix, err)
960+
}
956961

957962
cache.Lock()
958963
cache.scanned = true
@@ -972,7 +977,8 @@ func (cache *cacheStore) scanStaging() {
972977
var count, usage uint64
973978
stagingPrefix := filepath.Join(cache.dir, stagingDir)
974979
logger.Debugf("Scan %s to find staging blocks", stagingPrefix)
975-
_ = filepath.WalkDir(stagingPrefix, func(path string, d fs.DirEntry, err error) error {
980+
err := fastwalk.Walk(nil, stagingPrefix, func(path string, d fs.DirEntry, err error) error {
981+
// this func should be concurrent safe
976982
if err != nil {
977983
return nil // ignore it
978984
}
@@ -1002,11 +1008,14 @@ func (cache *cacheStore) scanStaging() {
10021008
cache.m.stageBlocks.Add(1)
10031009
cache.m.stageBlockBytes.Add(float64(origSize))
10041010
cache.uploader(key, path, false)
1005-
count++
1006-
usage += uint64(origSize)
1011+
atomic.AddUint64(&count, 1)
1012+
atomic.AddUint64(&usage, uint64(origSize))
10071013
}
10081014
return nil
10091015
})
1016+
if err != nil {
1017+
logger.Errorf("Scan staging files in %s failed: %s", stagingPrefix, err)
1018+
}
10101019
if count > 0 {
10111020
logger.Infof("Found %d staging blocks (%s) in %s with %s", count, humanize.IBytes(usage), cache.dir, time.Since(start))
10121021
}

pkg/chunk/disk_cache_test.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,32 @@ func TestMetrics(t *testing.T) {
144144
}
145145
}
146146

147+
func TestScanCached(t *testing.T) {
148+
var err error
149+
cfg := defaultConf
150+
cfg.CacheEviction = EvictionNone
151+
cache := &cacheStore{
152+
opTs: make(map[time.Duration]func() error),
153+
}
154+
cache.state = newDCState(dcUnchanged, cache)
155+
cache.keys, err = NewKeyIndex(&cfg)
156+
require.NoError(t, err)
157+
cache.dir = "/tmp/jfstest_scan"
158+
rawDir := filepath.Join(cache.dir, cacheDir)
159+
if err := os.MkdirAll(rawDir, 0755); err != nil {
160+
t.Fatalf("mkdir %s: %s", rawDir, err)
161+
}
162+
num := 10
163+
for i := 0; i < num; i++ {
164+
if f, err := os.Create(filepath.Join(rawDir, fmt.Sprintf("test%d_1024", i))); err == nil {
165+
_ = f.Close()
166+
}
167+
}
168+
defer os.RemoveAll(rawDir)
169+
cache.scanCached()
170+
require.Equal(t, num, cache.keys.len())
171+
}
172+
147173
func TestChecksum(t *testing.T) {
148174
conf := testConf()
149175
conf.FreeSpace = 0.01

0 commit comments

Comments
 (0)