Skip to content

Commit 26592d7

Browse files
vitess-bot[bot] and timvaillancourt
authored and committed
Cherry-pick e7888df with conflicts
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
1 parent d9b96c1 commit 26592d7

File tree

10 files changed

+1473
-100
lines changed

10 files changed

+1473
-100
lines changed

go/test/endtoend/vtorc/general/vtorc_test.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,13 @@ package general
1818

1919
import (
2020
"context"
21+
"encoding/json"
2122
"fmt"
23+
<<<<<<< HEAD
24+
=======
25+
"strconv"
26+
"strings"
27+
>>>>>>> e7888dfa83 (`vtorc`: support analysis ordering, improve semi-sync rollout (#19427))
2228
"testing"
2329
"time"
2430

@@ -893,3 +899,85 @@ func TestFullStatusConnectionPooling(t *testing.T) {
893899
assert.Equal(t, 200, status)
894900
assert.Equal(t, "null", resp)
895901
}
902+
903+
// TestSemiSyncRecoveryOrdering verifies that when the durability policy changes
904+
// to semi_sync, VTOrc fixes ReplicaSemiSyncMustBeSet before PrimarySemiSyncMustBeSet.
905+
// This ordering is enforced by the AfterAnalyses/BeforeAnalyses dependencies.
906+
func TestSemiSyncRecoveryOrdering(t *testing.T) {
907+
defer utils.PrintVTOrcLogsOnFailure(t, clusterInfo.ClusterInstance)
908+
// Start with durability "none" so no semi-sync is required initially.
909+
utils.SetupVttabletsAndVTOrcs(t, clusterInfo, 2, 0, nil, cluster.VTOrcConfiguration{
910+
PreventCrossCellFailover: true,
911+
}, cluster.DefaultVtorcsByCell, policy.DurabilityNone)
912+
keyspace := &clusterInfo.ClusterInstance.Keyspaces[0]
913+
shard0 := &keyspace.Shards[0]
914+
915+
// Wait for primary election and healthy replication.
916+
primary := utils.ShardPrimaryTablet(t, clusterInfo, keyspace, shard0)
917+
assert.NotNil(t, primary, "should have elected a primary")
918+
utils.CheckReplication(t, clusterInfo, primary, shard0.Vttablets, 10*time.Second)
919+
920+
vtorc := clusterInfo.ClusterInstance.VTOrcProcesses[0]
921+
utils.WaitForSuccessfulRecoveryCount(t, vtorc, logic.ElectNewPrimaryRecoveryName, keyspace.Name, shard0.Name, 1)
922+
923+
// Change durability to semi_sync. VTOrc should detect that replicas and primary
924+
// need semi-sync enabled, and fix them in the correct order.
925+
out, err := clusterInfo.ClusterInstance.VtctldClientProcess.ExecuteCommandWithOutput(
926+
"SetKeyspaceDurabilityPolicy", keyspace.Name, "--durability-policy="+policy.DurabilitySemiSync)
927+
require.NoError(t, err, out)
928+
929+
// Poll the database-state API to verify recovery ordering.
930+
// The topology_recovery table has auto-incremented recovery_id values that
931+
// reflect execution order. All ReplicaSemiSyncMustBeSet recovery_ids should
932+
// be less than any PrimarySemiSyncMustBeSet recovery_id.
933+
type tableState struct {
934+
TableName string
935+
Rows []map[string]any
936+
}
937+
938+
assert.EventuallyWithT(t, func(c *assert.CollectT) {
939+
status, response, err := utils.MakeAPICall(t, vtorc, "/api/database-state")
940+
assert.NoError(c, err)
941+
assert.Equal(c, 200, status)
942+
943+
var tables []tableState
944+
if !assert.NoError(c, json.Unmarshal([]byte(response), &tables)) {
945+
return
946+
}
947+
948+
var maxReplicaRecoveryID, minPrimaryRecoveryID int
949+
var replicaCount, primaryCount int
950+
for _, table := range tables {
951+
if table.TableName != "topology_recovery" {
952+
continue
953+
}
954+
for _, row := range table.Rows {
955+
analysis, _ := row["analysis"].(string)
956+
recoveryIDStr, _ := row["recovery_id"].(string)
957+
recoveryID, err := strconv.Atoi(recoveryIDStr)
958+
if err != nil {
959+
continue
960+
}
961+
switch inst.AnalysisCode(analysis) {
962+
case inst.ReplicaSemiSyncMustBeSet:
963+
replicaCount++
964+
if replicaCount == 1 || recoveryID > maxReplicaRecoveryID {
965+
maxReplicaRecoveryID = recoveryID
966+
}
967+
case inst.PrimarySemiSyncMustBeSet:
968+
primaryCount++
969+
if primaryCount == 1 || recoveryID < minPrimaryRecoveryID {
970+
minPrimaryRecoveryID = recoveryID
971+
}
972+
}
973+
}
974+
}
975+
976+
assert.Greater(c, replicaCount, 0, "should have ReplicaSemiSyncMustBeSet recoveries")
977+
assert.Greater(c, primaryCount, 0, "should have PrimarySemiSyncMustBeSet recoveries")
978+
if replicaCount > 0 && primaryCount > 0 {
979+
assert.Less(c, maxReplicaRecoveryID, minPrimaryRecoveryID,
980+
"all ReplicaSemiSyncMustBeSet recoveries should have lower recovery_id than PrimarySemiSyncMustBeSet")
981+
}
982+
}, 30*time.Second, time.Second)
983+
}

go/vt/vtorc/inst/analysis.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"time"
2222

2323
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
24+
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
2425
"vitess.io/vitess/go/vt/vtorc/config"
2526
)
2627

@@ -115,11 +116,13 @@ type DetectionAnalysis struct {
115116
CountReplicas uint
116117
CountValidReplicas uint
117118
CountValidReplicatingReplicas uint
119+
CountValidSemiSyncReplicatingReplicas uint
118120
ReplicationStopped bool
119121
ErrantGTID string
120122
ReplicaNetTimeout int32
121123
HeartbeatInterval float64
122124
Analysis AnalysisCode
125+
AnalysisMatchedProblems []*DetectionAnalysisProblemMeta
123126
Description string
124127
StructureAnalysis []StructureAnalysisCode
125128
OracleGTIDImmediateTopology bool
@@ -148,6 +151,16 @@ type DetectionAnalysis struct {
148151
IsDiskStalled bool
149152
}
150153

154+
// hasMinSemiSyncAckers returns true if there are a minimum number of semi-sync ackers enabled and replicating.
155+
// True is always returned if the durability policy does not require semi-sync ackers (eg: "none"). This gives
156+
// a useful signal if it is safe to enable semi-sync without risk of stalling ongoing PRIMARY writes.
157+
func hasMinSemiSyncAckers(durabler policy.Durabler, primary *topodatapb.Tablet, analysis *DetectionAnalysis) bool {
158+
if durabler == nil || analysis == nil {
159+
return false
160+
}
161+
return int(analysis.CountValidSemiSyncReplicatingReplicas) >= durabler.SemiSyncAckers(primary)
162+
}
163+
151164
func (detectionAnalysis *DetectionAnalysis) MarshalJSON() ([]byte, error) {
152165
i := struct {
153166
DetectionAnalysis

go/vt/vtorc/inst/analysis_dao.go

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package inst
1818

1919
import (
2020
"fmt"
21-
"math"
2221
"time"
2322

2423
"github.com/patrickmn/go-cache"
@@ -196,6 +195,15 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
196195
),
197196
0
198197
) AS count_valid_semi_sync_replicas,
198+
IFNULL(
199+
SUM(
200+
replica_instance.last_checked <= replica_instance.last_seen
201+
AND replica_instance.replica_io_running != 0
202+
AND replica_instance.replica_sql_running != 0
203+
AND replica_instance.semi_sync_replica_enabled != 0
204+
),
205+
0
206+
) AS count_valid_semi_sync_replicating_replicas,
199207
IFNULL(
200208
SUM(
201209
replica_instance.log_bin
@@ -350,6 +358,7 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
350358
a.SemiSyncBlocked = m.GetBool("semi_sync_blocked")
351359
a.SemiSyncReplicaEnabled = m.GetBool("semi_sync_replica_enabled")
352360
a.CountSemiSyncReplicasEnabled = m.GetUint("count_semi_sync_replicas")
361+
a.CountValidSemiSyncReplicatingReplicas = m.GetUint("count_valid_semi_sync_replicating_replicas")
353362
// countValidSemiSyncReplicasEnabled := m.GetUint("count_valid_semi_sync_replicas")
354363
a.SemiSyncPrimaryWaitForReplicaCount = m.GetUint("semi_sync_primary_wait_for_replica_count")
355364
a.SemiSyncPrimaryClients = m.GetUint("semi_sync_primary_clients")
@@ -405,14 +414,15 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
405414
// Increment the total number of tablets.
406415
ca.totalTablets += 1
407416
if ca.hasShardWideAction {
408-
// We can only take one shard level action at a time.
417+
// We can only take one shard-wide action at a time.
409418
return nil
410419
}
411420
if ca.durability == nil {
412421
// We failed to load the durability policy, so we shouldn't run any analysis
413422
return nil
414423
}
415424
isInvalid := m.GetBool("is_invalid")
425+
<<<<<<< HEAD
416426
switch {
417427
case a.IsClusterPrimary && isInvalid:
418428
a.Analysis = InvalidPrimary
@@ -538,27 +548,32 @@ func GetDetectionAnalysis(keyspace string, shard string, hints *DetectionAnalysi
538548
} else {
539549
a.Analysis = LockedSemiSyncPrimaryHypothesis
540550
a.Description = "Semi sync primary seems to be locked, more samplings needed to validate"
551+
=======
552+
var matchedProblems []*DetectionAnalysisProblem
553+
for _, problem := range detectionAnalysisProblems {
554+
// When isInvalid is true, instance data is unreliable (never been reached).
555+
// Only InvalidPrimary/InvalidReplica should match; postProcessAnalyses
556+
// handles upgrading InvalidPrimary to DeadPrimary if needed.
557+
if isInvalid && problem.Meta.Analysis != InvalidPrimary && problem.Meta.Analysis != InvalidReplica {
558+
continue
559+
>>>>>>> e7888dfa83 (`vtorc`: support analysis ordering, improve semi-sync rollout (#19427))
541560
}
542-
//
543-
case a.IsPrimary && a.LastCheckValid && a.CountReplicas == 1 && a.CountValidReplicas == a.CountReplicas && a.CountValidReplicatingReplicas == 0:
544-
a.Analysis = PrimarySingleReplicaNotReplicating
545-
a.Description = "Primary is reachable but its single replica is not replicating"
546-
case a.IsPrimary && a.LastCheckValid && a.CountReplicas == 1 && a.CountValidReplicas == 0:
547-
a.Analysis = PrimarySingleReplicaDead
548-
a.Description = "Primary is reachable but its single replica is dead"
549-
//
550-
case a.IsPrimary && a.LastCheckValid && a.CountReplicas > 1 && a.CountValidReplicas == a.CountReplicas && a.CountValidReplicatingReplicas == 0:
551-
a.Analysis = AllPrimaryReplicasNotReplicating
552-
a.Description = "Primary is reachable but none of its replicas is replicating"
553-
//
554-
case a.IsPrimary && a.LastCheckValid && a.CountReplicas > 1 && a.CountValidReplicas < a.CountReplicas && a.CountValidReplicas > 0 && a.CountValidReplicatingReplicas == 0:
555-
a.Analysis = AllPrimaryReplicasNotReplicatingOrDead
556-
a.Description = "Primary is reachable but none of its replicas is replicating"
557-
//
558-
// case a.IsPrimary && a.CountReplicas == 0:
559-
// a.Analysis = PrimaryWithoutReplicas
560-
// a.Description = "Primary has no replicas"
561-
// }
561+
if problem.HasMatch(a, ca, primaryTablet, tablet, isInvalid, isStaleBinlogCoordinates) {
562+
matchedProblems = append(matchedProblems, problem)
563+
}
564+
}
565+
if len(matchedProblems) > 0 {
566+
sortDetectionAnalysisMatchedProblems(matchedProblems)
567+
for _, problem := range matchedProblems {
568+
a.AnalysisMatchedProblems = append(a.AnalysisMatchedProblems, problem.Meta)
569+
}
570+
// We return a single problem per tablet. Any remaining problems will be discovered/recovered
571+
// by VTOrc(s) on future polls. Often many problems are resolved by a single recovery of the
572+
// first problem. The first element of matchedProblems is the highest-priority problem.
573+
chosenProblem := matchedProblems[0]
574+
a.Analysis = chosenProblem.Meta.Analysis
575+
a.Description = chosenProblem.Meta.Description
576+
ca.hasShardWideAction = chosenProblem.Meta.Priority == detectionAnalysisPriorityShardWideAction
562577
}
563578

564579
{

0 commit comments

Comments
 (0)