Skip to content

Commit ed3932c

Browse files
committed
migrate: re-run post-step callbacks on error
This commit modifies the migration framework to re-attempt post-step callbacks if they error during a migration. Previously, if a post-step callbacks failed, but their associated SQL migration succeeded, the post-step callback would not be re-attempted on the next migration run, and instead proceed with the next SQL migration. This is achieved by introducing the concept of a "post-step callback" migration version. Post-step callbacks are their corresponding SQL migration version offset by +1000000000. During the execution of a post-step callback, the post-step callback migration version will be persisted as the database version. That way, if the post-step callback errors, the version for the database will be the post-step callback version on the next startup. The post-step callback will then be re-attempted before proceeding with the next SQL migration.
1 parent 9bb14a9 commit ed3932c

File tree

2 files changed

+265
-17
lines changed

2 files changed

+265
-17
lines changed

migrate.go

Lines changed: 246 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,31 @@ var (
3636
ErrLockTimeout = errors.New("timeout: can't acquire database lock")
3737
)
3838

39+
// PostStepCallbackOffset is a sentinel offset added to a normal SQL migration
40+
// version. During the post-step callback for version V, the database version is
41+
// persisted as (V + PostStepCallbackOffset). This enables the migrate package
42+
// to detect and re-run a post-step callback which errored after the
43+
// corresponding SQL migration was applied successfully.
44+
// Note that only the post-step callback is re-run, the SQL migration is not
45+
// re-applied.
46+
//
47+
// If the persisted version is >= PostStepCallbackOffset and not `dirty`, then
48+
// the post-step callback for (version - PostStepCallbackOffset) will be re-run
49+
// on the next migration run.
50+
// If the persisted version is `dirty`, manual intervention is required, as it's
51+
// not possible by the migration framework to determine whether post-step
52+
// callback actually was executed successfully or not during the last execution
53+
// attempt.
54+
//
55+
// NOTE:
56+
// Changing this value is a breaking change for any database that currently
57+
// has a post-step phase recorded. Do not change it unless you also provide
58+
// a safe transition strategy.
59+
// Also note that no SQL migration can use a version >= PostStepCallbackOffset.
60+
// Such versions are reserved for the post-step callback phase, and any SQL
61+
// migrations with such versions will cause an error.
62+
const PostStepCallbackOffset = 1000000000
63+
3964
// ErrShortLimit is an error returned when not enough migrations
4065
// can be returned by a source for a given limit.
4166
type ErrShortLimit struct {
@@ -296,6 +321,23 @@ func (m *Migrate) Migrate(version uint) error {
296321
return m.unlockErr(ErrDirty{curVersion})
297322
}
298323

324+
// If the current version is a clean post-step callback version, then
325+
// we need to rerun the post-step callback for the previous version
326+
// before we can continue with any SQL migration(s).
327+
if IsPostStepCallbackVersion(curVersion) {
328+
sqlMigVersion := SQLMigrationVersion(curVersion)
329+
330+
err := m.executePostStepCallbackForSQLMig(sqlMigVersion)
331+
if err != nil {
332+
return m.unlockErr(err)
333+
}
334+
335+
curVersion, dirty, err = m.databaseDrv.Version()
336+
if err != nil {
337+
return m.unlockErr(err)
338+
}
339+
}
340+
299341
ret := make(chan interface{}, m.PrefetchMigrations)
300342
go m.read(curVersion, int(version), ret)
301343

@@ -322,6 +364,23 @@ func (m *Migrate) Steps(n int) error {
322364
return m.unlockErr(ErrDirty{curVersion})
323365
}
324366

367+
// If the current version is a clean post-step callback version, then
368+
// we need to rerun the post-step callback for the previous version
369+
// before we can continue with any SQL migration(s).
370+
if IsPostStepCallbackVersion(curVersion) {
371+
sqlMigVersion := SQLMigrationVersion(curVersion)
372+
373+
err := m.executePostStepCallbackForSQLMig(sqlMigVersion)
374+
if err != nil {
375+
return m.unlockErr(err)
376+
}
377+
378+
curVersion, dirty, err = m.databaseDrv.Version()
379+
if err != nil {
380+
return m.unlockErr(err)
381+
}
382+
}
383+
325384
ret := make(chan interface{}, m.PrefetchMigrations)
326385

327386
if n > 0 {
@@ -349,6 +408,23 @@ func (m *Migrate) Up() error {
349408
return m.unlockErr(ErrDirty{curVersion})
350409
}
351410

411+
// If the current version is a clean post-step callback version, then
412+
// we need to rerun the post-step callback for the previous version
413+
// before we can continue with any SQL migration(s).
414+
if IsPostStepCallbackVersion(curVersion) {
415+
sqlMigVersion := SQLMigrationVersion(curVersion)
416+
417+
err := m.executePostStepCallbackForSQLMig(sqlMigVersion)
418+
if err != nil {
419+
return m.unlockErr(err)
420+
}
421+
422+
curVersion, dirty, err = m.databaseDrv.Version()
423+
if err != nil {
424+
return m.unlockErr(err)
425+
}
426+
}
427+
352428
ret := make(chan interface{}, m.PrefetchMigrations)
353429

354430
go m.readUp(curVersion, -1, ret)
@@ -371,6 +447,23 @@ func (m *Migrate) Down() error {
371447
return m.unlockErr(ErrDirty{curVersion})
372448
}
373449

450+
// If the current version is a clean post-step callback version, then
451+
// we need to rerun the post-step callback for the previous version
452+
// before we can continue with any SQL migration(s).
453+
if IsPostStepCallbackVersion(curVersion) {
454+
sqlMigVersion := SQLMigrationVersion(curVersion)
455+
456+
err := m.executePostStepCallbackForSQLMig(sqlMigVersion)
457+
if err != nil {
458+
return m.unlockErr(err)
459+
}
460+
461+
curVersion, dirty, err = m.databaseDrv.Version()
462+
if err != nil {
463+
return m.unlockErr(err)
464+
}
465+
}
466+
374467
ret := make(chan interface{}, m.PrefetchMigrations)
375468
go m.readDown(curVersion, -1, ret)
376469
return m.unlockErr(m.runMigrations(ret))
@@ -409,6 +502,23 @@ func (m *Migrate) Run(migration ...*Migration) error {
409502
return m.unlockErr(ErrDirty{curVersion})
410503
}
411504

505+
// If the current version is a clean post step callback version, then
506+
// we need to rerun the post step callback for the previous version
507+
// before we can continue with any SQL migration(s).
508+
if IsPostStepCallbackVersion(curVersion) {
509+
sqlMigVersion := SQLMigrationVersion(curVersion)
510+
511+
err := m.executePostStepCallbackForSQLMig(sqlMigVersion)
512+
if err != nil {
513+
return m.unlockErr(err)
514+
}
515+
516+
curVersion, dirty, err = m.databaseDrv.Version()
517+
if err != nil {
518+
return m.unlockErr(err)
519+
}
520+
}
521+
412522
ret := make(chan interface{}, m.PrefetchMigrations)
413523

414524
go func() {
@@ -787,6 +897,30 @@ func (m *Migrate) readDown(from int, limit int, ret chan<- interface{}) {
787897
}
788898
}
789899

900+
// readSingle reads a single migration for the given version, and sends it
901+
// over the passed channel.
902+
func (m *Migrate) readSingle(ver uint, ret chan<- interface{}) {
903+
defer close(ret)
904+
905+
if err := m.versionExists(ver); err != nil {
906+
ret <- err
907+
return
908+
}
909+
910+
migr, err := m.newMigration(ver, int(ver))
911+
if err != nil {
912+
ret <- err
913+
return
914+
}
915+
916+
ret <- migr
917+
go func() {
918+
if err := migr.Buffer(); err != nil {
919+
m.logErr(err)
920+
}
921+
}()
922+
}
923+
790924
// runMigrations reads *Migration and error from a channel. Any other type
791925
// sent on this channel will result in a panic. Each migration is then
792926
// proxied to the database driver and run against the database.
@@ -807,6 +941,12 @@ func (m *Migrate) runMigrations(ret <-chan interface{}) error {
807941
case *Migration:
808942
migr := r
809943

944+
if migr.Version >= PostStepCallbackOffset {
945+
return fmt.Errorf("migration version %v is "+
946+
"invalid, must be < %v", migr.Version,
947+
PostStepCallbackOffset)
948+
}
949+
810950
// set version with dirty state
811951
if err := m.databaseDrv.SetVersion(migr.TargetVersion, true); err != nil {
812952
return err
@@ -818,23 +958,9 @@ func (m *Migrate) runMigrations(ret <-chan interface{}) error {
818958
return err
819959
}
820960

821-
// If there is a post execution function for
822-
// this migration, run it now.
823-
cb, ok := m.opts.postStepCallbacks[migr.Version]
824-
if ok {
825-
m.logVerbosePrintf("Running post step "+
826-
"callback for %v\n", migr.LogString())
827-
828-
err := cb(migr, m.databaseDrv)
829-
if err != nil {
830-
return fmt.Errorf("failed to "+
831-
"execute post "+
832-
"step callback: %w",
833-
err)
834-
}
835-
836-
m.logVerbosePrintf("Post step callback "+
837-
"finished for %v\n", migr.LogString())
961+
err := m.executePostStepCallback(migr)
962+
if err != nil {
963+
return err
838964
}
839965
}
840966

@@ -863,6 +989,109 @@ func (m *Migrate) runMigrations(ret <-chan interface{}) error {
863989
return nil
864990
}
865991

992+
// executePostStepCallback checks if a post-step callback exists for the passed
993+
// migration and proceeds to execute if one exists.
994+
func (m *Migrate) executePostStepCallback(migr *Migration) error {
995+
cb, ok := m.opts.postStepCallbacks[migr.Version]
996+
if ok {
997+
m.logVerbosePrintf("Running post step callback for %v\n",
998+
migr.LogString())
999+
1000+
postStepVersion := int(migr.Version) + PostStepCallbackOffset
1001+
1002+
// Persist that we are in the post-step phase for this version.
1003+
if err := m.databaseDrv.SetVersion(postStepVersion, true); err != nil {
1004+
return err
1005+
}
1006+
1007+
err := cb(migr, m.databaseDrv)
1008+
if err != nil {
1009+
// Mark the database version as the postStepVersion but
1010+
// in a clean state, to indicate that the post-step
1011+
// callback errored. We will therefore re-run the
1012+
// post-step callback on the next migration run.
1013+
if setErr := m.databaseDrv.SetVersion(postStepVersion, false); setErr != nil {
1014+
// Note that if we error here, the database
1015+
// version will remain in a dirty state. As we
1016+
// cannot know if the post-step callback was
1017+
// executed or not in that scenario, manual
1018+
// intervention is required.
1019+
return fmt.Errorf("WARNING, failed to set "+
1020+
"migration version after post "+
1021+
"migration step errored. Manual "+
1022+
"intervention needed! Post migration "+
1023+
"error: %w, version setting error : %w",
1024+
err, setErr)
1025+
}
1026+
1027+
return fmt.Errorf("failed to execute post step "+
1028+
"callback: %w", err)
1029+
}
1030+
1031+
m.logVerbosePrintf("Post step callback finished for %v\n",
1032+
migr.LogString())
1033+
}
1034+
1035+
return nil
1036+
}
1037+
1038+
// executePostStepCallbackForSQLMig executes only the post-step callback for the
1039+
// passed SQL migration version.
1040+
// The function can be used to re-execute the post-step callback for a SQL
1041+
// migration version where the SQL migration was successfully applied, but where
1042+
// the post-step callback failed.
1043+
func (m *Migrate) executePostStepCallbackForSQLMig(sqlMigVersion int) error {
1044+
var (
1045+
r interface{}
1046+
migRet = make(chan interface{}, m.PrefetchMigrations)
1047+
err error
1048+
)
1049+
1050+
// Fetch the migration for the specified SQL migration version.
1051+
go m.readSingle(uint(sqlMigVersion), migRet)
1052+
1053+
select {
1054+
case r = <-migRet:
1055+
case <-time.After(30 * time.Second):
1056+
return fmt.Errorf("timeout waiting for single migration "+
1057+
"version %v", sqlMigVersion)
1058+
}
1059+
1060+
if m.stop() {
1061+
return nil
1062+
}
1063+
1064+
switch r := r.(type) {
1065+
case *Migration:
1066+
// If the migration was found, execute the post step callback.
1067+
migr := r
1068+
1069+
err = m.executePostStepCallback(migr)
1070+
if err != nil {
1071+
return err
1072+
}
1073+
1074+
m.logVerbosePrintf("successfully re-executed post step "+
1075+
"callback for SQL migration version: %v\n",
1076+
sqlMigVersion)
1077+
1078+
// set clean state
1079+
if err = m.databaseDrv.SetVersion(migr.TargetVersion, false); err != nil {
1080+
return err
1081+
}
1082+
1083+
return nil
1084+
1085+
case error:
1086+
return fmt.Errorf("reading SQL migration at version "+
1087+
"%v failed: %w", sqlMigVersion, r)
1088+
1089+
default:
1090+
return fmt.Errorf("unknown type: %T when reading "+
1091+
"single migration", r)
1092+
}
1093+
}
1094+
8661095
// versionExists checks the source if either the up or down migration for
8671096
// the specified migration version exists.
8681097
func (m *Migrate) versionExists(version uint) (result error) {

util.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,3 +59,22 @@ func FilterCustomQuery(u *nurl.URL) *nurl.URL {
5959
ux.RawQuery = vx.Encode()
6060
return &ux
6161
}
62+
63+
// IsPostStepCallbackVersion returns true if the passed version is a post-step
64+
// callback version.
65+
func IsPostStepCallbackVersion(version int) bool {
66+
return version >= PostStepCallbackOffset
67+
}
68+
69+
// SQLMigrationVersion returns the corresponding SQL migration version for the
70+
// given version. If the version passed is a post-step callback version, the
71+
// function will return the version for the corresponding SQL migration.
72+
// If the version passed already is a SQL migration version, the function will
73+
// return the passed version as is.
74+
func SQLMigrationVersion(version int) int {
75+
if IsPostStepCallbackVersion(version) {
76+
return version - PostStepCallbackOffset
77+
}
78+
79+
return version
80+
}

0 commit comments

Comments
 (0)