Skip to content

Commit 7e0d35c

Browse files
authored
Merge pull request #155 from anyproto/GO-5827-improve-space-deletion
GO-5827 return additional cid checks, additional group check for the space deletion
2 parents 0050dee + 0c01c43 commit 7e0d35c

File tree

4 files changed

+163
-59
lines changed

4 files changed

+163
-59
lines changed

index/cids.go

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,14 +101,11 @@ func (ri *redisIndex) getAndAddToEntries(ctx context.Context, entries *CidEntrie
101101
}
102102

103103
func (ri *redisIndex) acquireCidEntry(ctx context.Context, c cid.Cid) (entry *cidEntry, err error) {
104-
ok, release, err := ri.AcquireKey(ctx, CidKey(c))
104+
_, release, err := ri.AcquireKey(ctx, CidKey(c))
105105
if err != nil {
106106
return
107107
}
108-
if !ok {
109-
release()
110-
return nil, ErrCidsNotExist
111-
}
108+
112109
entry, err = ri.getCidEntry(ctx, c)
113110
if err != nil {
114111
release()
@@ -167,7 +164,14 @@ func (ri *redisIndex) getCidEntry(ctx context.Context, c cid.Cid) (entry *cidEnt
167164
cidData, err := ri.cl.Get(ctx, ck).Result()
168165
if err != nil {
169166
if errors.Is(err, redis.Nil) {
170-
return nil, ErrCidsNotExist
167+
var b blocks.Block
168+
if b, err = ri.persistStore.Get(ctx, c); err != nil {
169+
log.WarnCtx(ctx, "restore cid entry error", zap.String("cid", c.String()), zap.Error(err))
170+
err = ErrCidsNotExist
171+
return
172+
}
173+
log.InfoCtx(ctx, "restore cid entry", zap.String("cid", c.String()))
174+
return ri.createCidEntry(ctx, b)
171175
}
172176
return
173177
}

index/cids_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,23 @@ func TestRedisIndex_CidEntries(t *testing.T) {
9090
assert.NotEmpty(t, e.Version)
9191
}
9292
})
93+
t.Run("restore from store", func(t *testing.T) {
94+
bs := testutil.NewRandBlocks(4)
95+
fx := newFixture(t)
96+
defer fx.Finish(t)
97+
98+
require.NoError(t, fx.BlocksAdd(ctx, bs[:3]))
99+
100+
cids := testutil.BlocksToKeys(bs)
101+
102+
fx.persistStore.EXPECT().Get(ctx, bs[3].Cid()).Return(bs[3], nil)
103+
104+
result, err := fx.CidEntries(ctx, cids)
105+
defer result.Release()
106+
require.NoError(t, err)
107+
require.Len(t, result.entries, len(bs))
108+
t.Log(result.entries[3])
109+
})
93110
}
94111

