Skip to content

Commit 0ffd955

Browse files
authored
Merge pull request #73 from anyproto/GO-6504-payload-collection
GO-6504 save payloads as upsert + don't start stream listener before migration
2 parents abe16b3 + e45979c commit 0ffd955

File tree

4 files changed

+90
-40
lines changed

4 files changed

+90
-40
lines changed

consensus.go

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
package consensus
22

3-
import "time"
3+
import (
4+
"strings"
5+
"time"
6+
)
47

58
type Log struct {
69
Id string `bson:"_id"`
@@ -15,8 +18,28 @@ type Record struct {
1518
Created time.Time `bson:"created"`
1619
}
1720

21+
func NewPayload(logId, recordId string, payload []byte) Payload {
22+
return Payload{
23+
Id: logId + "/" + recordId,
24+
Payload: payload,
25+
}
26+
}
27+
1828
type Payload struct {
1929
Id string `bson:"_id"`
20-
LogId string `bson:"logId"`
2130
Payload []byte `bson:"payload"`
2231
}
32+
33+
func (p Payload) RecordId() string {
34+
if idx := strings.LastIndex(p.Id, "/"); idx >= 0 {
35+
return p.Id[idx+1:]
36+
}
37+
return ""
38+
}
39+
40+
func (p Payload) LogId() string {
41+
if idx := strings.LastIndex(p.Id, "/"); idx >= 0 {
42+
return p.Id[:idx]
43+
}
44+
return ""
45+
}

consensus_test.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package consensus
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestNewPayload(t *testing.T) {
10+
pl := NewPayload("logId", "recordId", []byte{1, 2, 3})
11+
assert.Equal(t, "logId", pl.LogId())
12+
assert.Equal(t, "recordId", pl.RecordId())
13+
14+
pl = NewPayload("l/ogI/d/", "recordId", []byte{1, 2, 3})
15+
assert.Equal(t, "l/ogI/d/", pl.LogId())
16+
assert.Equal(t, "recordId", pl.RecordId())
17+
}

db/db.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"errors"
77
"fmt"
8+
"regexp"
89
"time"
910

1011
"github.com/anyproto/any-sync/app"
@@ -89,17 +90,18 @@ func (s *service) Run(ctx context.Context) (err error) {
8990
}
9091
s.logColl = s.client.Database(s.conf.Database).Collection(s.conf.LogCollection)
9192
s.running = true
93+
s.settingsColl = s.client.Database(s.conf.Database).Collection(settingsColl)
94+
s.payloadColl = s.client.Database(s.conf.Database).Collection(payloadColl)
95+
96+
if err = s.runMigrations(ctx); err != nil {
97+
return err
98+
}
9299
if s.changeReceiver != nil {
93100
if err = s.runStreamListener(ctx); err != nil {
94101
return err
95102
}
96103
}
97-
s.settingsColl = s.client.Database(s.conf.Database).Collection(settingsColl)
98-
s.payloadColl = s.client.Database(s.conf.Database).Collection(payloadColl)
99-
if _, err = s.payloadColl.Indexes().CreateOne(ctx, mongo.IndexModel{Keys: bson.D{{Key: "logId", Value: 1}}}); err != nil {
100-
return err
101-
}
102-
return s.runMigrations(ctx)
104+
return nil
103105
}
104106

