Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion RELEASE_VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
3.2.6
3.2.6-1
2 changes: 2 additions & 0 deletions go/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ type Configuration struct {
BufferInstanceWrites bool // Set to 'true' for write-optimization on backend table (compromise: writes can be stale and overwrite non stale data)
InstanceFlushIntervalMilliseconds int // Max interval between instance write buffer flushes
SkipMaxScaleCheck bool // If you don't ever have MaxScale BinlogServer in your topology (and most people don't), set this to 'true' to save some pointless queries
AllowLowerVersionForReplication bool // Disable checking if MySQL version we're trying to replicate from is higher than the replica, warning is produced.
UnseenInstanceForgetHours uint // Number of hours after which an unseen instance is forgotten
SnapshotTopologiesIntervalHours uint // Interval in hour between snapshot-topologies invocation. Default: 0 (disabled)
DiscoveryMaxConcurrency uint // Number of goroutines doing hosts discovery
Expand Down Expand Up @@ -345,6 +346,7 @@ func newConfiguration() *Configuration {
BufferInstanceWrites: false,
InstanceFlushIntervalMilliseconds: 100,
SkipMaxScaleCheck: true,
AllowLowerVersionForReplication: false,
UnseenInstanceForgetHours: 240,
SnapshotTopologiesIntervalHours: 0,
DiscoverByShowSlaveHosts: false,
Expand Down
16 changes: 11 additions & 5 deletions go/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -1520,11 +1520,15 @@ func (this *HttpAPI) CanReplicateFrom(params martini.Params, r render.Render, re
}

canReplicate, err := instance.CanReplicateFrom(belowInstance)
if err != nil {
if !canReplicate {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}

if err != nil {
log.Warningf("CanReplicateFrom(): %v", err)
}

Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("%t", canReplicate), Details: belowKey})
}

Expand Down Expand Up @@ -1552,14 +1556,16 @@ func (this *HttpAPI) CanReplicateFromGTID(params martini.Params, r render.Render
}

