Skip to content

Commit 44c2479

Browse files
authored
Merge pull request #68 from anyproto/GO-6351-deletion-log
GO-6351 deletion log
2 parents 7000a18 + 0b65b1c commit 44c2479

File tree

12 files changed

+377
-59
lines changed

12 files changed

+377
-59
lines changed

Makefile

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
.PHONY: proto build test deps
22
export GOPRIVATE=github.com/anyproto
3-
export PATH:=deps:$(PATH)
3+
export PATH:=$(CURDIR)/deps:$(PATH)
44
BUILD_GOOS:=$(shell go env GOOS)
55
BUILD_GOARCH:=$(shell go env GOARCH)
66

@@ -14,3 +14,8 @@ test:
1414
deps:
1515
go mod download
1616
go build -o deps github.com/ahmetb/govvv
17+
go build -o deps go.uber.org/mock/mockgen
18+
19+
mocks:
20+
echo 'Generating mocks...'
21+
go generate ./...

cmd/consensusnode.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/anyproto/any-sync-consensusnode/config"
3131
"github.com/anyproto/any-sync-consensusnode/consensusrpc"
3232
"github.com/anyproto/any-sync-consensusnode/db"
33+
"github.com/anyproto/any-sync-consensusnode/deletelog"
3334
"github.com/anyproto/any-sync-consensusnode/stream"
3435
// import this to keep govvv in go.mod on mod tidy
3536
_ "github.com/ahmetb/govvv/integration-test/app-different-package/mypkg"
@@ -115,5 +116,6 @@ func Bootstrap(a *app.App) {
115116
Register(server.New()).
116117
Register(db.New()).
117118
Register(stream.New()).
118-
Register(consensusrpc.New())
119+
Register(consensusrpc.New()).
120+
Register(deletelog.New())
119121
}

config/config.go

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package config
22

