Skip to content

Commit 59188a7

Browse files
committed
deletion log: handle ownership transfer changes
1 parent 147cb75 commit 59188a7

File tree

4 files changed

+59
-26
lines changed

4 files changed

+59
-26
lines changed

nodespace/spacedeleter/spacedeleter.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,8 @@ func (s *spaceDeleter) delete(ctx context.Context) (err error) {
8787
case <-ctx.Done():
8888
return ctx.Err()
8989
}
90-
lastRecordId, err := s.deletionStorage.LastRecordId(ctx)
91-
if err != nil && !errors.Is(err, nodestorage.ErrNoLastRecordId) {
90+
lastRecordId, err := s.deletionStorage.DeletionLogId(ctx)
91+
if err != nil && !errors.Is(err, nodestorage.ErrNoDeletionLogId) {
9292
return err
9393
}
9494
log.Debug("getting deletion log", zap.Int("limit", logLimit), zap.String("lastRecordId", lastRecordId))
@@ -135,6 +135,12 @@ func (s *spaceDeleter) processDeletionRecord(ctx context.Context, rec *coordinat
135135
if err != nil {
136136
return err
137137
}
138+
case coordinatorproto.DeletionLogRecordStatus_OwnershipChange:
139+
log.Debug("received ownership change record")
140+
err := s.deletionStorage.SetDeletionLogId(ctx, rec.Id)
141+
if err != nil {
142+
return err
143+
}
138144
}
139145
return nil
140146
}

nodespace/spacedeleter/spacedeleter_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func TestSpaceDeleter_Run_Ok(t *testing.T) {
4242
close(fx.waiterChan)
4343
<-fx.deleter.testChan
4444

45-
id, err := fx.storage.IndexStorage().LastRecordId(ctx)
45+
id, err := fx.storage.IndexStorage().DeletionLogId(ctx)
4646
require.NoError(t, err)
4747
require.Equal(t, lg[2].Id, id)
4848
store, err = fx.storage.WaitSpaceStorage(ctx, payload.SpaceHeaderWithId.Id)
@@ -72,7 +72,7 @@ func TestSpaceDeleter_Run_Ok_NewPush(t *testing.T) {
7272
close(fx.waiterChan)
7373
<-fx.deleter.testChan
7474

75-
id, err := fx.storage.IndexStorage().LastRecordId(ctx)
75+
id, err := fx.storage.IndexStorage().DeletionLogId(ctx)
7676
require.NoError(t, err)
7777
require.Equal(t, lg[3].Id, id)
7878
status, err := fx.storage.IndexStorage().SpaceStatus(ctx, payload.SpaceHeaderWithId.Id)
@@ -90,7 +90,7 @@ func TestSpaceDeleter_Run_Ok_NoStorage(t *testing.T) {
9090
close(fx.waiterChan)
9191
<-fx.deleter.testChan
9292

93-
id, err := fx.storage.IndexStorage().LastRecordId(ctx)
93+
id, err := fx.storage.IndexStorage().DeletionLogId(ctx)
9494
require.NoError(t, err)
9595
require.Equal(t, lg[2].Id, id)
9696
status, err := fx.storage.IndexStorage().SpaceStatus(ctx, "space3")
@@ -130,7 +130,7 @@ func TestSpaceDeleter_Run_Ok_EmptyStorage(t *testing.T) {
130130
close(fx.waiterChan)
131131
<-fx.deleter.testChan
132132

133-
id, err := fx.storage.IndexStorage().LastRecordId(ctx)
133+
id, err := fx.storage.IndexStorage().DeletionLogId(ctx)
134134
require.NoError(t, err)
135135
require.Equal(t, lg[2].Id, id)
136136
store, err = fx.storage.WaitSpaceStorage(context.Background(), payload.SpaceHeaderWithId.Id)

nodestorage/indexstorage.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,8 @@ const (
2626
)
2727

2828
var (
29-
ErrUnknownSpaceId = errors.New("unknown space id")
30-
ErrNoLastRecordId = errors.New("no last record id")
29+
ErrUnknownSpaceId = errors.New("unknown space id")
30+
ErrNoDeletionLogId = errors.New("no last record id")
3131
)
3232

3333
const (
@@ -57,7 +57,8 @@ type IndexStorage interface {
5757
SpaceStatus(ctx context.Context, spaceId string) (status SpaceStatus, err error)
5858
MarkArchived(ctx context.Context, spaceId string, compressedSize, uncompressedSize int64) (err error)
5959
MarkError(ctx context.Context, spaceId string, errString string) (err error)
60-
LastRecordId(ctx context.Context) (id string, err error)
60+
DeletionLogId(ctx context.Context) (id string, err error)
61+
SetDeletionLogId(ctx context.Context, id string) (err error)
6162
FindOldestInactiveSpace(ctx context.Context, olderThan time.Duration, skip int) (spaceId string, err error)
6263

6364
UpdateLastAccess(ctx context.Context, spaceId string) (err error)
@@ -205,17 +206,29 @@ func (d *indexStorage) MarkArchived(ctx context.Context, spaceId string, compres
205206
return err
206207
}
207208

208-
func (d *indexStorage) LastRecordId(ctx context.Context) (id string, err error) {
209+
func (d *indexStorage) DeletionLogId(ctx context.Context) (id string, err error) {
209210
doc, err := d.settingsColl.FindId(ctx, lastDeletionIdKey)
210211
if err != nil {
211212
if errors.Is(err, anystore.ErrDocNotFound) {
212-
return "", ErrNoLastRecordId
213+
return "", ErrNoDeletionLogId
213214
}
214215
return "", err
215216
}
216217
return doc.Value().GetString(valueKey), nil
217218
}
218219

220+
func (d *indexStorage) SetDeletionLogId(ctx context.Context, id string) (err error) {
221+
_, err = d.settingsColl.UpsertId(ctx, lastDeletionIdKey, query.ModifyFunc(func(a *anyenc.Arena, v *anyenc.Value) (result *anyenc.Value, modified bool, err error) {
222+
prevKey := v.GetString(valueKey)
223+
if prevKey < id {
224+
v.Set(valueKey, a.NewString(id))
225+
return v, true, nil
226+
}
227+
return v, false, nil
228+
}))
229+
return
230+
}
231+
219232
func (d *indexStorage) UpdateLastAccess(ctx context.Context, spaceId string) (err error) {
220233
now := time.Now()
221234
if val, ok := d.lastAccessCache.Load(spaceId); ok {

nodestorage/mock_nodestorage/mock_nodestorage.go

Lines changed: 29 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)