2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterManagerWorkerState.cs
@@ -188,7 +188,7 @@ public bool TryAddReplica(string nodeid, bool force, out ReadOnlySpan<byte> erro

// Transition to recovering state
// Only one caller will succeed in becoming a replica for the provided node-id
if (!clusterProvider.replicationManager.StartRecovery())
if (!clusterProvider.replicationManager.StartRecovery(RecoveryStatus.ClusterReplicate))
{
logger?.LogError($"{nameof(TryAddReplica)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
errorMessage = CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK;
2 changes: 2 additions & 0 deletions libs/cluster/Server/ClusterProvider.cs
@@ -253,6 +253,8 @@ public MetricsItem[] GetReplicationInfo()
replicationInfo.Add(new("master_sync_last_io_seconds_ago", replicationManager.LastPrimarySyncSeconds.ToString()));
replicationInfo.Add(new("replication_offset_lag", replicationOffsetLag.ToString()));
replicationInfo.Add(new("replication_offset_max_lag", storeWrapper.serverOptions.ReplicationOffsetMaxLag.ToString()));
replicationInfo.Add(new("recover_status", replicationManager.recoverStatus.ToString()));
replicationInfo.Add(new("last_failover_state", !clusterEnabled ? FailoverUtils.GetFailoverStatus(FailoverStatus.NO_FAILOVER) : failoverManager.GetLastFailoverStatus()));
}
else
{
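Note on the two fields added above: recover_status reports the ReplicationManager's current RecoveryStatus enum value, and last_failover_state reports the string form of the last failover outcome. A minimal, illustrative excerpt of the INFO replication section on a replica that just completed a failover (the surrounding fields and exact section header are assumptions, not taken from this PR):

    # Replication
    ...
    recover_status:NoRecovery
    last_failover_state:failover-completed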
10 changes: 9 additions & 1 deletion libs/cluster/Server/Failover/FailoverManager.cs
@@ -16,6 +16,7 @@ internal sealed class FailoverManager(ClusterProvider clusterProvider, ILogger l
readonly TimeSpan clusterTimeout = clusterProvider.serverOptions.ClusterTimeout <= 0 ? Timeout.InfiniteTimeSpan : TimeSpan.FromSeconds(clusterProvider.serverOptions.ClusterTimeout);
readonly ILogger logger = logger;
private SingleWriterMultiReaderLock failoverTaskLock;
public FailoverStatus lastFailoverStatus = FailoverStatus.NO_FAILOVER;

public void Dispose()
{
@@ -46,6 +47,12 @@ public string GetFailoverStatus()
FailoverUtils.GetFailoverStatus(FailoverStatus.NO_FAILOVER);
}

/// <summary>
/// Retrieve the status of the last failover
/// </summary>
/// <returns>String representation of the last failover status</returns>
public string GetLastFailoverStatus() => FailoverUtils.GetFailoverStatus(lastFailoverStatus);

/// <summary>
/// Method used to initiate a background failover from a replica (CLUSTER FAILOVER command)
/// </summary>
@@ -66,7 +73,8 @@ public bool TryStartReplicaFailover(FailoverOption option, TimeSpan failoverTime
logger: logger);
_ = Task.Run(async () =>
{
_ = await currentFailoverSession.BeginAsyncReplicaFailover();
var success = await currentFailoverSession.BeginAsyncReplicaFailover();
lastFailoverStatus = success ? FailoverStatus.FAILOVER_COMPLETED : FailoverStatus.FAILOVER_ABORTED;
Reset();
});
return true;
6 changes: 5 additions & 1 deletion libs/cluster/Server/Failover/FailoverStatus.cs
@@ -15,7 +15,9 @@ internal enum FailoverStatus : byte
ISSUING_PAUSE_WRITES,
WAITING_FOR_SYNC,
FAILOVER_IN_PROGRESS,
TAKING_OVER_AS_PRIMARY
TAKING_OVER_AS_PRIMARY,
FAILOVER_COMPLETED,
FAILOVER_ABORTED
}

internal static class FailoverUtils
@@ -36,6 +38,8 @@ public static string GetFailoverStatus(FailoverStatus? status)
FailoverStatus.WAITING_FOR_SYNC => "waiting-for-sync",
FailoverStatus.FAILOVER_IN_PROGRESS => "failover-in-progress",
FailoverStatus.TAKING_OVER_AS_PRIMARY => "taking-over-as-primary",
FailoverStatus.FAILOVER_COMPLETED => "failover-completed",
FailoverStatus.FAILOVER_ABORTED => "failover-aborted",
_ => throw new Exception("invalid failover status"),
};
}
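A small usage sketch of the extended mapping (illustrative only, mirroring how the status strings surfaced as last_failover_state are produced):

    // Hypothetical caller: the new enum members map to the strings reported in INFO
    var completed = FailoverUtils.GetFailoverStatus(FailoverStatus.FAILOVER_COMPLETED); // "failover-completed"
    var aborted = FailoverUtils.GetFailoverStatus(FailoverStatus.FAILOVER_ABORTED);     // "failover-aborted"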
6 changes: 4 additions & 2 deletions libs/cluster/Server/Failover/ReplicaFailoverSession.cs
@@ -150,13 +150,15 @@ private bool TakeOverAsPrimary()
{
// Take over as primary and inform old primary
status = FailoverStatus.TAKING_OVER_AS_PRIMARY;
var acquiredLock = true;

try
{
// Make replica syncing unavailable by setting recovery flag
if (!clusterProvider.replicationManager.StartRecovery())
if (!clusterProvider.replicationManager.StartRecovery(RecoveryStatus.ClusterFailover))
{
logger?.LogWarning($"{nameof(TakeOverAsPrimary)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
acquiredLock = false;
return false;
}
_ = clusterProvider.BumpAndWaitForEpochTransition();
@@ -178,7 +180,7 @@ finally
finally
{
// Disable recovering as now this node has become a primary or failed in its attempt earlier
clusterProvider.replicationManager.SuspendRecovery();
if (acquiredLock) clusterProvider.replicationManager.SuspendRecovery();
}

return true;
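The acquiredLock flag above ensures SuspendRecovery only runs when StartRecovery actually took the recovery lock; otherwise a failed takeover would release a lock it never held. A simplified sketch of the pattern (not the actual Garnet code; names follow the diff):

    bool TryTakeOver(ReplicationManager replicationManager)
    {
        var acquiredLock = true;
        try
        {
            if (!replicationManager.StartRecovery(RecoveryStatus.ClusterFailover))
            {
                // The lock was never acquired, so the finally block must not release it
                acquiredLock = false;
                return false;
            }
            // ... bump epoch and take over as primary ...
            return true;
        }
        finally
        {
            if (acquiredLock) replicationManager.SuspendRecovery();
        }
    }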
32 changes: 32 additions & 0 deletions libs/cluster/Server/Replication/RecoveryStatus.cs
@@ -0,0 +1,32 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

namespace Garnet.cluster
{
/// <summary>
/// Recovery status
/// </summary>
public enum RecoveryStatus : byte
{
/// <summary>
/// No recovery
/// </summary>
NoRecovery,
/// <summary>
/// Recovery at initialization
/// </summary>
InitializeRecover,
/// <summary>
/// Recovery at cluster replicate
/// </summary>
ClusterReplicate,
/// <summary>
/// Recovery at cluster failover
/// </summary>
ClusterFailover,
/// <summary>
/// Recovery at replica of no one
/// </summary>
ReplicaOfNoOne
}
}
17 changes: 13 additions & 4 deletions libs/cluster/Server/Replication/ReplicationManager.cs
@@ -64,6 +64,11 @@ public long ReplicationOffset2
public string PrimaryReplId => currentReplicationConfig.primary_replid;
public string PrimaryReplId2 => currentReplicationConfig.primary_replid2;

/// <summary>
/// Recovery status
/// </summary>
public RecoveryStatus recoverStatus;

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ReplicationLogCheckpointManager GetCkptManager(StoreType storeType)
{
@@ -112,7 +117,7 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).checkpointVersionShift = CheckpointVersionShift;

// If this node starts as replica, it cannot serve requests until it is connected to primary
if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA && clusterProvider.serverOptions.Recover && !StartRecovery())
if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA && clusterProvider.serverOptions.Recover && !StartRecovery(RecoveryStatus.InitializeRecover))
throw new Exception(Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));

checkpointStore = new CheckpointStore(storeWrapper, clusterProvider, true, logger);
@@ -165,22 +170,24 @@ void CheckpointVersionShift(bool isMainStore, long oldVersion, long newVersion)
/// <summary>
/// Acquire recovery and checkpoint locks to prevent checkpoints and parallel recovery tasks
/// </summary>
public bool StartRecovery()
public bool StartRecovery(RecoveryStatus recoverStatus)
{
if (!clusterProvider.storeWrapper.TryPauseCheckpoints())
{
logger?.LogError("Error could not acquire checkpoint lock");
logger?.LogError("Error could not acquire checkpoint lock [{recoverStatus}]", recoverStatus);
return false;
}

if (!recoverLock.TryWriteLock())
{
logger?.LogError("Error could not acquire recover lock");
logger?.LogError("Error could not acquire recover lock [{recoverStatus}]", recoverStatus);
// If failed to acquire recoverLock re-enable checkpoint taking
clusterProvider.storeWrapper.ResumeCheckpoints();
return false;
}

this.recoverStatus = recoverStatus;
logger?.LogTrace("Success recover lock [{recoverStatus}]", recoverStatus);
return true;
}

@@ -189,6 +196,8 @@
/// </summary>
public void SuspendRecovery()
{
logger?.LogTrace("Release recover lock [{recoverStatus}]", recoverStatus);
recoverStatus = RecoveryStatus.NoRecovery;
recoverLock.WriteUnlock();
clusterProvider.storeWrapper.ResumeCheckpoints();
}
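With this change the recovery lock carries a reason while it is held and reverts to NoRecovery on release. A minimal sketch of how a caller pairs the two methods (illustrative; the REPLICAOF NO ONE path below follows this shape):

    if (replicationManager.StartRecovery(RecoveryStatus.ReplicaOfNoOne))
    {
        try
        {
            // ... detach from the primary and promote this node ...
        }
        finally
        {
            // Resets recoverStatus to RecoveryStatus.NoRecovery, releases the lock, resumes checkpoints
            replicationManager.SuspendRecovery();
        }
    }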
4 changes: 2 additions & 2 deletions libs/cluster/Session/ReplicaOfCommand.cs
@@ -24,13 +24,13 @@ private bool TryREPLICAOF(out bool invalidParameters)
var addressSpan = parseState.GetArgSliceByRef(0).ReadOnlySpan;
var portSpan = parseState.GetArgSliceByRef(1).ReadOnlySpan;

//Turn of replication and make replica into a primary but do not delete data
// Turn off replication and make replica into a primary but do not delete data
if (addressSpan.EqualsUpperCaseSpanIgnoringCase("NO"u8) &&
portSpan.EqualsUpperCaseSpanIgnoringCase("ONE"u8))
{
try
{
if (!clusterProvider.replicationManager.StartRecovery())
if (!clusterProvider.replicationManager.StartRecovery(RecoveryStatus.ReplicaOfNoOne))
{
logger?.LogError($"{nameof(TryREPLICAOF)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
while (!RespWriteUtils.TryWriteError(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK, ref dcurr, dend))