Skip to content

Commit d3bfd1e

Browse files
committed
save payloads as upsert + don't start stream listener before migration
1 parent 64a0f1e commit d3bfd1e

File tree

2 files changed

+16
-11
lines changed

2 files changed

+16
-11
lines changed

db/db.go

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,17 +89,20 @@ func (s *service) Run(ctx context.Context) (err error) {
8989
}
9090
s.logColl = s.client.Database(s.conf.Database).Collection(s.conf.LogCollection)
9191
s.running = true
92-
if s.changeReceiver != nil {
93-
if err = s.runStreamListener(ctx); err != nil {
94-
return err
95-
}
96-
}
9792
s.settingsColl = s.client.Database(s.conf.Database).Collection(settingsColl)
9893
s.payloadColl = s.client.Database(s.conf.Database).Collection(payloadColl)
9994
if _, err = s.payloadColl.Indexes().CreateOne(ctx, mongo.IndexModel{Keys: bson.D{{Key: "logId", Value: 1}}}); err != nil {
10095
return err
10196
}
102-
return s.runMigrations(ctx)
97+
if err = s.runMigrations(ctx); err != nil {
98+
return err
99+
}
100+
if s.changeReceiver != nil {
101+
if err = s.runStreamListener(ctx); err != nil {
102+
return err
103+
}
104+
}
105+
return nil
103106
}
104107

105108
func (s *service) tx(ctx context.Context, f func(txCtx mongo.SessionContext) error) (err error) {
@@ -214,10 +217,12 @@ func (s *service) savePayload(ctx context.Context, payload consensus.Payload) (e
214217
if payload.Payload == nil {
215218
return fmt.Errorf("payload is nil")
216219
}
217-
_, err = s.payloadColl.InsertOne(ctx, payload)
218-
if mongo.IsDuplicateKeyError(err) {
219-
return nil
220-
}
220+
_, err = s.payloadColl.UpdateOne(
221+
ctx,
222+
bson.D{{"_id", payload.Id}},
223+
bson.D{{"$setOnInsert", payload}},
224+
options.Update().SetUpsert(true),
225+
)
221226
return
222227
}
223228

db/migration.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (s *service) runMigrations(ctx context.Context) (err error) {
2727
}
2828
if version < Version2 {
2929
st := time.Now()
30-
log.Info("migrate", zap.Int("version", version))
30+
log.Info("migrate", zap.Int("currentVersion", version))
3131
err = s.migrateV2(txCtx)
3232
if err != nil {
3333
log.Warn("migration failed", zap.Duration("dur", time.Since(st)), zap.Error(err))

0 commit comments

Comments
 (0)