95112
func TestRedisIndex_CidExistsInSpace(t *testing.T) {

index/delete.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package index
22

33
import (
44
"context"
5+
"errors"
56
"slices"
67
"strings"
78
"time"
@@ -12,6 +13,9 @@ import (
1213
func (ri *redisIndex) SpaceDelete(ctx context.Context, key Key) (ok bool, err error) {
1314
entry, release, err := ri.AcquireSpace(ctx, key)
1415
if err != nil {
16+
if errors.Is(err, ErrSpaceIsDeleted) {
17+
return ri.removeSpaceFromGroup(ctx, key)
18+
}
1519
return
1620
}
1721
defer release()
@@ -74,3 +78,38 @@ func (ri *redisIndex) MarkSpaceAsDeleted(ctx context.Context, key Key) (ok bool,
7478
}
7579
return false, nil
7680
}
81+
82+
func (ri *redisIndex) removeSpaceFromGroup(ctx context.Context, key Key) (ok bool, err error) {
83+
gExists, gRelease, err := ri.AcquireKey(ctx, GroupKey(key))
84+
if err != nil {
85+
return
86+
}
87+
defer gRelease()
88+
if !gExists {
89+
return false, nil
90+
}
91+
92+
gEntry, err := ri.getGroupEntry(ctx, key)
93+
if err != nil {
94+
if errors.Is(err, redis.Nil) {
95+
return false, nil
96+
}
97+
return
98+
}
99+
if !slices.Contains(gEntry.SpaceIds, key.SpaceId) {
100+
return false, nil
101+
}
102+
gEntry.SpaceIds = slices.DeleteFunc(gEntry.SpaceIds, func(spaceId string) bool {
103+
return spaceId == key.SpaceId
104+
})
105+
_, err = ri.cl.Pipelined(ctx, func(pipe redis.Pipeliner) error {
106+
gEntry.Save(ctx, pipe)
107+
return nil
108+
})
109+
if err != nil {
110+
return
111+
}
112+
// add spaces broken spaces to the debug list
113+
_ = ri.cl.LPush(ctx, "debug:spaceIdsToRecheck", key.SpaceId).Err()
114+
return true, nil
115+
}

index/delete_test.go

Lines changed: 97 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -10,59 +10,103 @@ import (
1010
)
1111

1212
func TestRedisIndex_SpaceDelete(t *testing.T) {
13-
fx := newFixture(t)
14-
defer fx.Finish(t)
15-
key := newRandKey()
16-
17-
// space not exists
18-
ok, err := fx.SpaceDelete(ctx, key)
19-
require.NoError(t, err)
20-
assert.False(t, ok)
21-
22-
// add files
23-
bs := testutil.NewRandBlocks(5)
24-
require.NoError(t, fx.BlocksAdd(ctx, bs))
25-
cids, err := fx.CidEntriesByBlocks(ctx, bs)
26-
require.NoError(t, err)
27-
cids.Release()
28-
29-
// bind to files with intersected cids
30-
fileId1 := testutil.NewRandCid().String()
31-
fileId2 := testutil.NewRandCid().String()
32-
33-
cids1, err := fx.CidEntriesByBlocks(ctx, bs)
34-
require.NoError(t, err)
35-
require.NoError(t, fx.FileBind(ctx, key, fileId1, cids1))
36-
cids1.Release()
37-
38-
cids2, err := fx.CidEntriesByBlocks(ctx, bs[:2])
39-
require.NoError(t, err)
40-
require.NoError(t, fx.FileBind(ctx, key, fileId2, cids2))
41-
cids2.Release()
42-
43-
groupInfo, err := fx.GroupInfo(ctx, key.GroupId)
44-
require.NoError(t, err)
45-
assert.NotEmpty(t, groupInfo.BytesUsage)
46-
assert.Contains(t, groupInfo.SpaceIds, key.SpaceId)
47-
48-
require.NoError(t, fx.SetGroupLimit(ctx, key.GroupId, 5000))
49-
require.NoError(t, fx.SetSpaceLimit(ctx, key, 4000))
50-
51-
ok, err = fx.SpaceDelete(ctx, key)
52-
require.NoError(t, err)
53-
assert.True(t, ok)
54-
55-
groupInfo, err = fx.GroupInfo(ctx, key.GroupId)
56-
require.NoError(t, err)
57-
assert.Empty(t, groupInfo.BytesUsage)
58-
assert.NotContains(t, groupInfo.SpaceIds, key.SpaceId)
59-
assert.Equal(t, uint64(5000), groupInfo.AccountLimit)
60-
assert.Equal(t, uint64(5000), groupInfo.Limit)
61-
62-
// second call
63-
ok, err = fx.SpaceDelete(ctx, key)
64-
require.NoError(t, err)
65-
assert.False(t, ok)
13+
t.Run("success", func(t *testing.T) {
14+
fx := newFixture(t)
15+
defer fx.Finish(t)
16+
key := newRandKey()
17+
18+
// space not exists
19+
ok, err := fx.SpaceDelete(ctx, key)
20+
require.NoError(t, err)
21+
assert.False(t, ok)
22+
23+
// add files
24+
bs := testutil.NewRandBlocks(5)
25+
require.NoError(t, fx.BlocksAdd(ctx, bs))
26+
cids, err := fx.CidEntriesByBlocks(ctx, bs)
27+
require.NoError(t, err)
28+
cids.Release()
29+
30+
// bind to files with intersected cids
31+
fileId1 := testutil.NewRandCid().String()
32+
fileId2 := testutil.NewRandCid().String()
33+
34+
cids1, err := fx.CidEntriesByBlocks(ctx, bs)
35+
require.NoError(t, err)
36+
require.NoError(t, fx.FileBind(ctx, key, fileId1, cids1))
37+
cids1.Release()
38+
39+
cids2, err := fx.CidEntriesByBlocks(ctx, bs[:2])
40+
require.NoError(t, err)
41+
require.NoError(t, fx.FileBind(ctx, key, fileId2, cids2))
42+
cids2.Release()
43+
44+
groupInfo, err := fx.GroupInfo(ctx, key.GroupId)
45+
require.NoError(t, err)
46+
assert.NotEmpty(t, groupInfo.BytesUsage)
47+
assert.Contains(t, groupInfo.SpaceIds, key.SpaceId)
48+
49+
require.NoError(t, fx.SetGroupLimit(ctx, key.GroupId, 5000))
50+
require.NoError(t, fx.SetSpaceLimit(ctx, key, 4000))
51+
52+
ok, err = fx.SpaceDelete(ctx, key)
53+
require.NoError(t, err)
54+
assert.True(t, ok)
55+
56+
groupInfo, err = fx.GroupInfo(ctx, key.GroupId)
57+
require.NoError(t, err)
58+
assert.Empty(t, groupInfo.BytesUsage)
59+
assert.NotContains(t, groupInfo.SpaceIds, key.SpaceId)
60+
assert.Equal(t, uint64(5000), groupInfo.AccountLimit)
61+
assert.Equal(t, uint64(5000), groupInfo.Limit)
62+
63+
// second call
64+
ok, err = fx.SpaceDelete(ctx, key)
65+
require.NoError(t, err)
66+
assert.False(t, ok)
67+
})
68+
t.Run("delete from group", func(t *testing.T) {
69+
fx := newFixture(t)
70+
defer fx.Finish(t)
71+
key := newRandKey()
72+
73+
// add files
74+
bs := testutil.NewRandBlocks(5)
75+
require.NoError(t, fx.BlocksAdd(ctx, bs))
76+
cids, err := fx.CidEntriesByBlocks(ctx, bs)
77+
require.NoError(t, err)
78+
cids.Release()
79+
80+
// bind to files with intersected cids
81+
fileId1 := testutil.NewRandCid().String()
82+
fileId2 := testutil.NewRandCid().String()
83+
84+
cids1, err := fx.CidEntriesByBlocks(ctx, bs)
85+
require.NoError(t, err)
86+
require.NoError(t, fx.FileBind(ctx, key, fileId1, cids1))
87+
cids1.Release()
88+
89+
cids2, err := fx.CidEntriesByBlocks(ctx, bs[:2])
90+
require.NoError(t, err)
91+
require.NoError(t, fx.FileBind(ctx, key, fileId2, cids2))
92+
cids2.Release()
93+
94+
require.NoError(t, fx.FileUnbind(ctx, key, fileId1, fileId2))
95+
96+
ok, err := fx.MarkSpaceAsDeleted(ctx, key)
97+
require.NoError(t, err)
98+
assert.True(t, ok)
99+
100+
require.NoError(t, fx.cl.Del(ctx, SpaceKey(key)).Err())
101+
102+
ok, err = fx.SpaceDelete(ctx, key)
103+
require.NoError(t, err)
104+
assert.True(t, ok)
105+
106+
groupInfo, err := fx.GroupInfo(ctx, key.GroupId)
107+
require.NoError(t, err)
108+
assert.NotContains(t, groupInfo.SpaceIds, key.SpaceId)
109+
})
66110
}
67111

68112
func TestRedisIndex_MarkSpaceAsDeleted(t *testing.T) {

0 commit comments

Comments
 (0)