@@ -20,29 +20,35 @@ const (
2020)
2121
2222func (s * service ) runMigrations (ctx context.Context ) (err error ) {
23- return s .tx (ctx , func (txCtx mongo.SessionContext ) error {
24- version , err := s .getDbVersion (txCtx )
23+ var migrate func (ctx context.Context ) error
24+ err = s .tx (ctx , func (txCtx mongo.SessionContext ) error {
25+ ver , err := s .getDbVersion (txCtx )
2526 if err != nil {
2627 return fmt .Errorf ("failed to get db version: %w" , err )
2728 }
28- if version < Version2 {
29- st := time .Now ()
30- log .Info ("migrate" , zap .Int ("currentVersion" , version ))
31- err = s .migrateV2 (txCtx )
32- if err != nil {
33- log .Warn ("migration failed" , zap .Duration ("dur" , time .Since (st )), zap .Error (err ))
34- } else {
35- log .Info ("migration done" , zap .Duration ("dur" , time .Since (st )))
36- }
29+ if ver == Version2 {
30+ return nil
3731 }
38- if err != nil {
39- return fmt . Errorf ( "migration failed: %w" , err )
32+ if ver < Version2 {
33+ migrate = s . migrateV2
4034 }
4135 if err = s .setDbVersion (txCtx , Version2 ); err != nil {
4236 return fmt .Errorf ("failed to set db version: %w" , err )
4337 }
4438 return nil
4539 })
40+ if migrate != nil {
41+ log .Info ("running migration" )
42+ st := time .Now ()
43+ err = migrate (ctx )
44+ if err == nil {
45+ log .Info ("migration done" , zap .Duration ("dur" , time .Since (st )))
46+ } else {
47+ log .Warn ("migration failed" , zap .Duration ("dur" , time .Since (st )), zap .Error (err ))
48+ return err
49+ }
50+ }
51+ return nil
4652}
4753
4854func (s * service ) migrateV2 (ctx context.Context ) (err error ) {
@@ -58,11 +64,18 @@ func (s *service) migrateV2(ctx context.Context) (err error) {
5864 if err := cur .Decode (& l ); err != nil {
5965 return fmt .Errorf ("failed to decode log: %w" , err )
6066 }
67+ var updated bool
6168 for i , rec := range l .Records {
62- if err = s .savePayload (ctx , consensus .NewPayload (l .Id , rec .Id , rec .Payload )); err != nil {
63- return fmt .Errorf ("failed to save payload for record %s: %w" , rec .Id , err )
69+ if rec .Payload != nil {
70+ if err = s .savePayload (ctx , consensus .NewPayload (l .Id , rec .Id , rec .Payload )); err != nil {
71+ return fmt .Errorf ("failed to save payload for record %s: %w" , rec .Id , err )
72+ }
73+ l .Records [i ].Payload = nil
74+ updated = true
6475 }
65- l .Records [i ].Payload = nil
76+ }
77+ if ! updated {
78+ continue
6679 }
6780 _ , err = s .logColl .UpdateOne (ctx , bson.D {{"_id" , l .Id }}, bson.D {{"$set" , bson.D {{"records" , l .Records }}}})
6881 if err != nil {
0 commit comments