105107
func (s *service) tx(ctx context.Context, f func(txCtx mongo.SessionContext) error) (err error) {
@@ -121,11 +123,7 @@ func (s *service) tx(ctx context.Context, f func(txCtx mongo.SessionContext) err
121123
func (s *service) AddLog(ctx context.Context, l consensus.Log) (err error) {
122124
return s.tx(ctx, func(txCtx mongo.SessionContext) error {
123125
for i, record := range l.Records {
124-
if err := s.savePayload(txCtx, consensus.Payload{
125-
Id: record.Id,
126-
LogId: l.Id,
127-
Payload: record.Payload,
128-
}); err != nil {
126+
if err := s.savePayload(txCtx, consensus.NewPayload(l.Id, record.Id, record.Payload)); err != nil {
129127
return err
130128
}
131129
l.Records[i].Payload = nil
@@ -181,11 +179,7 @@ type updateOp struct {
181179
func (s *service) AddRecord(ctx context.Context, logId string, record consensus.Record) error {
182180
return s.tx(ctx, func(txCtx mongo.SessionContext) (err error) {
183181
// save payload in a separate collection to avoid the one doc size limit
184-
if err = s.savePayload(txCtx, consensus.Payload{
185-
Id: record.Id,
186-
LogId: logId,
187-
Payload: record.Payload,
188-
}); err != nil {
182+
if err = s.savePayload(txCtx, consensus.NewPayload(logId, record.Id, record.Payload)); err != nil {
189183
return err
190184
}
191185

@@ -214,10 +208,12 @@ func (s *service) savePayload(ctx context.Context, payload consensus.Payload) (e
214208
if payload.Payload == nil {
215209
return fmt.Errorf("payload is nil")
216210
}
217-
_, err = s.payloadColl.InsertOne(ctx, payload)
218-
if mongo.IsDuplicateKeyError(err) {
219-
return nil
220-
}
211+
_, err = s.payloadColl.UpdateOne(
212+
ctx,
213+
bson.D{{"_id", payload.Id}},
214+
bson.D{{"$setOnInsert", payload}},
215+
options.Update().SetUpsert(true),
216+
)
221217
return
222218
}
223219

@@ -245,7 +241,8 @@ func (s *service) injectPayloads(ctx context.Context, l consensus.Log, afterReco
245241
}
246242
}
247243

248-
cur, err := s.payloadColl.Find(ctx, bson.D{{"logId", l.Id}})
244+
regExpStr := "^" + regexp.QuoteMeta(l.Id) + "/+"
245+
cur, err := s.payloadColl.Find(ctx, bson.D{{"_id", bson.D{{"$regex", regExpStr}}}})
249246
if err != nil {
250247
return
251248
}
@@ -258,7 +255,7 @@ func (s *service) injectPayloads(ctx context.Context, l consensus.Log, afterReco
258255
if err = cur.Decode(&curRecord); err != nil {
259256
return
260257
}
261-
if idx, ok := payloads[curRecord.Id]; ok {
258+
if idx, ok := payloads[curRecord.RecordId()]; ok {
262259
res.Records[idx].Payload = curRecord.Payload
263260
}
264261
}

db/migration.go

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,35 @@ const (
2020
)
2121

2222
func (s *service) runMigrations(ctx context.Context) (err error) {
23-
return s.tx(ctx, func(txCtx mongo.SessionContext) error {
24-
version, err := s.getDbVersion(txCtx)
23+
var migrate func(ctx context.Context) error
24+
err = s.tx(ctx, func(txCtx mongo.SessionContext) error {
25+
ver, err := s.getDbVersion(txCtx)
2526
if err != nil {
2627
return fmt.Errorf("failed to get db version: %w", err)
2728
}
28-
if version < Version2 {
29-
st := time.Now()
30-
log.Info("migrate", zap.Int("version", version))
31-
err = s.migrateV2(txCtx)
32-
if err != nil {
33-
log.Warn("migration failed", zap.Duration("dur", time.Since(st)), zap.Error(err))
34-
} else {
35-
log.Info("migration done", zap.Duration("dur", time.Since(st)))
36-
}
29+
if ver == Version2 {
30+
return nil
3731
}
38-
if err != nil {
39-
return fmt.Errorf("migration failed: %w", err)
32+
if ver < Version2 {
33+
migrate = s.migrateV2
4034
}
4135
if err = s.setDbVersion(txCtx, Version2); err != nil {
4236
return fmt.Errorf("failed to set db version: %w", err)
4337
}
4438
return nil
4539
})
40+
if migrate != nil {
41+
log.Info("running migration")
42+
st := time.Now()
43+
err = migrate(ctx)
44+
if err == nil {
45+
log.Info("migration done", zap.Duration("dur", time.Since(st)))
46+
} else {
47+
log.Warn("migration failed", zap.Duration("dur", time.Since(st)), zap.Error(err))
48+
return err
49+
}
50+
}
51+
return nil
4652
}
4753

4854
func (s *service) migrateV2(ctx context.Context) (err error) {
@@ -58,11 +64,18 @@ func (s *service) migrateV2(ctx context.Context) (err error) {
5864
if err := cur.Decode(&l); err != nil {
5965
return fmt.Errorf("failed to decode log: %w", err)
6066
}
67+
var updated bool
6168
for i, rec := range l.Records {
62-
if err = s.savePayload(ctx, consensus.Payload{Id: rec.Id, LogId: l.Id, Payload: rec.Payload}); err != nil {
63-
return fmt.Errorf("failed to save payload for record %s: %w", rec.Id, err)
69+
if rec.Payload != nil {
70+
if err = s.savePayload(ctx, consensus.NewPayload(l.Id, rec.Id, rec.Payload)); err != nil {
71+
return fmt.Errorf("failed to save payload for record %s: %w", rec.Id, err)
72+
}
73+
l.Records[i].Payload = nil
74+
updated = true
6475
}
65-
l.Records[i].Payload = nil
76+
}
77+
if !updated {
78+
continue
6679
}
6780
_, err = s.logColl.UpdateOne(ctx, bson.D{{"_id", l.Id}}, bson.D{{"$set", bson.D{{"records", l.Records}}}})
6881
if err != nil {

0 commit comments

Comments
 (0)