canReplicate, err := instance.CanReplicateFrom(belowInstance)
if err != nil {
if !canReplicate {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
return
}
if !canReplicate {
Respond(r, &APIResponse{Code: OK, Message: fmt.Sprintf("%t", canReplicate), Details: belowKey})
return

if err != nil {
log.Warningf("CanReplicateFromGTID(): %v", err)
err = nil
}

err = inst.CheckMoveViaGTID(instance, belowInstance)
if err != nil {
Respond(r, &APIResponse{Code: ERROR, Message: err.Error()})
Expand Down
10 changes: 8 additions & 2 deletions go/inst/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ func (this *Instance) IsDescendantOf(other *Instance) bool {
// CanReplicateFrom uses heuristics to decide whether this instacne can practically replicate from other instance.
// Checks are made to binlog format, version number, binary logs etc.
func (this *Instance) CanReplicateFrom(other *Instance) (bool, error) {
var warningMsg error = nil

if this.Key.Equals(&other.Key) {
return false, fmt.Errorf("instance cannot replicate from itself: %+v", this.Key)
}
Expand All @@ -462,7 +464,11 @@ func (this *Instance) CanReplicateFrom(other *Instance) (bool, error) {
// Not OK for a replica, for it has to relay the logs.
}
if this.IsSmallerMajorVersion(other) && !this.IsBinlogServer() {
return false, fmt.Errorf("instance %+v has version %s, which is lower than %s on %+v ", this.Key, this.Version, other.Version, other.Key)
if !config.Config.AllowLowerVersionForReplication {
return false, fmt.Errorf("instance %+v has version %s, which is lower than %s on %+v ", this.Key, this.Version, other.Version, other.Key)
} else {
warningMsg = fmt.Errorf("instance %+v has version %s, which is lower than %s on %+v ", this.Key, this.Version, other.Version, other.Key)
}
}
if this.LogBinEnabled && this.LogReplicationUpdatesEnabled {
if this.IsSmallerBinlogFormat(other) {
Expand All @@ -483,7 +489,7 @@ func (this *Instance) CanReplicateFrom(other *Instance) (bool, error) {
if this.SQLDelay < other.SQLDelay && int64(other.SQLDelay) > int64(config.Config.ReasonableMaintenanceReplicationLagSeconds) {
return false, fmt.Errorf("%+v has higher SQL_Delay (%+v seconds) than %+v does (%+v seconds)", other.Key, other.SQLDelay, this.Key, this.SQLDelay)
}
return true, nil
return true, warningMsg
}

// HasReasonableMaintenanceReplicationLag returns true when the replica lag is reasonable, and maintenance operations should have a green light to go.
Expand Down
18 changes: 17 additions & 1 deletion go/inst/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
package inst

import (
"testing"

"github.com/openark/golib/log"
test "github.com/openark/golib/tests"
"github.com/openark/orchestrator/go/config"
"testing"
)

func init() {
Expand Down Expand Up @@ -134,6 +135,21 @@ func TestCanReplicateFrom(t *testing.T) {
canReplicate, _ = i55.CanReplicateFrom(&i56)
test.S(t).ExpectFalse(canReplicate)

i80 := Instance{Key: key3, Version: "8.0"}
i80.LogBinEnabled = true
i80.LogReplicationUpdatesEnabled = true
i80.ServerID = 80

canReplicate, err = i56.CanReplicateFrom(&i80)
test.S(t).ExpectNotNil(err)
test.S(t).ExpectEquals(canReplicate, false)

config.Config.AllowLowerVersionForReplication = true
canReplicate, err = i56.CanReplicateFrom(&i80)
test.S(t).ExpectNotNil(err)
test.S(t).ExpectEquals(canReplicate, true)
config.Config.AllowLowerVersionForReplication = false

iStatement := Instance{Key: key1, Binlog_format: "STATEMENT", ServerID: 1, Version: "5.5", LogBinEnabled: true, LogReplicationUpdatesEnabled: true}
iRow := Instance{Key: key2, Binlog_format: "ROW", ServerID: 2, Version: "5.5", LogBinEnabled: true, LogReplicationUpdatesEnabled: true}
canReplicate, err = iRow.CanReplicateFrom(&iStatement)
Expand Down
92 changes: 81 additions & 11 deletions go/inst/instance_topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,16 @@ func MoveUp(instanceKey *InstanceKey) (*Instance, error) {
return instance, fmt.Errorf("master is not a replica itself: %+v", master.Key)
}

if canReplicate, err := instance.CanReplicateFrom(master); canReplicate == false {
canReplicate, err := instance.CanReplicateFrom(master)
if !canReplicate {
return instance, err
}

// There may be a situation when it's ok to replicate but err contains some information, we will log this as a warning then
if err != nil {
log.Warningf("MoveUp(): %v", err)
}

if master.IsBinlogServer() {
// Quick solution via binlog servers
return Repoint(instanceKey, &master.MasterKey, GTIDHintDeny)
Expand Down Expand Up @@ -429,10 +436,16 @@ func MoveUpReplicas(instanceKey *InstanceKey, pattern string) ([](*Instance), *I

var replicaErr error
ExecuteOnTopology(func() {
if canReplicate, err := replica.CanReplicateFrom(instance); canReplicate == false || err != nil {
canReplicate, err := replica.CanReplicateFrom(instance)
if !canReplicate {
replicaErr = err
return
}

if err != nil {
log.Warningf("MoveUpReplicas(): :%v", err)
}

if instance.IsBinlogServer() {
// Special case. Just repoint
replica, err = Repoint(&replica.Key, instanceKey, GTIDHintDeny)
Expand Down Expand Up @@ -527,9 +540,15 @@ func MoveBelow(instanceKey, siblingKey *InstanceKey) (*Instance, error) {
return instance, fmt.Errorf("instances are not siblings: %+v, %+v", *instanceKey, *siblingKey)
}

if canReplicate, err := instance.CanReplicateFrom(sibling); !canReplicate {
canReplicate, err := instance.CanReplicateFrom(sibling)
if !canReplicate {
return instance, err
}

if err != nil {
log.Warningf("MoveBelow(): %v", err)
}

log.Infof("Will move %+v below %+v", instanceKey, siblingKey)

if maintenanceToken, merr := BeginMaintenance(instanceKey, GetMaintenanceOwner(), fmt.Sprintf("move below %+v", *siblingKey)); merr != nil {
Expand Down Expand Up @@ -629,9 +648,16 @@ func moveInstanceBelowViaGTID(instance, otherInstance *Instance) (*Instance, err
return instance, merr
}

if canReplicate, err := instance.CanReplicateFrom(otherInstance); !canReplicate {
var err error
canReplicate, err := instance.CanReplicateFrom(otherInstance)
if !canReplicate {
return instance, err
}

if err != nil {
log.Warningf("moveInstanceBelowViaGTID(): %v", err)
}

if err := CheckMoveViaGTID(instance, otherInstance); err != nil {
return instance, err
}
Expand All @@ -640,7 +666,6 @@ func moveInstanceBelowViaGTID(instance, otherInstance *Instance) (*Instance, err
instanceKey := &instance.Key
otherInstanceKey := &otherInstance.Key

var err error
if maintenanceToken, merr := BeginMaintenance(instanceKey, GetMaintenanceOwner(), fmt.Sprintf("move below %+v", *otherInstanceKey)); merr != nil {
err = fmt.Errorf("Cannot begin maintenance on %+v: %v", *instanceKey, merr)
goto Cleanup
Expand Down Expand Up @@ -812,10 +837,17 @@ func Repoint(instanceKey *InstanceKey, masterKey *InstanceKey, gtidHint Operatio
return instance, err
}
}
if canReplicate, err := instance.CanReplicateFrom(master); !canReplicate {

canReplicate, err := instance.CanReplicateFrom(master)
if !canReplicate {
return instance, err
}

if err != nil {
log.Warningf("Repoint(): %v", err)
err = nil
}

// if a binlog server check it is sufficiently up to date
if master.IsBinlogServer() {
// "Repoint" operation trusts the user. But only so much. Repoiting to a binlog server which is not yet there is strictly wrong.
Expand Down Expand Up @@ -991,9 +1023,17 @@ func MakeCoMaster(instanceKey *InstanceKey) (*Instance, error) {
} else if _, found, _ := ReadInstance(&master.MasterKey); found {
return instance, fmt.Errorf("%+v is not a real master; it replicates from: %+v", master.Key, master.MasterKey)
}
if canReplicate, err := master.CanReplicateFrom(instance); !canReplicate {

canReplicate, err := master.CanReplicateFrom(instance)
if !canReplicate {
return instance, err
}

if err != nil {
log.Warningf("MakeCoMaster(): %v", err)
err = nil
}

log.Infof("Will make %+v co-master of %+v", instanceKey, master.Key)

var gitHint OperationGTIDHint = GTIDHintNeutral
Expand Down Expand Up @@ -1584,9 +1624,16 @@ func MatchBelow(instanceKey, otherKey *InstanceKey, requireInstanceMaintenance b
return instance, nil, merr
}

if canReplicate, err := instance.CanReplicateFrom(otherInstance); !canReplicate {
canReplicate, err := instance.CanReplicateFrom(otherInstance)
if !canReplicate {
return instance, nil, err
}

if err != nil {
log.Warningf("MatchBelow(): %v", err)
err = nil
}

var nextBinlogCoordinatesToMatch *BinlogCoordinates
var countMatchedEvents int

Expand Down Expand Up @@ -1791,9 +1838,16 @@ func TakeMaster(instanceKey *InstanceKey, allowTakingCoMaster bool) (*Instance,
}
log.Debugf("TakeMaster: will attempt making %+v take its master %+v, now resolved as %+v", *instanceKey, instance.MasterKey, masterInstance.Key)

if canReplicate, err := masterInstance.CanReplicateFrom(instance); canReplicate == false {
canReplicate, err := masterInstance.CanReplicateFrom(instance)
if !canReplicate {
return instance, err
}

if err != nil {
log.Warningf("TakeMaster(): %v", err)
err = nil
}

// We begin
masterInstance, err = StopReplication(&masterInstance.Key)
if err != nil {
Expand Down Expand Up @@ -2245,11 +2299,15 @@ func chooseCandidateReplica(replicas [](*Instance)) (candidateReplica *Instance,
replicas = RemoveInstance(replicas, &candidateReplica.Key)
for _, replica := range replicas {
replica := replica
if canReplicate, err := replica.CanReplicateFrom(candidateReplica); !canReplicate {

canReplicate, err := replica.CanReplicateFrom(candidateReplica)

if !canReplicate {
// lost due to inability to replicate
cannotReplicateReplicas = append(cannotReplicateReplicas, replica)
if err != nil {
log.Errorf("chooseCandidateReplica(): error checking CanReplicateFrom(). replica: %v; error: %v", replica.Key, err)
err = nil // Zerro out the error so we don't log it again with warning
}
} else if replica.ExecBinlogCoordinates.SmallerThan(&candidateReplica.ExecBinlogCoordinates) {
laterReplicas = append(laterReplicas, replica)
Expand All @@ -2259,6 +2317,11 @@ func chooseCandidateReplica(replicas [](*Instance)) (candidateReplica *Instance,
// lost due to being more advanced/ahead of chosen replica.
aheadReplicas = append(aheadReplicas, replica)
}

if err != nil {
log.Warningf("chooseCandidateReplica(): replica: %v, warning: %+v", replica.Key, err)
err = nil
}
}
return candidateReplica, aheadReplicas, equalReplicas, laterReplicas, cannotReplicateReplicas, err
}
Expand Down Expand Up @@ -2658,9 +2721,16 @@ func RegroupReplicas(masterKey *InstanceKey, returnReplicaEvenOnFailureToRegroup
// It may choose to use Pseudo-GTID, or normal binlog positions, or take advantage of binlog servers,
// or it may combine any of the above in a multi-step operation.
func relocateBelowInternal(instance, other *Instance) (*Instance, error) {
if canReplicate, err := instance.CanReplicateFrom(other); !canReplicate {

canReplicate, err := instance.CanReplicateFrom(other)
if !canReplicate {
return instance, log.Errorf("%+v cannot replicate from %+v. Reason: %+v", instance.Key, other.Key, err)
}

if err != nil {
log.Warningf("relocateBelowInternal(): %v", err)
}

// simplest:
if InstanceIsMasterOf(other, instance) {
// already the desired setup.
Expand Down
2 changes: 1 addition & 1 deletion go/inst/instance_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func MajorVersion(version string) []string {

// IsSmallerMajorVersion tests two versions against another and returns true if
// the former is a smaller "major" varsion than the latter.
// e.g. 5.5.36 is NOT a smaller major version as comapred to 5.5.40, but IS as compared to 5.6.9
// e.g. 5.5.36 is NOT a smaller major version as comapred to 5.5.40, but IS as compared to 6.0.0
func IsSmallerMajorVersion(version string, otherVersion string) bool {
thisMajorVersion := MajorVersion(version)
otherMajorVersion := MajorVersion(otherVersion)
Expand Down
8 changes: 7 additions & 1 deletion go/logic/topology_recovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,9 +1052,15 @@ func canTakeOverPromotedServerAsMaster(wantToTakeOver *inst.Instance, toBeTakenO
if !wantToTakeOver.MasterKey.Equals(&toBeTakenOver.Key) {
return false
}
if canReplicate, _ := toBeTakenOver.CanReplicateFrom(wantToTakeOver); !canReplicate {

canReplicate, err := toBeTakenOver.CanReplicateFrom(wantToTakeOver)
if !canReplicate {
return false
}
if err != nil {
log.Warningf("canTakeOverPromotedServerAsMaster(): %v", err)
}

return true
}

Expand Down