Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 8 additions & 9 deletions go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ import (

// DefaultCell : If no cell name is passed, then use following
const (
DefaultCell = "zone1"
DefaultStartPort = 6700
DefaultVttestEnv = "VTTEST=endtoend"
DefaultCell = "zone1"
DefaultStartPort = 6700
DefaultVttestEnv = "VTTEST=endtoend"
DefaultVtorcsByCell = 1
)

var (
Expand Down Expand Up @@ -298,7 +299,6 @@ func (cluster *LocalProcessCluster) StartUnshardedKeyspace(keyspace Keyspace, re
}

func (cluster *LocalProcessCluster) startPartialKeyspace(keyspace Keyspace, shardNames []string, movedShard string, replicaCount int, rdonly bool, customizers ...any) (err error) {

cluster.HasPartialKeyspaces = true
routedKeyspace := &Keyspace{
Name: fmt.Sprintf("%s_routed", keyspace.Name),
Expand Down Expand Up @@ -806,7 +806,7 @@ func NewBareCluster(cell string, hostname string) *LocalProcessCluster {
// path/to/whatever exists
cluster.ReusingVTDATAROOT = true
} else {
err = createDirectory(cluster.CurrentVTDATAROOT, 0700)
err = createDirectory(cluster.CurrentVTDATAROOT, 0o700)
if err != nil {
log.Fatal(err)
}
Expand Down Expand Up @@ -1160,7 +1160,8 @@ func (cluster *LocalProcessCluster) waitForMySQLProcessToExit(mysqlctlProcessLis

// StartVtbackup starts a vtbackup
func (cluster *LocalProcessCluster) StartVtbackup(newInitDBFile string, initialBackup bool,
keyspace string, shard string, cell string, extraArgs ...string) error {
keyspace string, shard string, cell string, extraArgs ...string,
) error {
log.Info("Starting vtbackup")
cluster.VtbackupProcess = *VtbackupProcessInstance(
cluster.GetAndReserveTabletUID(),
Expand All @@ -1175,7 +1176,6 @@ func (cluster *LocalProcessCluster) StartVtbackup(newInitDBFile string, initialB
initialBackup)
cluster.VtbackupProcess.ExtraArgs = extraArgs
return cluster.VtbackupProcess.Setup()

}

// GetAndReservePort gives port for required process
Expand All @@ -1191,7 +1191,6 @@ func (cluster *LocalProcessCluster) GetAndReservePort() int {
cluster.nextPortForProcess = cluster.nextPortForProcess + 1
log.Infof("Attempting to reserve port: %v", cluster.nextPortForProcess)
ln, err := net.Listen("tcp", net.JoinHostPort("127.0.0.1", strconv.Itoa(cluster.nextPortForProcess)))

if err != nil {
log.Errorf("Can't listen on port %v: %s, trying next port", cluster.nextPortForProcess, err)
continue
Expand All @@ -1214,7 +1213,7 @@ const portFileTimeout = 1 * time.Hour
// If yes, then return that port, and save port + 200 in the same file
// here, the assumption is that 200 ports might be consumed for all tests in a package
func getPort() int {
portFile, err := os.OpenFile(path.Join(os.TempDir(), "endtoend.port"), os.O_CREATE|os.O_RDWR, 0644)
portFile, err := os.OpenFile(path.Join(os.TempDir(), "endtoend.port"), os.O_CREATE|os.O_RDWR, 0o644)
if err != nil {
panic(err)
}
Expand Down
84 changes: 84 additions & 0 deletions go/test/endtoend/vtorc/general/vtorc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package general

import (
"context"
"encoding/json"
"fmt"
"strconv"
"testing"
"time"

Expand Down Expand Up @@ -893,3 +895,85 @@ func TestFullStatusConnectionPooling(t *testing.T) {
assert.Equal(t, 200, status)
assert.Equal(t, "null", resp)
}

// TestSemiSyncRecoveryOrdering verifies that when the durability policy changes
// to semi_sync, VTOrc fixes ReplicaSemiSyncMustBeSet before PrimarySemiSyncMustBeSet.
// This ordering is enforced by the AfterAnalyses/BeforeAnalyses dependencies.
func TestSemiSyncRecoveryOrdering(t *testing.T) {
	defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance)
	// Begin under the "none" durability policy so that nothing requires
	// semi-sync at startup.
	utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 0, nil, cluster.VTOrcConfiguration{
		PreventCrossCellFailover: true,
	}, cluster.DefaultVtorcsByCell, policy.DurabilityNone)
	keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
	shard0 := &keyspace.Shards[0]

	// A primary must be elected and replication healthy before changing policy.
	primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
	assert.NotNil(t, primary, "should have elected a primary")
	utils.CheckReplication(t, clusterInfo, primary, shard0.Vttablets, 10*time.Second)

	vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0]
	utils.WaitForSuccessfulRecoveryCount(t, vtorc, logic.ElectNewPrimaryRecoveryName, keyspace.Name, shard0.Name, 1)

	// Flip the durability policy to semi_sync. VTOrc should notice that both the
	// replicas and the primary need semi-sync enabled, and repair them in order.
	out, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
		"SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy="+policy.DurabilitySemiSync)
	require.NoError(t, err, out)

	// The topology_recovery table assigns auto-incremented recovery_id values,
	// so the ids encode execution order: every ReplicaSemiSyncMustBeSet id must
	// be smaller than every PrimarySemiSyncMustBeSet id. Poll the
	// database-state API until that holds.
	type tableState struct {
		TableName string
		Rows      []map[string]any
	}

	assert.EventuallyWithT(t, func(c *assert.CollectT) {
		status, response, err := utils.MakeAPICall(t, vtorc, "/api/database-state")
		assert.NoError(c, err)
		assert.Equal(c, 200, status)

		var tables []tableState
		if !assert.NoError(c, json.Unmarshal([]byte(response), &tables)) {
			return
		}

		replicaCount, primaryCount := 0, 0
		maxReplicaRecoveryID, minPrimaryRecoveryID := 0, 0
		for _, table := range tables {
			if table.TableName != "topology_recovery" {
				continue
			}
			for _, row := range table.Rows {
				analysis, _ := row["analysis"].(string)
				idText, _ := row["recovery_id"].(string)
				id, convErr := strconv.Atoi(idText)
				if convErr != nil {
					// Rows without a parseable recovery_id are ignored.
					continue
				}
				switch inst.AnalysisCode(analysis) {
				case inst.ReplicaSemiSyncMustBeSet:
					replicaCount++
					if replicaCount == 1 || id > maxReplicaRecoveryID {
						maxReplicaRecoveryID = id
					}
				case inst.PrimarySemiSyncMustBeSet:
					primaryCount++
					if primaryCount == 1 || id < minPrimaryRecoveryID {
						minPrimaryRecoveryID = id
					}
				}
			}
		}

		assert.Greater(c, replicaCount, 0, "should have ReplicaSemiSyncMustBeSet recoveries")
		assert.Greater(c, primaryCount, 0, "should have PrimarySemiSyncMustBeSet recoveries")
		if replicaCount > 0 && primaryCount > 0 {
			assert.Less(c, maxReplicaRecoveryID, minPrimaryRecoveryID,
				"all ReplicaSemiSyncMustBeSet recoveries should have lower recovery_id than PrimarySemiSyncMustBeSet")
		}
	}, 30*time.Second, time.Second)
}
13 changes: 13 additions & 0 deletions go/vt/vtorc/inst/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/vtorc/config"
)

Expand Down Expand Up @@ -115,11 +116,13 @@ type DetectionAnalysis struct {
CountReplicas uint
CountValidReplicas uint
CountValidReplicatingReplicas uint
CountValidSemiSyncReplicatingReplicas uint
ReplicationStopped bool
ErrantGTID string
ReplicaNetTimeout int32
HeartbeatInterval float64
Analysis AnalysisCode
AnalysisMatchedProblems []*DetectionAnalysisProblemMeta
Description string
StructureAnalysis []StructureAnalysisCode
OracleGTIDImmediateTopology bool
Expand Down Expand Up @@ -148,6 +151,16 @@ type DetectionAnalysis struct {
IsDiskStalled bool
}

// hasMinSemiSyncAckers reports whether enough semi-sync ackers are enabled and
// replicating to satisfy the durability policy for the given primary. It is
// always true when the policy requires no semi-sync ackers (eg: "none"). This
// gives a useful signal if it is safe to enable semi-sync without risk of
// stalling ongoing PRIMARY writes. Nil inputs yield false.
func hasMinSemiSyncAckers(durabler policy.Durabler, primary *topodatapb.Tablet, analysis *DetectionAnalysis) bool {
	if durabler == nil || analysis == nil {
		return false
	}
	required := durabler.SemiSyncAckers(primary)
	return required <= int(analysis.CountValidSemiSyncReplicatingReplicas)
}

func (detectionAnalysis *DetectionAnalysis) MarshalJSON() ([]byte, error) {
i := struct {
DetectionAnalysis
Expand Down
Loading
Loading