33
import (
4+
"os"
5+
46
commonaccount "github.com/anyproto/any-sync/accountservice"
57
"github.com/anyproto/any-sync/app"
68
"github.com/anyproto/any-sync/app/logger"
@@ -10,7 +12,9 @@ import (
1012
"github.com/anyproto/any-sync/net/transport/yamux"
1113
"github.com/anyproto/any-sync/nodeconf"
1214
"gopkg.in/yaml.v3"
13-
"os"
15+
16+
"github.com/anyproto/any-sync-consensusnode/db"
17+
"github.com/anyproto/any-sync-consensusnode/deletelog"
1418
)
1519

1620
const CName = "config"
@@ -33,11 +37,12 @@ type Config struct {
3337
Network nodeconf.Configuration `yaml:"network"`
3438
NetworkStorePath string `yaml:"networkStorePath"`
3539
NetworkUpdateIntervalSec int `yaml:"networkUpdateIntervalSec"`
36-
Mongo Mongo `yaml:"mongo"`
40+
Mongo db.Config `yaml:"mongo"`
3741
Metric metric.Config `yaml:"metric"`
3842
Log logger.Config `yaml:"log"`
3943
Yamux yamux.Config `yaml:"yamux"`
4044
Quic quic.Config `yaml:"quic"`
45+
Deletion deletelog.Config `yaml:"deletion"`
4146
}
4247

4348
func (c *Config) Init(a *app.App) (err error) {
@@ -48,7 +53,7 @@ func (c Config) Name() (name string) {
4853
return CName
4954
}
5055

51-
func (c Config) GetMongo() Mongo {
56+
func (c Config) GetMongo() db.Config {
5257
return c.Mongo
5358
}
5459

@@ -83,3 +88,11 @@ func (c Config) GetYamux() yamux.Config {
8388
func (c Config) GetQuic() quic.Config {
8489
return c.Quic
8590
}
91+
92+
func (c Config) GetDeletion() deletelog.Config {
93+
return c.Deletion
94+
}
95+
96+
func (c Config) GetDB() db.Config {
97+
return c.Mongo
98+
}

config/mongo.go renamed to db/config.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
1-
package config
1+
package db
22

3-
type Mongo struct {
3+
type configGetter interface {
4+
GetDB() Config
5+
}
6+
7+
type Config struct {
48
Connect string `yaml:"connect"`
59
Database string `yaml:"database"`
610
LogCollection string `yaml:"logCollection"`

db/db.go

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,26 @@ package db
33

44
import (
55
"context"
6+
"errors"
67
"fmt"
78
"time"
89

910
"github.com/anyproto/any-sync/app"
1011
"github.com/anyproto/any-sync/app/logger"
1112
"github.com/anyproto/any-sync/consensus/consensusproto/consensuserr"
13+
"go.mongodb.org/mongo-driver/bson"
1214
"go.mongodb.org/mongo-driver/mongo"
1315
"go.mongodb.org/mongo-driver/mongo/options"
1416
"go.mongodb.org/mongo-driver/mongo/readpref"
1517
"go.uber.org/zap"
1618

1719
consensus "github.com/anyproto/any-sync-consensusnode"
18-
"github.com/anyproto/any-sync-consensusnode/config"
1920
)
2021

2122
const CName = "consensus.db"
2223

24+
const settingsColl = "settings"
25+
2326
var log = logger.NewNamed(CName)
2427

2528
func New() Service {
@@ -40,12 +43,17 @@ type Service interface {
4043
FetchLog(ctx context.Context, logId string) (log consensus.Log, err error)
4144
// SetChangeReceiver sets the receiver for updates, it must be called before app.Run stage
4245
SetChangeReceiver(receiver ChangeReceiver) (err error)
46+
// SetDeletionId sets the last deleted log id
47+
SetDeletionId(ctx context.Context, lastId string) (err error)
48+
// GetDeletionId gets the last deletion log id
49+
GetDeletionId(ctx context.Context) (lastId string, err error)
4350
app.ComponentRunnable
4451
}
4552

4653
type service struct {
47-
conf config.Mongo
54+
conf Config
4855
logColl *mongo.Collection
56+
settingsColl *mongo.Collection
4957
running bool
5058
changeReceiver ChangeReceiver
5159

@@ -55,7 +63,7 @@ type service struct {
5563
}
5664

5765
func (s *service) Init(a *app.App) (err error) {
58-
s.conf = a.MustComponent(config.CName).(*config.Config).Mongo
66+
s.conf = a.MustComponent("config").(configGetter).GetDB()
5967
return nil
6068
}
6169

@@ -80,6 +88,7 @@ func (s *service) Run(ctx context.Context) (err error) {
8088
return err
8189
}
8290
}
91+
s.settingsColl = client.Database(s.conf.Database).Collection(settingsColl)
8392
return
8493
}
8594

@@ -200,6 +209,23 @@ func (s *service) streamListener(stream *mongo.ChangeStream) {
200209
}
201210
}
202211

212+
func (s *service) SetDeletionId(ctx context.Context, lastId string) (err error) {
213+
updateOpts := options.Update().SetUpsert(true)
214+
_, err = s.settingsColl.UpdateOne(ctx, bson.D{{"_id", "settings"}}, bson.D{{"$set", bson.D{{"logId", lastId}}}}, updateOpts)
215+
return
216+
}
217+
218+
func (s *service) GetDeletionId(ctx context.Context) (lastId string, err error) {
219+
var res struct {
220+
LogId string `bson:"logId"`
221+
}
222+
err = s.settingsColl.FindOne(ctx, bson.D{{"_id", "settings"}}).Decode(&res)
223+
if errors.Is(err, mongo.ErrNoDocuments) {
224+
err = nil
225+
}
226+
return res.LogId, err
227+
}
228+
203229
func (s *service) Close(ctx context.Context) (err error) {
204230
if s.logColl != nil {
205231
err = s.logColl.Database().Client().Disconnect(ctx)

db/db_test.go

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,15 @@ package db
22

33
import (
44
"context"
5-
consensus "github.com/anyproto/any-sync-consensusnode"
6-
"github.com/anyproto/any-sync-consensusnode/config"
5+
"testing"
6+
"time"
7+
78
"github.com/anyproto/any-sync/app"
89
"github.com/anyproto/any-sync/consensus/consensusproto/consensuserr"
910
"github.com/stretchr/testify/assert"
1011
"github.com/stretchr/testify/require"
11-
"testing"
12-
"time"
12+
13+
consensus "github.com/anyproto/any-sync-consensusnode"
1314
)
1415

1516
var ctx = context.Background()
@@ -180,20 +181,33 @@ func TestService_ChangeReceive(t *testing.T) {
180181
})
181182
}
182183

184+
func TestService_SetDeletionId(t *testing.T) {
185+
t.Run("empty", func(t *testing.T) {
186+
fx := newFixture(t, nil)
187+
defer fx.Finish(t)
188+
_ = fx.Service.(*service).settingsColl.Drop(ctx)
189+
recId, err := fx.GetDeletionId(ctx)
190+
assert.Empty(t, recId)
191+
assert.NoError(t, err)
192+
})
193+
t.Run("set get", func(t *testing.T) {
194+
fx := newFixture(t, nil)
195+
defer fx.Finish(t)
196+
require.NoError(t, fx.SetDeletionId(ctx, "123"))
197+
logId, err := fx.GetDeletionId(ctx)
198+
require.NoError(t, err)
199+
assert.Equal(t, "123", logId)
200+
})
201+
}
202+
183203
func newFixture(t *testing.T, cr ChangeReceiver) *fixture {
184204
ctx, cancel := context.WithTimeout(ctx, time.Second)
185205
fx := &fixture{
186206
Service: New(),
187207
cancel: cancel,
188208
a: new(app.App),
189209
}
190-
fx.a.Register(&config.Config{
191-
Mongo: config.Mongo{
192-
Connect: "mongodb://localhost:27017/?w=majority",
193-
Database: "consensus_test",
194-
LogCollection: "log",
195-
},
196-
})
210+
fx.a.Register(&testConfig{})
197211
fx.a.Register(fx.Service)
198212
require.NoError(t, fx.Service.SetChangeReceiver(cr))
199213
err := fx.a.Start(ctx)
@@ -237,3 +251,15 @@ func assertLogValid(t *testing.T, log consensus.Log, count int) {
237251
prevId = rec.PrevId
238252
}
239253
}
254+
255+
type testConfig struct {
256+
Config
257+
}
258+
259+
func (c *testConfig) Init(a *app.App) error { return nil }
260+
261+
func (c *testConfig) Name() string { return "config" }
262+
263+
func (c *testConfig) GetDB() Config {
264+
return Config{Connect: "mongodb://localhost:27017/?w=majority", Database: "consensus_test", LogCollection: "log"}
265+
}

0 commit comments

Comments
 (0)