
Commit a0109e0

fix failover release lock and add recovery status tracking
1 parent 0ea732d commit a0109e0

File tree

6 files changed (+53, -9 lines)


libs/cluster/Server/ClusterManagerWorkerState.cs

Lines changed: 1 addition & 1 deletion

@@ -188,7 +188,7 @@ public bool TryAddReplica(string nodeid, bool force, out ReadOnlySpan<byte> erro
 
     // Transition to recovering state
    // Only one caller will succeed in becoming a replica for the provided node-id
-    if (!clusterProvider.replicationManager.StartRecovery())
+    if (!clusterProvider.replicationManager.StartRecovery(RecoveryStatus.ClusterReplicate))
     {
         logger?.LogError($"{nameof(TryAddReplica)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
         errorMessage = CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK;

libs/cluster/Server/ClusterProvider.cs

Lines changed: 1 addition & 0 deletions

@@ -246,6 +246,7 @@ public MetricsItem[] GetReplicationInfo()
     replicationInfo.Add(new("master_sync_last_io_seconds_ago", replicationManager.LastPrimarySyncSeconds.ToString()));
     replicationInfo.Add(new("replication_offset_lag", replicationOffsetLag.ToString()));
     replicationInfo.Add(new("replication_offset_max_lag", storeWrapper.serverOptions.ReplicationOffsetMaxLag.ToString()));
+    replicationInfo.Add(new("recoverStatus", replicationManager.recoverStatus.ToString()));
 }
 else
 {
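
Note: the new "recoverStatus" entry is emitted like the other replication metrics, with the enum value rendered via ToString(). Below is a minimal standalone sketch of what that produces; the MetricsItem record here is a stand-in for Garnet's actual metrics type, not code from this commit.

    // Sketch only: MetricsItem is a stand-in record, not Garnet's actual type.
    using System;
    using System.Collections.Generic;

    enum RecoveryStatus : byte { NoRecovery, InitializeRecover, ClusterReplicate, ClusterFailover, ReplicaOfNoOne }

    record MetricsItem(string Name, string Value);

    class RecoverStatusMetricSketch
    {
        static void Main()
        {
            var recoverStatus = RecoveryStatus.ClusterFailover;
            var replicationInfo = new List<MetricsItem>
            {
                // Same shape as the added line: the enum value rendered as a string
                new("recoverStatus", recoverStatus.ToString())
            };
            foreach (var item in replicationInfo)
                Console.WriteLine($"{item.Name}:{item.Value}");   // prints: recoverStatus:ClusterFailover
        }
    }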

libs/cluster/Server/Failover/ReplicaFailoverSession.cs

Lines changed: 4 additions & 2 deletions

@@ -150,13 +150,15 @@ private bool TakeOverAsPrimary()
 {
     // Take over as primary and inform old primary
     status = FailoverStatus.TAKING_OVER_AS_PRIMARY;
+    var acquiredLock = true;
 
     try
     {
         // Make replica syncing unavailable by setting recovery flag
-        if (!clusterProvider.replicationManager.StartRecovery())
+        if (!clusterProvider.replicationManager.StartRecovery(RecoveryStatus.ClusterFailover))
         {
             logger?.LogWarning($"{nameof(TakeOverAsPrimary)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
+            acquiredLock = false;
             return false;
         }
         _ = clusterProvider.BumpAndWaitForEpochTransition();
@@ -178,7 +180,7 @@ private bool TakeOverAsPrimary()
     finally
     {
         // Disable recovering as now this node has become a primary or failed in its attempt earlier
-        clusterProvider.replicationManager.SuspendRecovery();
+        if (acquiredLock) clusterProvider.replicationManager.SuspendRecovery();
     }
 
     return true;
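
The two added lines above are the fix named in the commit message: previously the finally block always called SuspendRecovery(), even when StartRecovery(...) had failed, so a failed takeover released a recovery lock this session never acquired. A minimal standalone sketch of the guard pattern, using a SemaphoreSlim as a stand-in for the replication manager's internal recovery lock (the real types and methods differ):

    // Sketch of the acquired-lock guard; SemaphoreSlim stands in for Garnet's recovery lock.
    using System;
    using System.Threading;

    class RecoveryGuardSketch
    {
        static readonly SemaphoreSlim recoverLock = new(1, 1);

        static bool StartRecovery() => recoverLock.Wait(0);   // non-blocking try-acquire
        static void SuspendRecovery() => recoverLock.Release();

        static bool TakeOverAsPrimary()
        {
            var acquiredLock = true;
            try
            {
                if (!StartRecovery())
                {
                    acquiredLock = false;   // never acquired, so it must not be released
                    return false;
                }
                // ... promote this node to primary ...
                return true;
            }
            finally
            {
                // Release only if this call actually acquired the lock; releasing an
                // unheld lock would break whichever task currently holds it.
                if (acquiredLock) SuspendRecovery();
            }
        }

        static void Main() => Console.WriteLine(TakeOverAsPrimary());   // True
    }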
Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+namespace Garnet.cluster
+{
+    /// <summary>
+    /// Recovery status
+    /// </summary>
+    public enum RecoveryStatus : byte
+    {
+        /// <summary>
+        /// No recovery
+        /// </summary>
+        NoRecovery,
+        /// <summary>
+        /// Recovery at initilization
+        /// </summary>
+        InitializeRecover,
+        /// <summary>
+        /// Recovery at cluster replicate
+        /// </summary>
+        ClusterReplicate,
+        /// <summary>
+        /// Recovery at cluster failover
+        /// </summary>
+        ClusterFailover,
+        /// <summary>
+        /// Recovery at replicaof no one
+        /// </summary>
+        ReplicaOfNoOne
+    }
+}
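
The enum above is added in a new file (its path is not shown in this capture) and records which code path currently holds the recovery lock. As a hedged illustration only, the sketch below shows how a test or monitoring client might map the reported value back to a human-readable description; the descriptions simply restate the XML doc comments and are not part of this commit.

    // Illustrative only: interpret a recoverStatus value reported through metrics.
    using System;

    enum RecoveryStatus : byte { NoRecovery, InitializeRecover, ClusterReplicate, ClusterFailover, ReplicaOfNoOne }

    class RecoveryStatusSketch
    {
        static string Describe(RecoveryStatus status) => status switch
        {
            RecoveryStatus.NoRecovery => "no recovery in progress",
            RecoveryStatus.InitializeRecover => "recovery at initialization",
            RecoveryStatus.ClusterReplicate => "recovery at cluster replicate",
            RecoveryStatus.ClusterFailover => "recovery at cluster failover",
            RecoveryStatus.ReplicaOfNoOne => "recovery at replicaof no one",
            _ => "unknown"
        };

        static void Main()
        {
            var reported = "ClusterReplicate";   // value as rendered by recoverStatus.ToString()
            if (Enum.TryParse<RecoveryStatus>(reported, out var status))
                Console.WriteLine(Describe(status));   // recovery at cluster replicate
        }
    }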

libs/cluster/Server/Replication/ReplicationManager.cs

Lines changed: 13 additions & 4 deletions

@@ -64,6 +64,11 @@ public long ReplicationOffset2
 public string PrimaryReplId => currentReplicationConfig.primary_replid;
 public string PrimaryReplId2 => currentReplicationConfig.primary_replid2;
 
+/// <summary>
+/// Recovery status
+/// </summary>
+public RecoveryStatus recoverStatus;
+
 [MethodImpl(MethodImplOptions.AggressiveInlining)]
 public ReplicationLogCheckpointManager GetCkptManager(StoreType storeType)
 {
@@ -112,7 +117,7 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null
 clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).checkpointVersionShift = CheckpointVersionShift;
 
 // If this node starts as replica, it cannot serve requests until it is connected to primary
-if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA && clusterProvider.serverOptions.Recover && !StartRecovery())
+if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA && clusterProvider.serverOptions.Recover && !StartRecovery(RecoveryStatus.InitializeRecover))
     throw new Exception(Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
 
 checkpointStore = new CheckpointStore(storeWrapper, clusterProvider, true, logger);
@@ -165,22 +170,24 @@ void CheckpointVersionShift(bool isMainStore, long oldVersion, long newVersion)
 /// <summary>
 /// Acquire recovery and checkpoint locks to prevent checkpoints and parallel recovery tasks
 /// </summary>
-public bool StartRecovery()
+public bool StartRecovery(RecoveryStatus recoverStatus)
 {
     if (!clusterProvider.storeWrapper.TryPauseCheckpoints())
     {
-        logger?.LogError("Error could not acquire checkpoint lock");
+        logger?.LogError("Error could not acquire checkpoint lock [{recoverStatus}]", recoverStatus);
         return false;
     }
 
     if (!recoverLock.TryWriteLock())
     {
-        logger?.LogError("Error could not acquire recover lock");
+        logger?.LogError("Error could not acquire recover lock [{recoverStatus}]", recoverStatus);
         // If failed to acquire recoverLock re-enable checkpoint taking
         clusterProvider.storeWrapper.ResumeCheckpoints();
         return false;
     }
 
+    this.recoverStatus = recoverStatus;
+    logger?.LogTrace("Success recover lock [{recoverStatus}]", recoverStatus);
     return true;
 }
 
@@ -189,6 +196,8 @@ public bool StartRecovery()
 /// </summary>
 public void SuspendRecovery()
 {
+    logger?.LogTrace("Release recover lock [{recoverStatus}]", recoverStatus);
+    recoverStatus = RecoveryStatus.NoRecovery;
     recoverLock.WriteUnlock();
     clusterProvider.storeWrapper.ResumeCheckpoints();
 }
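
For context on the diff above: StartRecovery takes two locks in a fixed order (pause checkpoints, then take the recovery write lock) and rolls back the checkpoint pause if the second step fails, while SuspendRecovery clears the status and releases in reverse order. The standalone sketch below mirrors that ordering; SemaphoreSlim and ReaderWriterLockSlim are stand-ins for Garnet's internal checkpoint and recovery locks, which have different APIs.

    // Sketch of the two-step acquire with rollback; lock types are illustrative stand-ins.
    using System;
    using System.Threading;

    enum RecoveryStatus : byte { NoRecovery, InitializeRecover, ClusterReplicate, ClusterFailover, ReplicaOfNoOne }

    class TwoStepRecoveryLockSketch
    {
        static readonly SemaphoreSlim checkpointLock = new(1, 1);
        static readonly ReaderWriterLockSlim recoverLock = new();
        static RecoveryStatus recoverStatus = RecoveryStatus.NoRecovery;

        static bool TryPauseCheckpoints() => checkpointLock.Wait(0);
        static void ResumeCheckpoints() => checkpointLock.Release();

        static bool StartRecovery(RecoveryStatus status)
        {
            if (!TryPauseCheckpoints())
                return false;                      // could not pause checkpoints

            if (!recoverLock.TryEnterWriteLock(0))
            {
                ResumeCheckpoints();               // roll back step one so checkpoints are not blocked forever
                return false;
            }

            recoverStatus = status;                // record which code path holds the lock
            return true;
        }

        static void SuspendRecovery()
        {
            recoverStatus = RecoveryStatus.NoRecovery;
            recoverLock.ExitWriteLock();
            ResumeCheckpoints();
        }

        static void Main()
        {
            if (StartRecovery(RecoveryStatus.ClusterReplicate))
            {
                Console.WriteLine($"recovering: {recoverStatus}");   // recovering: ClusterReplicate
                SuspendRecovery();
            }
        }
    }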

libs/cluster/Session/ReplicaOfCommand.cs

Lines changed: 2 additions & 2 deletions

@@ -24,13 +24,13 @@ private bool TryREPLICAOF(out bool invalidParameters)
 var addressSpan = parseState.GetArgSliceByRef(0).ReadOnlySpan;
 var portSpan = parseState.GetArgSliceByRef(1).ReadOnlySpan;
 
-//Turn of replication and make replica into a primary but do not delete data
+// Turn of replication and make replica into a primary but do not delete data
 if (addressSpan.EqualsUpperCaseSpanIgnoringCase("NO"u8) &&
     portSpan.EqualsUpperCaseSpanIgnoringCase("ONE"u8))
 {
     try
     {
-        if (!clusterProvider.replicationManager.StartRecovery())
+        if (!clusterProvider.replicationManager.StartRecovery(RecoveryStatus.ReplicaOfNoOne))
         {
             logger?.LogError($"{nameof(TryREPLICAOF)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
             while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK, ref dcurr, dend))
