Skip to content

Commit 0050dee

Browse files
authored
Merge pull request #154 from anyproto/GO-5827-speedup-deletion
GO-5827 make cidEntriesByString parallel, rename del key, remove old recheck …
2 parents d7bd9dd + 2a4bc28 commit 0050dee

File tree

5 files changed

+65
-46
lines changed

5 files changed

+65
-46
lines changed

index/cidentry.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package index
22

33
import (
44
"context"
5+
"sync"
56
"time"
67

78
"github.com/ipfs/go-cid"
@@ -12,6 +13,7 @@ import (
1213

1314
type CidEntries struct {
1415
entries []*cidEntry
16+
mu sync.Mutex
1517
}
1618

1719
func (ce *CidEntries) Release() {
@@ -23,6 +25,12 @@ func (ce *CidEntries) Release() {
2325
return
2426
}
2527

28+
func (ce *CidEntries) Add(entry *cidEntry) {
29+
ce.mu.Lock()
30+
defer ce.mu.Unlock()
31+
ce.entries = append(ce.entries, entry)
32+
}
33+
2634
type cidEntry struct {
2735
Cid cid.Cid
2836
release func()

index/cids.go

Lines changed: 53 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"github.com/ipfs/go-cid"
1010
"github.com/redis/go-redis/v9"
1111
"go.uber.org/zap"
12+
"golang.org/x/sync/errgroup"
1213

1314
"github.com/anyproto/any-sync-filenode/index/indexproto"
1415
)
@@ -33,18 +34,43 @@ func (ri *redisIndex) CidEntries(ctx context.Context, cids []cid.Cid) (entries *
3334
return entries, nil
3435
}
3536

36-
func (ri *redisIndex) CidEntriesByString(ctx context.Context, cids []string) (entries *CidEntries, err error) {
37-
entries = &CidEntries{}
38-
var c cid.Cid
39-
for _, cs := range cids {
40-
c, err = cid.Decode(cs)
41-
if err != nil {
42-
return
43-
}
44-
if err = ri.getAndAddToEntries(ctx, entries, c); err != nil {
45-
entries.Release()
46-
return nil, err
37+
func (ri *redisIndex) CidEntriesByString(ctx context.Context, cids []string) (*CidEntries, error) {
38+
g, ctx := errgroup.WithContext(ctx)
39+
g.SetLimit(10)
40+
41+
results := make([]*cidEntry, len(cids))
42+
43+
for i, cs := range cids {
44+
i, cs := i, cs
45+
g.Go(func() error {
46+
// decode
47+
c, err := cid.Decode(cs)
48+
if err != nil {
49+
return err
50+
}
51+
// fetch entry
52+
entry, err := ri.acquireCidEntry(ctx, c)
53+
if err != nil {
54+
return err
55+
}
56+
// store in the correct slot
57+
results[i] = entry
58+
return nil
59+
})
60+
}
61+
62+
if err := g.Wait(); err != nil {
63+
for _, result := range results {
64+
if result != nil {
65+
result.release()
66+
}
4767
}
68+
return nil, err
69+
}
70+
71+
entries := &CidEntries{}
72+
for _, e := range results {
73+
entries.Add(e)
4874
}
4975
return entries, nil
5076
}
@@ -66,22 +92,29 @@ func (ri *redisIndex) CidEntriesByBlocks(ctx context.Context, bs []blocks.Block)
6692
}
6793

6894
func (ri *redisIndex) getAndAddToEntries(ctx context.Context, entries *CidEntries, c cid.Cid) (err error) {
69-
_, release, err := ri.AcquireKey(ctx, CidKey(c))
95+
entry, err := ri.acquireCidEntry(ctx, c)
96+
if err != nil {
97+
return err
98+
}
99+
entries.Add(entry)
100+
return
101+
}
102+
103+
func (ri *redisIndex) acquireCidEntry(ctx context.Context, c cid.Cid) (entry *cidEntry, err error) {
104+
ok, release, err := ri.AcquireKey(ctx, CidKey(c))
70105
if err != nil {
71106
return
72107
}
73-
//temporarily ignore the exists check to make a deep check
74-
/*if !ok {
108+
if !ok {
75109
release()
76-
return ErrCidsNotExist
77-
}*/
78-
entry, err := ri.getCidEntry(ctx, c)
110+
return nil, ErrCidsNotExist
111+
}
112+
entry, err = ri.getCidEntry(ctx, c)
79113
if err != nil {
80114
release()
81-
return err
115+
return nil, err
82116
}
83117
entry.release = release
84-
entries.entries = append(entries.entries, entry)
85118
return
86119
}
87120

@@ -134,15 +167,7 @@ func (ri *redisIndex) getCidEntry(ctx context.Context, c cid.Cid) (entry *cidEnt
134167
cidData, err := ri.cl.Get(ctx, ck).Result()
135168
if err != nil {
136169
if errors.Is(err, redis.Nil) {
137-
// temporary additional check: try to load data from store and restore cid
138-
var b blocks.Block
139-
if b, err = ri.persistStore.Get(ctx, c); err != nil {
140-
log.WarnCtx(ctx, "restore cid entry error", zap.String("cid", c.String()), zap.Error(err))
141-
err = ErrCidsNotExist
142-
return
143-
}
144-
log.InfoCtx(ctx, "restore cid entry", zap.String("cid", c.String()))
145-
return ri.createCidEntry(ctx, b)
170+
return nil, ErrCidsNotExist
146171
}
147172
return
148173
}

index/cids_test.go

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -90,23 +90,6 @@ func TestRedisIndex_CidEntries(t *testing.T) {
9090
assert.NotEmpty(t, e.Version)
9191
}
9292
})
93-
t.Run("restore from store", func(t *testing.T) {
94-
bs := testutil.NewRandBlocks(4)
95-
fx := newFixture(t)
96-
defer fx.Finish(t)
97-
98-
require.NoError(t, fx.BlocksAdd(ctx, bs[:3]))
99-
100-
cids := testutil.BlocksToKeys(bs)
101-
102-
fx.persistStore.EXPECT().Get(ctx, bs[3].Cid()).Return(bs[3], nil)
103-
104-
result, err := fx.CidEntries(ctx, cids)
105-
defer result.Release()
106-
require.NoError(t, err)
107-
require.Len(t, result.entries, len(bs))
108-
t.Log(result.entries[3])
109-
})
11093
}
11194

11295
func TestRedisIndex_CidExistsInSpace(t *testing.T) {

index/index.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ type redisIndex struct {
119119
redsync *redsync.Redsync
120120
persistStore persistentStore
121121
persistTtl time.Duration
122+
persistMu sync.Mutex
122123
ticker periodicsync.PeriodicSync
123124
defaultLimit uint64
124125

@@ -301,7 +302,7 @@ func FileKey(fileId string) string {
301302

302303
func DelKey(k Key) string {
303304
hash := strconv.FormatUint(uint64(xxhash.ChecksumString32(k.GroupId)), 36)
304-
return "d:" + k.SpaceId + ".{" + hash + "}"
305+
return "del:" + k.SpaceId + ".{" + hash + "}"
305306
}
306307

307308
const infoKey = "info"

index/loader.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,8 @@ func (ri *redisIndex) updateKeyUsage(ctx context.Context, key string) (err error
195195
}
196196

197197
func (ri *redisIndex) PersistKeys(ctx context.Context) {
198+
ri.persistMu.Lock()
199+
defer ri.persistMu.Unlock()
198200
st := time.Now()
199201
rand.Shuffle(len(partitions), func(i, j int) {
200202
partitions[i], partitions[j] = partitions[j], partitions[i]

0 commit comments

Comments
 (0)