Skip to content

Commit 25b49de

Browse files
committed
Report when done
1 parent 4f8b2b8 commit 25b49de

File tree

1 file changed

+165
-125
lines changed

1 file changed

+165
-125
lines changed

service/upgrade_manager.go

Lines changed: 165 additions & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ type UpgradePlan struct {
106106
LastModifiedAt time.Time `json:"last_modified_at"`
107107
Entries []UpgradePlanEntry `json:"entries"`
108108
FinishedEntries []UpgradePlanEntry `json:"finished_entries"`
109+
Finished bool `json:"finished"`
109110
}
110111

111112
// IsReady returns true when all entries have finished.
@@ -127,14 +128,12 @@ func (p UpgradePlan) IsFailed() bool {
127128
type UpgradeEntryType string
128129

129130
const (
130-
UpgradeEntryTypeAgent = "agent"
131-
UpgradeEntryTypeDBServer = "dbserver"
132-
UpgradeEntryTypeCoordinator = "coordinator"
133-
UpgradeEntryTypeSingle = "single"
134-
UpgradeEntryTypeSyncMaster = "syncmaster"
135-
UpgradeEntryTypeSyncWorker = "syncworker"
136-
UpgradeEntryTypeDisableSupervision = "disable-supervision"
137-
UpgradeEntryTypeEnableSupervision = "enable-supervision"
131+
UpgradeEntryTypeAgent = "agent"
132+
UpgradeEntryTypeDBServer = "dbserver"
133+
UpgradeEntryTypeCoordinator = "coordinator"
134+
UpgradeEntryTypeSingle = "single"
135+
UpgradeEntryTypeSyncMaster = "syncmaster"
136+
UpgradeEntryTypeSyncWorker = "syncworker"
138137
)
139138

140139
// UpgradePlanEntry is the JSON structure that describes a single entry
@@ -152,8 +151,6 @@ func (e UpgradePlanEntry) CreateStatusServer(upgradeManagerContext UpgradeManage
152151
config, _, mode := upgradeManagerContext.ClusterConfig()
153152
var serverType ServerType
154153
switch e.Type {
155-
case UpgradeEntryTypeDisableSupervision, UpgradeEntryTypeEnableSupervision:
156-
return nil, nil
157154
case UpgradeEntryTypeAgent:
158155
serverType = ServerTypeAgent
159156
case UpgradeEntryTypeDBServer:
@@ -225,8 +222,6 @@ func (m *upgradeManager) StartDatabaseUpgrade(ctx context.Context, force bool) e
225222
}
226223
}
227224

228-
// Check upgrade rules
229-
230225
// Fetch mode
231226
config, myPeer, mode := m.upgradeManagerContext.ClusterConfig()
232227

@@ -279,11 +274,6 @@ func (m *upgradeManager) StartDatabaseUpgrade(ctx context.Context, force bool) e
279274
plan = UpgradePlan{
280275
CreatedAt: time.Now(),
281276
LastModifiedAt: time.Now(),
282-
Entries: []UpgradePlanEntry{
283-
UpgradePlanEntry{
284-
Type: UpgradeEntryTypeDisableSupervision,
285-
},
286-
},
287277
}
288278
// First add all agents
289279
for _, p := range config.AllPeers {
@@ -355,6 +345,9 @@ func (m *upgradeManager) StartDatabaseUpgrade(ctx context.Context, force bool) e
355345
return errors.Wrap(err, "Failed to write upgrade plan")
356346
}
357347

348+
// Inform user
349+
m.log.Info().Msgf("Created plan to upgrade from %v to %v", runningDBVersions, binaryDBVersions)
350+
358351
// We're done
359352
return nil
360353
}
@@ -588,8 +581,16 @@ func (m *upgradeManager) RunWatchUpgradePlan(ctx context.Context) {
588581
} else if err != nil {
589582
// Failed to read plan
590583
m.log.Info().Err(err).Msg("Failed to read upgrade plan")
591-
} else if plan.IsReady() || plan.IsFailed() {
592-
// Plan already finished or failed
584+
} else if plan.IsReady() {
585+
// Plan entries have all been processed
586+
if !plan.Finished {
587+
// Let's show the user that we're done
588+
if err := m.finishUpgradePlan(ctx, plan); err != nil {
589+
m.log.Error().Err(err).Msg("Failed to finish upgrade plan")
590+
}
591+
}
592+
} else if plan.IsFailed() {
593+
// Plan already failed
593594
} else if len(plan.Entries) > 0 {
594595
// Let's inspect the first entry
595596
if err := m.processUpgradePlan(ctx, plan); err != nil {
@@ -612,7 +613,7 @@ func (m *upgradeManager) RunWatchUpgradePlan(ctx context.Context) {
612613
// it when needed.
613614
func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePlan) error {
614615
_, myPeer, _ := m.upgradeManagerContext.ClusterConfig()
615-
isRunningMaster, isRunning, _ := m.upgradeManagerContext.IsRunningMaster()
616+
_, isRunning, _ := m.upgradeManagerContext.IsRunningMaster()
616617
if !isRunning {
617618
return maskAny(fmt.Errorf("Not in running phase"))
618619
}
@@ -634,58 +635,51 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePla
634635
}
635636

636637
firstEntry := plan.Entries[0]
638+
// For server entries, we only respond when the peer is ours
639+
if firstEntry.PeerID != myPeer.ID {
640+
return nil
641+
}
642+
// Prepare cleanup
643+
defer func() {
644+
m.upgradeServerType = ""
645+
m.updateNeeded = false
646+
}()
647+
637648
switch firstEntry.Type {
638-
case UpgradeEntryTypeDisableSupervision:
639-
if !isRunningMaster {
640-
// Not for me
641-
return nil
642-
}
643-
if err := m.disableSupervision(ctx); err != nil {
644-
return recordFailure(errors.Wrap(err, "Failed to disable supervision"))
645-
}
646-
case UpgradeEntryTypeEnableSupervision:
647-
if !isRunningMaster {
648-
// Not for me
649-
return nil
650-
}
651-
if err := m.enableSupervision(ctx); err != nil {
652-
return recordFailure(errors.Wrap(err, "Failed to enable supervision"))
653-
}
654-
default:
655-
// For server entries, we only respond when the peer is ours
656-
if firstEntry.PeerID != myPeer.ID {
657-
return nil
649+
case UpgradeEntryTypeAgent:
650+
// Restart the agency in auto-upgrade mode
651+
m.log.Info().Msg("Upgrading agent")
652+
m.upgradeServerType = ServerTypeAgent
653+
m.updateNeeded = true
654+
if err := m.upgradeManagerContext.RestartServer(ServerTypeAgent); err != nil {
655+
return recordFailure(errors.Wrap(err, "Failed to restart agent"))
658656
}
659-
// Prepare cleanup
660-
defer func() {
661-
m.upgradeServerType = ""
662-
m.updateNeeded = false
663-
}()
664-
665-
switch firstEntry.Type {
666-
case UpgradeEntryTypeAgent:
667-
// Restart the agency in auto-upgrade mode
668-
m.log.Info().Msg("Upgrading agent")
669-
m.upgradeServerType = ServerTypeAgent
670-
m.updateNeeded = true
671-
if err := m.upgradeManagerContext.RestartServer(ServerTypeAgent); err != nil {
672-
return recordFailure(errors.Wrap(err, "Failed to restart agent"))
673-
}
674657

675-
// Wait until agency restarted
676-
if err := m.waitUntilUpgradeServerStarted(ctx); err != nil {
677-
return recordFailure(errors.Wrap(err, "Agent restart in upgrade mode did not succeed"))
678-
}
658+
// Wait until agency restarted
659+
if err := m.waitUntilUpgradeServerStarted(ctx); err != nil {
660+
return recordFailure(errors.Wrap(err, "Agent restart in upgrade mode did not succeed"))
661+
}
679662

680-
// Wait until agency happy again
681-
if err := m.waitUntil(ctx, m.isAgencyHealth, "Agency is not yet healthy: %v"); err != nil {
682-
return recordFailure(errors.Wrap(err, "Agency is not healthy in time"))
663+
// Wait until agency happy again
664+
if err := m.waitUntil(ctx, m.isAgencyHealth, "Agency is not yet healthy: %v"); err != nil {
665+
return recordFailure(errors.Wrap(err, "Agency is not healthy in time"))
666+
}
667+
case UpgradeEntryTypeDBServer:
668+
// Restart the dbserver in auto-upgrade mode
669+
m.log.Info().Msg("Upgrading dbserver")
670+
m.upgradeServerType = ServerTypeDBServer
671+
m.updateNeeded = true
672+
upgrade := func() error {
673+
m.log.Info().Msg("Disabling supervision")
674+
if err := m.disableSupervision(ctx); err != nil {
675+
return recordFailure(errors.Wrap(err, "Failed to disable supervision"))
683676
}
684-
case UpgradeEntryTypeDBServer:
685-
// Restart the dbserver in auto-upgrade mode
686-
m.log.Info().Msg("Upgrading dbserver")
687-
m.upgradeServerType = ServerTypeDBServer
688-
m.updateNeeded = true
677+
defer func() {
678+
m.log.Info().Msg("Enabling supervision")
679+
if err := m.enableSupervision(ctx); err != nil {
680+
recordFailure(errors.Wrap(err, "Failed to enable supervision"))
681+
}
682+
}()
689683
if err := m.upgradeManagerContext.RestartServer(ServerTypeDBServer); err != nil {
690684
return recordFailure(errors.Wrap(err, "Failed to restart dbserver"))
691685
}
@@ -699,29 +693,45 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePla
699693
if err := m.waitUntil(ctx, m.areDBServersResponding, "DBServers are not yet all responding: %v"); err != nil {
700694
return recordFailure(errors.Wrap(err, "Not all DBServers are responding in time"))
701695
}
702-
case UpgradeEntryTypeCoordinator:
703-
// Restart the coordinator in auto-upgrade mode
704-
m.log.Info().Msg("Upgrading coordinator")
705-
m.upgradeServerType = ServerTypeCoordinator
706-
m.updateNeeded = true
707-
if err := m.upgradeManagerContext.RestartServer(ServerTypeCoordinator); err != nil {
708-
return recordFailure(errors.Wrap(err, "Failed to restart coordinator"))
709-
}
696+
return nil
697+
}
698+
if err := upgrade(); err != nil {
699+
return maskAny(err)
700+
}
701+
case UpgradeEntryTypeCoordinator:
702+
// Restart the coordinator in auto-upgrade mode
703+
m.log.Info().Msg("Upgrading coordinator")
704+
m.upgradeServerType = ServerTypeCoordinator
705+
m.updateNeeded = true
706+
if err := m.upgradeManagerContext.RestartServer(ServerTypeCoordinator); err != nil {
707+
return recordFailure(errors.Wrap(err, "Failed to restart coordinator"))
708+
}
710709

711-
// Wait until coordinator restarted
712-
if err := m.waitUntilUpgradeServerStarted(ctx); err != nil {
713-
return recordFailure(errors.Wrap(err, "Coordinator restart in upgrade mode did not succeed"))
714-
}
710+
// Wait until coordinator restarted
711+
if err := m.waitUntilUpgradeServerStarted(ctx); err != nil {
712+
return recordFailure(errors.Wrap(err, "Coordinator restart in upgrade mode did not succeed"))
713+
}
715714

716-
// Wait until all coordinators respond
717-
if err := m.waitUntil(ctx, m.areCoordinatorsResponding, "Coordinator are not yet all responding: %v"); err != nil {
718-
return recordFailure(errors.Wrap(err, "Not all Coordinators are responding in time"))
715+
// Wait until all coordinators respond
716+
if err := m.waitUntil(ctx, m.areCoordinatorsResponding, "Coordinator are not yet all responding: %v"); err != nil {
717+
return recordFailure(errors.Wrap(err, "Not all Coordinators are responding in time"))
718+
}
719+
case UpgradeEntryTypeSingle:
720+
// Restart the activefailover single server in auto-upgrade mode
721+
m.log.Info().Msg("Upgrading single server")
722+
m.upgradeServerType = ServerTypeResilientSingle
723+
m.updateNeeded = true
724+
upgrade := func() error {
725+
m.log.Info().Msg("Disabling supervision")
726+
if err := m.disableSupervision(ctx); err != nil {
727+
return recordFailure(errors.Wrap(err, "Failed to disable supervision"))
719728
}
720-
case UpgradeEntryTypeSingle:
721-
// Restart the activefailover single server in auto-upgrade mode
722-
m.log.Info().Msg("Upgrading single server")
723-
m.upgradeServerType = ServerTypeResilientSingle
724-
m.updateNeeded = true
729+
defer func() {
730+
m.log.Info().Msg("Enabling supervision")
731+
if err := m.enableSupervision(ctx); err != nil {
732+
recordFailure(errors.Wrap(err, "Failed to enable supervision"))
733+
}
734+
}()
725735
if err := m.upgradeManagerContext.RestartServer(ServerTypeResilientSingle); err != nil {
726736
return recordFailure(errors.Wrap(err, "Failed to restart single server"))
727737
}
@@ -735,49 +745,53 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePla
735745
if err := m.waitUntil(ctx, m.areSingleServersResponding, "Active failover single server is not yet responding: %v"); err != nil {
736746
return recordFailure(errors.Wrap(err, "Not all single servers are responding in time"))
737747
}
738-
case UpgradeEntryTypeSyncMaster:
739-
// Restart the syncmaster
740-
m.log.Info().Msg("Restarting syncmaster")
741-
m.upgradeServerType = ""
742-
m.updateNeeded = false
743-
if err := m.upgradeManagerContext.RestartServer(ServerTypeSyncMaster); err != nil {
744-
return recordFailure(errors.Wrap(err, "Failed to restart syncmaster"))
745-
}
748+
return nil
749+
}
750+
if err := upgrade(); err != nil {
751+
return maskAny(err)
752+
}
753+
case UpgradeEntryTypeSyncMaster:
754+
// Restart the syncmaster
755+
m.log.Info().Msg("Restarting syncmaster")
756+
m.upgradeServerType = ""
757+
m.updateNeeded = false
758+
if err := m.upgradeManagerContext.RestartServer(ServerTypeSyncMaster); err != nil {
759+
return recordFailure(errors.Wrap(err, "Failed to restart syncmaster"))
760+
}
746761

747-
// Wait until syncmaster restarted
748-
if err := m.waitUntilUpgradeServerStarted(ctx); err != nil {
749-
return recordFailure(errors.Wrap(err, "Syncmaster restart in upgrade mode did not succeed"))
750-
}
762+
// Wait until syncmaster restarted
763+
if err := m.waitUntilUpgradeServerStarted(ctx); err != nil {
764+
return recordFailure(errors.Wrap(err, "Syncmaster restart in upgrade mode did not succeed"))
765+
}
751766

752-
// Wait until syncmaster 'up'
753-
address := myPeer.Address
754-
port := myPeer.Port + myPeer.PortOffset + ServerType(ServerTypeSyncMaster).PortOffset()
755-
if up, _, _, _, _, _, _, _ := m.upgradeManagerContext.TestInstance(ctx, ServerTypeSyncMaster, address, port, nil); !up {
756-
return recordFailure(fmt.Errorf("Syncmaster is not up in time"))
757-
}
758-
case UpgradeEntryTypeSyncWorker:
759-
// Restart the syncworker
760-
m.log.Info().Msg("Restarting syncworker")
761-
m.upgradeServerType = ""
762-
m.updateNeeded = false
763-
if err := m.upgradeManagerContext.RestartServer(ServerTypeSyncWorker); err != nil {
764-
return recordFailure(errors.Wrap(err, "Failed to restart syncworker"))
765-
}
767+
// Wait until syncmaster 'up'
768+
address := myPeer.Address
769+
port := myPeer.Port + myPeer.PortOffset + ServerType(ServerTypeSyncMaster).PortOffset()
770+
if up, _, _, _, _, _, _, _ := m.upgradeManagerContext.TestInstance(ctx, ServerTypeSyncMaster, address, port, nil); !up {
771+
return recordFailure(fmt.Errorf("Syncmaster is not up in time"))
772+
}
773+
case UpgradeEntryTypeSyncWorker:
774+
// Restart the syncworker
775+
m.log.Info().Msg("Restarting syncworker")
776+
m.upgradeServerType = ""
777+
m.updateNeeded = false
778+
if err := m.upgradeManagerContext.RestartServer(ServerTypeSyncWorker); err != nil {
779+
return recordFailure(errors.Wrap(err, "Failed to restart syncworker"))
780+
}
766781

767-
// Wait until syncworker restarted
768-
if err := m.waitUntilUpgradeServerStarted(ctx); err != nil {
769-
return recordFailure(errors.Wrap(err, "Syncworker restart in upgrade mode did not succeed"))
770-
}
782+
// Wait until syncworker restarted
783+
if err := m.waitUntilUpgradeServerStarted(ctx); err != nil {
784+
return recordFailure(errors.Wrap(err, "Syncworker restart in upgrade mode did not succeed"))
785+
}
771786

772-
// Wait until syncworker 'up'
773-
address := myPeer.Address
774-
port := myPeer.Port + myPeer.PortOffset + ServerType(ServerTypeSyncWorker).PortOffset()
775-
if up, _, _, _, _, _, _, _ := m.upgradeManagerContext.TestInstance(ctx, ServerTypeSyncWorker, address, port, nil); !up {
776-
return recordFailure(fmt.Errorf("Syncworker is not up in time"))
777-
}
778-
default:
779-
return maskAny(fmt.Errorf("Unsupported upgrade plan entry type '%s'", firstEntry.Type))
787+
// Wait until syncworker 'up'
788+
address := myPeer.Address
789+
port := myPeer.Port + myPeer.PortOffset + ServerType(ServerTypeSyncWorker).PortOffset()
790+
if up, _, _, _, _, _, _, _ := m.upgradeManagerContext.TestInstance(ctx, ServerTypeSyncWorker, address, port, nil); !up {
791+
return recordFailure(fmt.Errorf("Syncworker is not up in time"))
780792
}
793+
default:
794+
return maskAny(fmt.Errorf("Unsupported upgrade plan entry type '%s'", firstEntry.Type))
781795
}
782796

783797
// Move first entry to finished entries
@@ -792,6 +806,32 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePla
792806
return nil
793807
}
794808

809+
// finishUpgradePlan is called at the end of the upgrade process.
810+
// It shows the user that everything is ready & what versions we have now.
811+
func (m *upgradeManager) finishUpgradePlan(ctx context.Context, plan UpgradePlan) error {
812+
isRunningAsMaster, isRunning, _ := m.upgradeManagerContext.IsRunningMaster()
813+
if !isRunning {
814+
return maskAny(fmt.Errorf("Not in running phase"))
815+
} else if !isRunningAsMaster {
816+
return nil
817+
}
818+
if _, err := m.ShowArangodServerVersions(ctx); err != nil {
819+
return maskAny(err)
820+
}
821+
822+
// Save plan
823+
overwrite := false
824+
plan.Finished = true
825+
if _, err := m.writeUpgradePlan(ctx, plan, overwrite); err != nil {
826+
return maskAny(err)
827+
}
828+
829+
// Inform user that we're done
830+
m.log.Info().Msg("Upgrade plan has finished successfully")
831+
832+
return nil
833+
}
834+
795835
// runSingleServerUpgradeProcess runs the entire upgrade process of a single server until it is finished.
796836
func (m *upgradeManager) runSingleServerUpgradeProcess(ctx context.Context, myPeer *Peer, mode ServiceMode) {
797837
// Unlock when we're done

0 commit comments

Comments
 (0)