Skip to content

Commit 89ce034

Browse files
authored
Merge pull request #185 from anyproto/GO-6240-ownership-transfer
GO-6240 ownership transfer
2 parents 403b413 + 4e26433 commit 89ce034

28 files changed

+2740
-1926
lines changed

Makefile

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
.PHONY: build test deps build-dev
22
SHELL=/usr/bin/env bash
33
export GOPRIVATE=github.com/anyproto
4-
export PATH:=deps:$(PATH)
4+
export PATH:=$(CURDIR)/deps:$(PATH)
55
export CGO_ENABLED:=1
66
BUILD_GOOS:=$(shell go env GOOS)
77
BUILD_GOARCH:=$(shell go env GOARCH)
@@ -25,8 +25,27 @@ test:
2525

2626
deps:
2727
go mod download
28+
go build -o deps google.golang.org/protobuf/cmd/protoc-gen-go
29+
go build -o deps github.com/planetscale/vtprotobuf/cmd/protoc-gen-go-vtproto
2830
go build -o deps github.com/ahmetb/govvv
29-
go build -o deps github.com/gogo/protobuf/protoc-gen-gogofaster
31+
go build -o deps go.uber.org/mock/mockgen
32+
33+
mocks:
34+
echo 'Generating mocks...'
35+
go generate ./...
36+
37+
PROTOC=protoc
38+
PROTOC_GEN_GO=deps/protoc-gen-go
39+
PROTOC_GEN_VTPROTO=deps/protoc-gen-go-vtproto
40+
41+
define generate_proto
42+
@echo "Generating Protobuf for directory: $(1)"
43+
$(PROTOC) \
44+
--go_out=. --plugin protoc-gen-go="$(PROTOC_GEN_GO)" \
45+
--go-vtproto_out=. --plugin protoc-gen-go-vtproto="$(PROTOC_GEN_VTPROTO)" \
46+
--go-vtproto_opt=features=marshal+unmarshal+size \
47+
--proto_path=$(1) $(wildcard $(1)/*.proto)
48+
endef
3049

3150
proto:
32-
protoc --gogofaster_out=:. index/indexproto/protos/*.proto
51+
$(call generate_proto,index/indexproto/protos)

deletelog/deletelog.go

Lines changed: 38 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/redis/go-redis/v9"
1616
"go.uber.org/zap"
1717

18+
"github.com/anyproto/any-sync-filenode/filenode"
1819
"github.com/anyproto/any-sync-filenode/index"
1920
"github.com/anyproto/any-sync-filenode/redisprovider"
2021
)
@@ -37,6 +38,7 @@ type deleteLog struct {
3738
redsync *redsync.Redsync
3839
ticker periodicsync.PeriodicSync
3940
index index.Index
41+
filenode filenode.Service
4042
disableTicker bool
4143
}
4244

@@ -45,6 +47,7 @@ func (d *deleteLog) Init(a *app.App) (err error) {
4547
d.coordinatorClient = a.MustComponent(coordinatorclient.CName).(coordinatorclient.CoordinatorClient)
4648
d.redsync = redsync.New(goredis.NewPool(d.redis))
4749
d.index = a.MustComponent(index.CName).(index.Index)
50+
d.filenode = a.MustComponent(filenode.CName).(filenode.Service)
4851
return
4952
}
5053

@@ -79,24 +82,20 @@ func (d *deleteLog) checkLog(ctx context.Context) (err error) {
7982
return
8083
}
8184
var handledCount, deletedCount int
82-
var ok bool
8385
for _, rec := range recs {
84-
if rec.Status == coordinatorproto.DeletionLogRecordStatus_Remove && rec.FileGroup != "" {
85-
key := index.Key{
86-
GroupId: rec.FileGroup,
87-
SpaceId: rec.SpaceId,
88-
}
89-
ok, err = d.index.SpaceDelete(ctx, key)
90-
if err != nil && !errors.Is(err, redis.Nil) && !errors.Is(err, index.ErrSpaceIsDeleted) {
91-
return
92-
}
93-
handledCount++
94-
if _, err = d.index.MarkSpaceAsDeleted(ctx, key); err != nil {
95-
return
96-
}
97-
if ok {
98-
deletedCount++
99-
}
86+
var ok bool
87+
switch rec.Status {
88+
case coordinatorproto.DeletionLogRecordStatus_Remove:
89+
ok, err = d.handleDeletion(ctx, rec)
90+
case coordinatorproto.DeletionLogRecordStatus_OwnershipChange:
91+
err = d.handleOwnershipTransfer(ctx, rec)
92+
}
93+
if err != nil {
94+
return
95+
}
96+
handledCount++
97+
if ok {
98+
deletedCount++
10099
}
101100
if err = d.redis.Set(ctx, lastKey, rec.Id, 0).Err(); err != nil {
102101
return
@@ -111,6 +110,28 @@ func (d *deleteLog) checkLog(ctx context.Context) (err error) {
111110
return
112111
}
113112

113+
func (d *deleteLog) handleDeletion(ctx context.Context, rec *coordinatorproto.DeletionLogRecord) (ok bool, err error) {
114+
if rec.FileGroup == "" {
115+
return
116+
}
117+
key := index.Key{
118+
GroupId: rec.FileGroup,
119+
SpaceId: rec.SpaceId,
120+
}
121+
ok, err = d.index.SpaceDelete(ctx, key)
122+
if err != nil && !errors.Is(err, redis.Nil) && !errors.Is(err, index.ErrSpaceIsDeleted) {
123+
return
124+
}
125+
if _, err = d.index.MarkSpaceAsDeleted(ctx, key); err != nil {
126+
return
127+
}
128+
return ok, nil
129+
}
130+
131+
func (d *deleteLog) handleOwnershipTransfer(ctx context.Context, rec *coordinatorproto.DeletionLogRecord) (err error) {
132+
return d.filenode.OwnershipTransfer(ctx, rec.SpaceId, rec.FileGroup, rec.AclRecordId)
133+
}
134+
114135
func (d *deleteLog) Close(ctx context.Context) (err error) {
115136
if d.ticker != nil {
116137
d.ticker.Close()

deletelog/deletelog_test.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ import (
1313
"github.com/stretchr/testify/require"
1414
"go.uber.org/mock/gomock"
1515

16+
"github.com/anyproto/any-sync-filenode/filenode"
17+
"github.com/anyproto/any-sync-filenode/filenode/mock_filenode"
1618
"github.com/anyproto/any-sync-filenode/index"
1719
"github.com/anyproto/any-sync-filenode/index/mock_index"
1820
"github.com/anyproto/any-sync-filenode/redisprovider/testredisprovider"
@@ -59,6 +61,26 @@ func TestDeleteLog_checkLog(t *testing.T) {
5961
assert.Equal(t, "2", lastId)
6062
})
6163

64+
t.Run("ownership change", func(t *testing.T) {
65+
fx := newFixture(t)
66+
defer fx.finish(t)
67+
now := time.Now().Unix()
68+
fx.coord.EXPECT().DeletionLog(ctx, "", recordsLimit).Return([]*coordinatorproto.DeletionLogRecord{
69+
{
70+
Id: "1",
71+
SpaceId: "s1",
72+
Status: coordinatorproto.DeletionLogRecordStatus_OwnershipChange,
73+
Timestamp: now,
74+
FileGroup: "f1",
75+
AclRecordId: "acl1",
76+
},
77+
}, nil)
78+
fx.filenode.EXPECT().OwnershipTransfer(ctx, "s1", "f1", "acl1").Return(nil)
79+
require.NoError(t, fx.checkLog(ctx))
80+
lastId, err := fx.redis.Get(ctx, lastKey).Result()
81+
require.NoError(t, err)
82+
assert.Equal(t, "1", lastId)
83+
})
6284
}
6385

6486
func newFixture(t *testing.T) *fixture {
@@ -68,6 +90,7 @@ func newFixture(t *testing.T) *fixture {
6890
a: new(app.App),
6991
coord: mock_coordinatorclient.NewMockCoordinatorClient(ctrl),
7092
index: mock_index.NewMockIndex(ctrl),
93+
filenode: mock_filenode.NewMockService(ctrl),
7194
deleteLog: New().(*deleteLog),
7295
}
7396
fx.disableTicker = true
@@ -77,18 +100,25 @@ func newFixture(t *testing.T) *fixture {
77100
fx.index.EXPECT().Init(gomock.Any()).AnyTimes()
78101
fx.index.EXPECT().Run(gomock.Any()).AnyTimes()
79102
fx.index.EXPECT().Close(gomock.Any()).AnyTimes()
103+
fx.filenode.EXPECT().Name().Return(filenode.CName).AnyTimes()
104+
fx.filenode.EXPECT().Init(gomock.Any()).AnyTimes()
80105

81-
fx.a.Register(testredisprovider.NewTestRedisProviderNum(7)).Register(fx.coord).Register(fx.index).Register(fx.deleteLog)
106+
fx.a.Register(testredisprovider.NewTestRedisProviderNum(7)).
107+
Register(fx.coord).
108+
Register(fx.index).
109+
Register(fx.filenode).
110+
Register(fx.deleteLog)
82111
require.NoError(t, fx.a.Start(ctx))
83112

84113
return fx
85114
}
86115

87116
type fixture struct {
88-
ctrl *gomock.Controller
89-
a *app.App
90-
coord *mock_coordinatorclient.MockCoordinatorClient
91-
index *mock_index.MockIndex
117+
ctrl *gomock.Controller
118+
a *app.App
119+
coord *mock_coordinatorclient.MockCoordinatorClient
120+
index *mock_index.MockIndex
121+
filenode *mock_filenode.MockService
92122
*deleteLog
93123
}
94124

etc/any-sync-filenode.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,4 +69,4 @@ network:
6969
networkStorePath: .
7070
networkUpdateIntervalSec: 600
7171
defaultLimit: 1073741824
72-
persistTtl: 300
72+
persistTtl: 1800

filenode/filenode.go

Lines changed: 65 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import (
1717
"github.com/anyproto/any-sync/net/peer"
1818
"github.com/anyproto/any-sync/net/rpc/server"
1919
"github.com/anyproto/any-sync/nodeconf"
20+
"github.com/anyproto/any-sync/util/crypto"
2021
blocks "github.com/ipfs/go-block-format"
2122
"github.com/ipfs/go-cid"
2223
"go.uber.org/zap"
@@ -37,7 +38,9 @@ func New() Service {
3738
return new(fileNode)
3839
}
3940

41+
//go:generate mockgen -destination mock_filenode/mock_filenode.go github.com/anyproto/any-sync-filenode/filenode Service
4042
type Service interface {
43+
OwnershipTransfer(ctx context.Context, spaceId, oldIdentity string, aclRecordId string) (err error)
4144
app.Component
4245
}
4346

@@ -180,13 +183,24 @@ func (fn *fileNode) StoreKey(ctx context.Context, spaceId string, checkLimit boo
180183
return storageKey, fileprotoerr.ErrForbidden
181184
}
182185

183-
ownerPubKey, err := fn.acl.OwnerPubKey(ctx, spaceId)
184-
if err != nil {
185-
log.WarnCtx(ctx, "acl ownerPubKey error", zap.Error(err))
186-
return storageKey, fileprotoerr.ErrForbidden
187-
}
186+
var (
187+
ownerPubKey crypto.PubKey
188+
ownerRecordIndex int
189+
isOneToOne bool
190+
)
188191

189-
err = fn.acl.ReadState(ctx, spaceId, func(aclState *list.AclState) error {
192+
err = fn.acl.ReadList(ctx, spaceId, func(aclList list.AclList) error {
193+
aclState := aclList.AclState()
194+
var ownerRecordId string
195+
if ownerPubKey, ownerRecordId, err = aclState.OwnerPubKeyWithRecordId(); err != nil {
196+
log.WarnCtx(ctx, "acl ownerPubKey error", zap.Error(err))
197+
return fileprotoerr.ErrForbidden
198+
}
199+
ownerRecordIndex = aclList.GetRecordIndex(ownerRecordId)
200+
if ownerRecordIndex < 0 {
201+
log.ErrorCtx(ctx, "acl ownerRecordIndex not found", zap.String("spaceId", spaceId), zap.String("recordId", ownerRecordId), zap.Error(err))
202+
return fileprotoerr.ErrUnexpected
203+
}
190204
if aclState.IsOneToOne() {
191205
if aclState.Permissions(identity).NoPermissions() {
192206
return fileprotoerr.ErrForbidden
@@ -207,6 +221,7 @@ func (fn *fileNode) StoreKey(ctx context.Context, spaceId string, checkLimit boo
207221
GroupId: identity.Account(),
208222
SpaceId: newSpaceId,
209223
}
224+
isOneToOne = true
210225
} else {
211226
storageKey = index.Key{
212227
GroupId: ownerPubKey.Account(),
@@ -231,10 +246,19 @@ func (fn *fileNode) StoreKey(ctx context.Context, spaceId string, checkLimit boo
231246
}
232247
}
233248

234-
if e := fn.index.Migrate(ctx, storageKey); e != nil {
235-
log.WarnCtx(ctx, "space migrate error", zap.String("spaceId", spaceId), zap.Error(e))
249+
if !isOneToOne {
250+
if e := fn.index.Migrate(ctx, storageKey); e != nil {
251+
log.WarnCtx(ctx, "space migrate error", zap.String("spaceId", spaceId), zap.Error(e))
252+
}
253+
var oldIdentity string
254+
if ownerRecordIndex == 0 {
255+
oldIdentity = storageKey.GroupId
256+
}
257+
if err = fn.index.CheckAndMoveOwnership(ctx, storageKey, oldIdentity, ownerRecordIndex); err != nil {
258+
log.ErrorCtx(ctx, "check ownership error", zap.String("spaceId", spaceId), zap.Error(err))
259+
return storageKey, fileprotoerr.ErrUnexpected
260+
}
236261
}
237-
238262
if checkLimit {
239263
if err = fn.index.CheckLimits(ctx, storageKey); err != nil {
240264
if errors.Is(err, index.ErrLimitExceed) {
@@ -417,3 +441,35 @@ func (fn *fileNode) FilesGet(ctx context.Context, spaceId string) (fileIds []str
417441
}
418442
return fn.index.FilesList(ctx, storeKey)
419443
}
444+
445+
func (fn *fileNode) OwnershipTransfer(ctx context.Context, spaceId, oldIdentity, aclRecordId string) (err error) {
446+
var (
447+
ownerPubKey crypto.PubKey
448+
ownerRecordIndex int
449+
)
450+
defer func() {
451+
log.InfoCtx(ctx, "ownership transfer", zap.String("spaceId", spaceId), zap.String("recordId", aclRecordId), zap.Error(err))
452+
}()
453+
err = fn.acl.ReadList(ctx, spaceId, func(aclList list.AclList) error {
454+
if !aclList.HasHead(aclRecordId) {
455+
log.WarnCtx(ctx, "ownership transfer error: acl record not found", zap.String("spaceId", spaceId), zap.String("recordId", aclRecordId))
456+
return fileprotoerr.ErrAclRecordNotFound
457+
}
458+
aclState := aclList.AclState()
459+
var ownerRecordId string
460+
if ownerPubKey, ownerRecordId, err = aclState.OwnerPubKeyWithRecordId(); err != nil {
461+
log.WarnCtx(ctx, "acl ownerPubKey error", zap.Error(err))
462+
return fileprotoerr.ErrForbidden
463+
}
464+
ownerRecordIndex = aclList.GetRecordIndex(ownerRecordId)
465+
if ownerRecordIndex < 0 {
466+
log.ErrorCtx(ctx, "acl ownerRecordIndex not found", zap.String("spaceId", spaceId), zap.String("recordId", ownerRecordId), zap.Error(err))
467+
return fileprotoerr.ErrUnexpected
468+
}
469+
return nil
470+
})
471+
if err != nil {
472+
return
473+
}
474+
return fn.index.CheckAndMoveOwnership(ctx, index.Key{GroupId: ownerPubKey.Account(), SpaceId: spaceId}, oldIdentity, ownerRecordIndex)
475+
}

0 commit comments

Comments
 (0)