Skip to content

Commit 7954f88

Browse files
committed
Using callback from agency and less reads
1 parent 642dbf1 commit 7954f88

File tree

3 files changed

+155
-3
lines changed

3 files changed

+155
-3
lines changed

pkg/trigger/trigger.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
//
2+
// DISCLAIMER
3+
//
4+
// Copyright 2018 ArangoDB GmbH, Cologne, Germany
5+
//
6+
// Licensed under the Apache License, Version 2.0 (the "License");
7+
// you may not use this file except in compliance with the License.
8+
// You may obtain a copy of the License at
9+
//
10+
// http://www.apache.org/licenses/LICENSE-2.0
11+
//
12+
// Unless required by applicable law or agreed to in writing, software
13+
// distributed under the License is distributed on an "AS IS" BASIS,
14+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
// See the License for the specific language governing permissions and
16+
// limitations under the License.
17+
//
18+
// Copyright holder is ArangoDB GmbH, Cologne, Germany
19+
//
20+
// Author Ewout Prangsma
21+
//
22+
23+
package trigger
24+
25+
import "sync"
26+
27+
// Trigger is a synchronization utility used to wait (in a select statement)
// until someone triggers it.
//
// The zero value is ready to use. A Trigger holds a mutex and therefore must
// not be copied after first use.
type Trigger struct {
	mu              sync.Mutex    // guards done and pendingTriggers
	done            chan struct{} // open channel handed out by Done; nil when there is none
	pendingTriggers int           // Trigger calls not yet consumed by Done
}

// Done returns the channel to use in a select case.
//
// If Trigger has not been called since the previous call to Done, the
// returned channel is open and is closed by the next call to Trigger;
// repeated calls return that same channel.
//
// If Trigger has been called since the previous call to Done, the returned
// channel is already closed and all recorded triggers are consumed at once.
// This also holds when that Trigger call closed a channel returned earlier,
// so a waiter woken through its channel sees one more closed channel on its
// next call to Done.
func (t *Trigger) Done() <-chan struct{} {
	t.mu.Lock()
	defer t.mu.Unlock()

	if t.done == nil {
		t.done = make(chan struct{})
	}
	if t.pendingTriggers > 0 {
		// Consume the recorded triggers: hand out a closed channel and
		// forget it, so the next call starts with a fresh one.
		t.pendingTriggers = 0
		d := t.done
		close(t.done)
		t.done = nil
		return d
	}
	return t.done
}

// Trigger closes any Done channel.
//
// The call is always recorded, whether or not a channel was outstanding, so
// that the next call to Done returns an already closed channel.
func (t *Trigger) Trigger() {
	t.mu.Lock()
	defer t.mu.Unlock()

	t.pendingTriggers++
	if t.done != nil {
		close(t.done)
		t.done = nil
	}
}

service/server.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ func (s *httpServer) Run(hostAddr, containerAddr string, tlsConfig *tls.Config,
171171
mux.HandleFunc("/database-auto-upgrade", s.databaseAutoUpgradeHandler)
172172
// Agency callback
173173
mux.HandleFunc("/cb/masterChanged", s.cbMasterChanged)
174+
mux.HandleFunc("/cb/upgradePlanChanged", s.cbUpgradePlanChanged)
174175
}
175176

176177
s.server.Addr = containerAddr
@@ -723,6 +724,20 @@ func (s *httpServer) cbMasterChanged(w http.ResponseWriter, r *http.Request) {
723724
w.Write([]byte("OK"))
724725
}
725726

727+
// cbUpgradePlanChanged is a callback called by the agency when the upgrade plan is modified.
728+
func (s *httpServer) cbUpgradePlanChanged(w http.ResponseWriter, r *http.Request) {
729+
s.log.Debug().Msgf("Upgrade plan changed callback from %s", r.RemoteAddr)
730+
if r.Method != "POST" {
731+
w.WriteHeader(http.StatusMethodNotAllowed)
732+
return
733+
}
734+
735+
// Interrupt upgrade manager
736+
s.context.UpgradeManager().UpgradePlanChangedCallback()
737+
w.WriteHeader(http.StatusOK)
738+
w.Write([]byte("OK"))
739+
}
740+
726741
func handleError(w http.ResponseWriter, err error) {
727742
if loc, ok := IsRedirect(err); ok {
728743
header := w.Header()

service/upgrade_manager.go

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,15 @@ import (
3030
"sync"
3131
"time"
3232

33-
"github.com/arangodb-helper/arangodb/client"
3433
driver "github.com/arangodb/go-driver"
3534
"github.com/arangodb/go-driver/agency"
3635
"github.com/arangodb/go-upgrade-rules"
3736
"github.com/pkg/errors"
3837
"github.com/rs/zerolog"
3938
"github.com/ryanuber/columnize"
39+
40+
"github.com/arangodb-helper/arangodb/client"
41+
"github.com/arangodb-helper/arangodb/pkg/trigger"
4042
)
4143

4244
// UpgradeManager is the API of a service used to control the upgrade process from 1 database version to the next.
@@ -68,6 +70,9 @@ type UpgradeManager interface {
6870

6971
// RunWatchUpgradePlan keeps watching the upgrade plan until the given context is canceled.
7072
RunWatchUpgradePlan(context.Context)
73+
74+
// UpgradePlanChangedCallback is an agency callback to notify about changes in the upgrade plan
75+
UpgradePlanChangedCallback()
7176
}
7277

7378
// UpgradeManagerContext holds methods used by the upgrade manager to control its context.
@@ -209,6 +214,7 @@ type upgradeManager struct {
209214
upgradeManagerContext UpgradeManagerContext
210215
upgradeServerType ServerType
211216
updateNeeded bool
217+
cbTrigger trigger.Trigger
212218
}
213219

214220
// StartDatabaseUpgrade is called to start the upgrade process
@@ -751,13 +757,28 @@ func (m *upgradeManager) removeUpgradePlan(ctx context.Context) error {
751757
// RunWatchUpgradePlan keeps watching the upgrade plan in the agency.
752758
// Once it detects that this starter has to act, it does.
753759
func (m *upgradeManager) RunWatchUpgradePlan(ctx context.Context) {
754-
_, _, mode := m.upgradeManagerContext.ClusterConfig()
760+
_, myPeer, mode := m.upgradeManagerContext.ClusterConfig()
761+
ownURL := myPeer.CreateStarterURL("/")
755762
if !mode.HasAgency() {
756763
// Nothing to do here without an agency
757764
return
758765
}
766+
registeredCallback := false
767+
defer func() {
768+
if registeredCallback {
769+
m.unregisterUpgradePlanChangedCallback(ctx, ownURL)
770+
}
771+
}()
759772
for {
760-
delay := time.Second * 10
773+
delay := time.Minute
774+
if !registeredCallback {
775+
m.log.Debug().Msg("Registering upgrade plan changed callback...")
776+
if err := m.registerUpgradePlanChangedCallback(ctx, ownURL); err != nil {
777+
m.log.Info().Err(err).Msg("Failed to register upgrade plan changed callback")
778+
} else {
779+
registeredCallback = true
780+
}
781+
}
761782
plan, err := m.readUpgradePlan(ctx)
762783
if agency.IsKeyNotFound(err) {
763784
// Just try later
@@ -785,13 +806,60 @@ func (m *upgradeManager) RunWatchUpgradePlan(ctx context.Context) {
785806
select {
786807
case <-time.After(delay):
787808
// Continue
809+
case <-m.cbTrigger.Done():
810+
// Continue
788811
case <-ctx.Done():
789812
// Context canceled
790813
return
791814
}
792815
}
793816
}
794817

818+
// UpgradePlanChangedCallback is an agency callback to notify about changes in the upgrade plan
819+
func (m *upgradeManager) UpgradePlanChangedCallback() {
820+
m.cbTrigger.Trigger()
821+
}
822+
823+
// registerUpgradePlanChangedCallback registers our callback URL with the agency
824+
func (m *upgradeManager) registerUpgradePlanChangedCallback(ctx context.Context, ownURL string) error {
825+
// Get api client
826+
api, err := m.createAgencyAPI()
827+
if err != nil {
828+
return maskAny(err)
829+
}
830+
// Register callback
831+
cbURL, err := getURLWithPath(ownURL, "/cb/upgradePlanChanged")
832+
if err != nil {
833+
return maskAny(err)
834+
}
835+
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
836+
defer cancel()
837+
if err := api.RegisterChangeCallback(ctx, upgradePlanKey, cbURL); err != nil {
838+
return maskAny(err)
839+
}
840+
return nil
841+
}
842+
843+
// unregisterUpgradePlanChangedCallback removes our callback URL from the agency
844+
func (m *upgradeManager) unregisterUpgradePlanChangedCallback(ctx context.Context, ownURL string) error {
845+
// Get api client
846+
api, err := m.createAgencyAPI()
847+
if err != nil {
848+
return maskAny(err)
849+
}
850+
// Register callback
851+
cbURL, err := getURLWithPath(ownURL, "/cb/upgradePlanChanged")
852+
if err != nil {
853+
return maskAny(err)
854+
}
855+
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
856+
defer cancel()
857+
if err := api.UnregisterChangeCallback(ctx, upgradePlanKey, cbURL); err != nil {
858+
return maskAny(err)
859+
}
860+
return nil
861+
}
862+
795863
// processUpgradePlan inspects the first entry of the given plan and acts upon
796864
// it when needed.
797865
func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePlan) error {
@@ -854,6 +922,7 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePla
854922
return recordFailure(errors.Wrap(err, "Cluster is not healthy in time"))
855923
}
856924
}
925+
m.log.Info().Msg("Finished upgrading agent")
857926
case UpgradeEntryTypeDBServer:
858927
// Restart the dbserver in auto-upgrade mode
859928
m.log.Info().Msg("Upgrading dbserver")
@@ -894,6 +963,7 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePla
894963
if err := upgrade(); err != nil {
895964
return maskAny(err)
896965
}
966+
m.log.Info().Msg("Finished upgrading dbserver")
897967
case UpgradeEntryTypeCoordinator:
898968
// Restart the coordinator in auto-upgrade mode
899969
m.log.Info().Msg("Upgrading coordinator")
@@ -917,6 +987,7 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePla
917987
if err := m.waitUntil(ctx, m.isClusterHealthy, "Cluster is not yet healthy: %v"); err != nil {
918988
return recordFailure(errors.Wrap(err, "Cluster is not healthy in time"))
919989
}
990+
m.log.Info().Msg("Finished upgrading coordinator")
920991
case UpgradeEntryTypeSingle:
921992
// Restart the activefailover single server in auto-upgrade mode
922993
m.log.Info().Msg("Upgrading single server")
@@ -951,6 +1022,7 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePla
9511022
if err := upgrade(); err != nil {
9521023
return maskAny(err)
9531024
}
1025+
m.log.Info().Msg("Finished upgrading single server")
9541026
case UpgradeEntryTypeSyncMaster:
9551027
// Restart the syncmaster
9561028
m.log.Info().Msg("Restarting syncmaster")
@@ -971,6 +1043,7 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePla
9711043
if up, _, _, _, _, _, _, _ := m.upgradeManagerContext.TestInstance(ctx, ServerTypeSyncMaster, address, port, nil); !up {
9721044
return recordFailure(fmt.Errorf("Syncmaster is not up in time"))
9731045
}
1046+
m.log.Info().Msg("Finished restarting syncmaster")
9741047
case UpgradeEntryTypeSyncWorker:
9751048
// Restart the syncworker
9761049
m.log.Info().Msg("Restarting syncworker")
@@ -991,6 +1064,7 @@ func (m *upgradeManager) processUpgradePlan(ctx context.Context, plan UpgradePla
9911064
if up, _, _, _, _, _, _, _ := m.upgradeManagerContext.TestInstance(ctx, ServerTypeSyncWorker, address, port, nil); !up {
9921065
return recordFailure(fmt.Errorf("Syncworker is not up in time"))
9931066
}
1067+
m.log.Info().Msg("Finished restarting syncworker")
9941068
default:
9951069
return maskAny(fmt.Errorf("Unsupported upgrade plan entry type '%s'", firstEntry.Type))
9961070
}

0 commit comments

Comments
 (0)