diff --git a/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs b/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs
index 63e8ce8e50d..f509cdf5bc3 100644
--- a/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs
+++ b/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs
@@ -110,6 +110,11 @@ public override void Start()
{
}
+ ///
+ public override void Close()
+ {
+ }
+
public bool TryCreateMessageConsumer(Span bytes, INetworkSender networkSender, out IMessageConsumer session)
{
session = null;
diff --git a/global.json b/global.json
index 3462d015963..fa8e68b5fd5 100644
--- a/global.json
+++ b/global.json
@@ -1,6 +1,6 @@
{
"sdk": {
- "version": "10.0.103",
+ "version": "10.0.201",
"rollForward": "latestMajor",
"allowPrerelease": false
}
diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs
index 6dbbad160b0..e35e97fae42 100644
--- a/libs/host/GarnetServer.cs
+++ b/libs/host/GarnetServer.cs
@@ -478,7 +478,14 @@ public void Dispose(bool deleteDir = true)
private void InternalDispose()
{
+ // Phase 1: Stop listening on all servers to free ports immediately.
+ for (var i = 0; i < servers.Length; i++)
+ servers[i]?.Close();
+
+ // Phase 2: Dispose the provider (storage engine shutdown — may take time).
Provider?.Dispose();
+
+ // Phase 3: Drain active handlers and clean up remaining resources.
for (var i = 0; i < servers.Length; i++)
servers[i]?.Dispose();
subscribeBroker?.Dispose();
diff --git a/libs/server/Lua/LuaTimeoutManager.cs b/libs/server/Lua/LuaTimeoutManager.cs
index d7f8542498d..bccb2983fba 100644
--- a/libs/server/Lua/LuaTimeoutManager.cs
+++ b/libs/server/Lua/LuaTimeoutManager.cs
@@ -263,7 +263,7 @@ internal Registration RegisterForTimeout(SessionScriptCache cache)
goto tryAgain;
}
- // Other threads might update registrations, so check that before returning
+ // Other threads might update registrations, so check that before returning
checkUnmodified:
if ((updatedRegistrations = Interlocked.CompareExchange(ref registrations, curRegistrations, curRegistrations)) != curRegistrations)
{
diff --git a/libs/server/Metrics/Info/GarnetInfoMetrics.cs b/libs/server/Metrics/Info/GarnetInfoMetrics.cs
index 8d407d400ae..75dfc1962fc 100644
--- a/libs/server/Metrics/Info/GarnetInfoMetrics.cs
+++ b/libs/server/Metrics/Info/GarnetInfoMetrics.cs
@@ -25,6 +25,11 @@ class GarnetInfoMetrics
_ => true
})];
+ ///
+ /// All info sections excluding module-generated ones.
+ ///
+ public static readonly HashSet AllInfoSet = [.. DefaultInfo.Where(e => e != InfoMetricsType.MODULES)];
+
MetricsItem[] serverInfo = null;
MetricsItem[] memoryInfo = null;
MetricsItem[] clusterInfo = null;
diff --git a/libs/server/Metrics/Info/InfoCommand.cs b/libs/server/Metrics/Info/InfoCommand.cs
index e48c6b6b4d8..0722fdd3b9e 100644
--- a/libs/server/Metrics/Info/InfoCommand.cs
+++ b/libs/server/Metrics/Info/InfoCommand.cs
@@ -28,17 +28,20 @@ private bool NetworkINFO()
reset = true;
else if (sbSection.EqualsUpperCaseSpanIgnoringCase("HELP"u8))
help = true;
- else if (!sbSection.EqualsUpperCaseSpanIgnoringCase("ALL"u8))
+ else if (sbSection.EqualsUpperCaseSpanIgnoringCase("ALL"u8))
+ sections.UnionWith(GarnetInfoMetrics.AllInfoSet);
+ else if (sbSection.EqualsUpperCaseSpanIgnoringCase("DEFAULT"u8))
+ sections.UnionWith(GarnetInfoMetrics.DefaultInfo);
+ else if (sbSection.EqualsUpperCaseSpanIgnoringCase("EVERYTHING"u8))
+ sections.UnionWith(GarnetInfoMetrics.DefaultInfo);
+ else if (parseState.TryGetInfoMetricsType(i, out var sectionType))
{
- if (parseState.TryGetInfoMetricsType(i, out var sectionType))
- {
- sections.Add(sectionType);
- }
- else
- {
- invalid = true;
- invalidSection = parseState.GetString(i);
- }
+ sections.Add(sectionType);
+ }
+ else
+ {
+ invalid = true;
+ invalidSection = parseState.GetString(i);
}
}
}
diff --git a/libs/server/Metrics/Info/InfoHelp.cs b/libs/server/Metrics/Info/InfoHelp.cs
index 47cc24a3460..f2de3e79bed 100644
--- a/libs/server/Metrics/Info/InfoHelp.cs
+++ b/libs/server/Metrics/Info/InfoHelp.cs
@@ -10,6 +10,8 @@ class InfoHelp
{
internal const string HELP = "HELP";
internal const string ALL = "ALL";
+ internal const string DEFAULT = "DEFAULT";
+ internal const string EVERYTHING = "EVERYTHING";
internal const string RESET = "RESET";
public static List GetInfoTypeHelpMessage()
@@ -33,7 +35,9 @@ public static List GetInfoTypeHelpMessage()
$"{nameof(InfoMetricsType.KEYSPACE)}: Database related statistics.",
$"{nameof(InfoMetricsType.MODULES)}: Information related to loaded modules.",
$"{nameof(InfoMetricsType.HLOGSCAN)}: Distribution of records in main store's hybrid log in-memory portion.",
- $"{nameof(ALL)}: Return all informational sections.",
+ $"{nameof(ALL)}: Return all informational sections (excluding module generated ones).",
+ $"{nameof(DEFAULT)}: Return the default set of informational sections.",
+ $"{nameof(EVERYTHING)}: Return all informational sections including module generated ones.",
$"{nameof(HELP)}: Print this help message.",
$"{nameof(RESET)}: Reset stats.",
"\r\n",
diff --git a/libs/server/Servers/GarnetServerBase.cs b/libs/server/Servers/GarnetServerBase.cs
index 5bfd1ff62ff..b326c9f3d30 100644
--- a/libs/server/Servers/GarnetServerBase.cs
+++ b/libs/server/Servers/GarnetServerBase.cs
@@ -154,6 +154,9 @@ public bool AddSession(WireFormat protocol, ref ISessionProvider provider, INetw
///
public abstract void Start();
+ ///
+ public abstract void Close();
+
///
public virtual void Dispose()
{
diff --git a/libs/server/Servers/GarnetServerTcp.cs b/libs/server/Servers/GarnetServerTcp.cs
index c681e09befa..48bbe61cfa7 100644
--- a/libs/server/Servers/GarnetServerTcp.cs
+++ b/libs/server/Servers/GarnetServerTcp.cs
@@ -82,24 +82,47 @@ public GarnetServerTcp(
this.unixSocketPath = unixSocketPath;
this.unixSocketPermission = unixSocketPermission;
- listenSocket = endpoint switch
+ if (endpoint is UnixDomainSocketEndPoint unix)
{
- UnixDomainSocketEndPoint unix => new Socket(unix.AddressFamily, SocketType.Stream, ProtocolType.Unspecified),
+ // UDS Initialization & Cleanup
+ listenSocket = new Socket(unix.AddressFamily, SocketType.Stream, ProtocolType.Unspecified);
+ var socketPath = unix.ToString();
+ if (File.Exists(socketPath))
+ {
+ File.Delete(socketPath);
+ }
+ }
+ else
+ {
+ // TCP Initialization & Port Reuse
+ listenSocket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
- _ => new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp)
- };
+ // Set reuse BEFORE Bind to handle TIME_WAIT states
+ listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);
+ }
acceptEventArg = new SocketAsyncEventArgs();
acceptEventArg.Completed += AcceptEventArg_Completed;
}
+ ///
+ /// Stop listening for new connections. Frees the listening port
+ /// without waiting for active connections to drain.
+ ///
+ public override void Close()
+ {
+ listenSocket.Close();
+ }
+
///
/// Dispose
///
public override void Dispose()
{
- base.Dispose();
+ // Close listening socket to free the port and stop accepting new connections.
+ // This also prevents new connections from arriving while DisposeActiveHandlers drains existing ones.
listenSocket.Dispose();
+ base.Dispose();
acceptEventArg.UserToken = null;
acceptEventArg.Dispose();
networkPool?.Dispose();
diff --git a/libs/server/Servers/IGarnetServer.cs b/libs/server/Servers/IGarnetServer.cs
index 9e197451627..2b33e77648d 100644
--- a/libs/server/Servers/IGarnetServer.cs
+++ b/libs/server/Servers/IGarnetServer.cs
@@ -46,5 +46,11 @@ public interface IGarnetServer : IDisposable
/// Start server
///
public void Start();
+
+ ///
+ /// Stop listening for new connections. Frees the listening port
+ /// without waiting for active connections to drain.
+ ///
+ public void Close();
}
}
\ No newline at end of file
diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs
index 9358a6c5921..d3df96a17b2 100644
--- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs
@@ -247,10 +247,11 @@ private protected bool ScanLookup 0)
- bContext.CompletePending(wait: true);
+ _ = bContext.CompletePending(wait: true);
- IterationComplete:
- if (resetCursor) cursor = 0;
+ IterationComplete:
+ if (resetCursor)
+ cursor = 0;
scanFunctions.OnStop(false, scanCursorState.acceptedCount);
return false;
}
diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs
index 5258b1dc07b..7889e61efe9 100644
--- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs
+++ b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs
@@ -246,7 +246,7 @@ internal OperationStatus ContinuePendingRMW(sessionFunctions, ref key, ref stackCtx);
}
- // Must do this *after* Unlocking.
+ // Must do this *after* Unlocking.
CheckRetry:
if (!HandleImmediateRetryStatus(status, sessionFunctions, ref pendingContext))
return status;
diff --git a/playground/Bitmap/BitOp.cs b/playground/Bitmap/BitOp.cs
index 0d8878f0849..893e619e778 100644
--- a/playground/Bitmap/BitOp.cs
+++ b/playground/Bitmap/BitOp.cs
@@ -360,7 +360,7 @@ private static void __bitop_simdX128_and(byte* dstBitmap, long dstLen, byte* src
#endregion
#region fillDstTail
- fillTail:
+ fillTail:
if (dstLen > srcLen)
{
batchSize = 8 * 4;
@@ -530,7 +530,7 @@ private static void __bitop_simdX128_and_long(byte* dstBitmap, long dstLen, byte
#endregion
#region fillDstTail
- fillTail:
+ fillTail:
if (dstLen > srcLen)
{
batchSize = 8 * 4;
@@ -729,7 +729,7 @@ private static void __bitop_simdX256_and(byte* dstBitmap, long dstLen, byte* src
#endregion
#region fillDstTail
- fillTail:
+ fillTail:
if (dstLen > srcLen)
{
batchSize = 8 * 4;
@@ -828,7 +828,7 @@ private static void __bitop_multikey_scalar_and(byte* dstPtr, int dstLen, byte**
*(long*)dstCurr = d00;
dstCurr += batchSize;
}
- #endregion
+ #endregion
fillTail:
#region scalar_1x1
@@ -1046,7 +1046,7 @@ private static void __bitop_multikey_simdX128_and(byte* dstPtr, int dstLen, byte
*(long*)dstCurr = d00;
dstCurr += batchSize;
}
- #endregion
+ #endregion
fillTail:
#region scalar_1x1
@@ -1266,7 +1266,7 @@ private static void __bitop_multikey_simdX256_and(byte* dstPtr, int dstLen, byte
*(long*)dstCurr = d00;
dstCurr += batchSize;
}
- #endregion
+ #endregion
fillTail:
#region scalar_1x1
diff --git a/test/Garnet.test.cluster/ClusterManagementTests.cs b/test/Garnet.test.cluster/ClusterManagementTests.cs
index eb742cdaf10..aba00602566 100644
--- a/test/Garnet.test.cluster/ClusterManagementTests.cs
+++ b/test/Garnet.test.cluster/ClusterManagementTests.cs
@@ -2,13 +2,16 @@
// Licensed under the MIT license.
using System;
+using System.Collections.Concurrent;
using System.Collections.Generic;
+using System.Diagnostics;
using System.Linq;
using System.Net;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Allure.NUnit;
+using Garnet.common;
using Garnet.server;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
@@ -930,5 +933,488 @@ public void ClusterCheckpointUpgradeFrom([Values] InstanceType instanceType)
ClassicAssert.AreEqual(value, dbValue);
}
}
+
+ [Test, Order(16)]
+ [CancelAfter(30_000)]
+ [TestCase(false)]
+ [TestCase(true)]
+ public async Task ClusterReplicationObjectCollectTest(bool useManualCollect, CancellationToken cancellationToken)
+ {
+ var replica_count = 1;// Per primary
+ var primary_count = 1;
+ var nodes_count = primary_count + (primary_count * replica_count);
+ var primaryNodeIndex = 0;
+ var replicaNodeIndex = 1;
+
+ context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, useTLS: false, asyncReplay: false, expiredObjectCollectionFrequencySecs: !useManualCollect ? 100 : 0);
+ context.CreateConnection(useTLS: false);
+
+ var primaryServer = context.clusterTestUtils.GetServer(primaryNodeIndex);
+ var replicaServer = context.clusterTestUtils.GetServer(replicaNodeIndex);
+
+ // Setup cluster
+ context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger);
+ context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, primaryNodeIndex + 1, logger: context.logger);
+ context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, replicaNodeIndex + 1, logger: context.logger);
+ context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger);
+ context.clusterTestUtils.WaitUntilNodeIsKnown(primaryNodeIndex, replicaNodeIndex, logger: context.logger);
+ context.clusterTestUtils.WaitUntilNodeIsKnown(replicaNodeIndex, primaryNodeIndex, logger: context.logger);
+
+ var db = context.clusterTestUtils.GetDatabase();
+ HashEntry[] elements = [new HashEntry("field1", "hello"), new HashEntry("field2", "world"), new HashEntry("field3", "value3"), new HashEntry("field4", "value4"), new HashEntry("field5", "value5"), new HashEntry("field6", "value6")];
+ // Attach replica
+ var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger);
+ ClassicAssert.AreEqual("OK", resp);
+
+ // Execute first hash set workload
+ var hashSetKey = "myhash";
+ ExecuteHashSet(hashSetKey, elements);
+ await Task.Delay(3000).ConfigureAwait(false);
+ ManualCollect();
+ ValidateHashSet(hashSetKey);
+
+ // Execute second hash set workload
+ hashSetKey = "myhash2";
+ ExecuteHashSet(hashSetKey, elements);
+ await Task.Delay(3000).ConfigureAwait(false);
+ ManualCollect();
+ ValidateHashSet(hashSetKey);
+
+ void ExecuteHashSet(string key, HashEntry[] elements)
+ {
+ db.HashSet(key, elements);
+
+ var result = primaryServer.Execute("HPEXPIRE", key, "500", "FIELDS", "2", "field1", "field2");
+ var results = (RedisResult[])result;
+ ClassicAssert.AreEqual(2, results.Length);
+ ClassicAssert.AreEqual(1, (long)results[0]);
+ ClassicAssert.AreEqual(1, (long)results[1]);
+
+ result = primaryServer.Execute("HPEXPIRE", key, "500", "FIELDS", "2", "field3", "field4");
+ results = (RedisResult[])result;
+ ClassicAssert.AreEqual(2, results.Length);
+ ClassicAssert.AreEqual(1, (long)results[0]);
+ ClassicAssert.AreEqual(1, (long)results[1]);
+ }
+
+ void ValidateHashSet(string key)
+ {
+ // Check expected result at primary
+ var expectedFieldsAndValues = elements.AsSpan().Slice(4).ToArray()
+ .SelectMany(e => new[] { e.Name.ToString(), e.Value.ToString() })
+ .ToArray();
+ var fields = (string[])primaryServer.Execute("HGETALL", [key]);
+ ClassicAssert.AreEqual(4, fields.Length);
+ ClassicAssert.AreEqual(expectedFieldsAndValues, fields);
+
+ // Wait to ensure sync
+ context.clusterTestUtils.WaitForReplicaAofSync(primaryNodeIndex, replicaNodeIndex, context.logger, cancellationToken);
+
+ // Check if replica is caught up
+ fields = (string[])replicaServer.Execute("HGETALL", [key]);
+ ClassicAssert.AreEqual(4, fields.Length);
+ ClassicAssert.AreEqual(expectedFieldsAndValues, fields);
+ }
+
+ void ManualCollect()
+ {
+ if (useManualCollect)
+ {
+ Assert.Throws(() => replicaServer.Execute("HCOLLECT", ["*"], CommandFlags.NoRedirect),
+ $"Expected exception was not thrown");
+
+ resp = (string)primaryServer.Execute("HCOLLECT", ["*"]);
+ ClassicAssert.AreEqual("OK", resp);
+ }
+ }
+ }
+
+ [Test, Order(17)]
+ [Category("CLUSTER")]
+ [CancelAfter(30_000)]
+ public async Task ReplicasRestartAsReplicasAsync(CancellationToken cancellation)
+ {
+ var replica_count = 1;// Per primary
+ var primary_count = 1;
+ var nodes_count = primary_count + primary_count * replica_count;
+ ClassicAssert.IsTrue(primary_count > 0);
+
+ context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, useTLS: true, tryRecover: false, FastAofTruncate: true, CommitFrequencyMs: -1, clusterReplicationReestablishmentTimeout: 1);
+ context.CreateConnection(useTLS: true);
+ var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);
+
+ var primary = (IPEndPoint)context.endpoints[0];
+ var replica = (IPEndPoint)context.endpoints[1];
+
+ // Make sure role assignment is as expected
+ ClassicAssert.AreEqual("master", context.clusterTestUtils.RoleCommand(primary).Value);
+ ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica).Value);
+
+ context.ShutdownNode(primary);
+ context.ShutdownNode(replica);
+
+ // Intentionally leaving primary offline
+ context.RestartNode(replica);
+
+ // Delay a bit for replication init tasks to fire off
+ await Task.Delay(100, cancellation).ConfigureAwait(false);
+
+ // Make sure replica did not promote to Primary
+ ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica).Value);
+ }
+
+ [Test, Order(18)]
+ [Category("CLUSTER")]
+ [CancelAfter(30_000)]
+ [TestCase(ExceptionInjectionType.None, true)]
+ [TestCase(ExceptionInjectionType.None, false)]
+ [TestCase(ExceptionInjectionType.Divergent_AOF_Stream, true)]
+ [TestCase(ExceptionInjectionType.Divergent_AOF_Stream, false)]
+ [TestCase(ExceptionInjectionType.Aof_Sync_Task_Consume, true)]
+ [TestCase(ExceptionInjectionType.Aof_Sync_Task_Consume, false)]
+ public async Task PrimaryUnavailableRecoveryAsync(ExceptionInjectionType faultType, bool replicaFailoverBeforeShutdown, CancellationToken cancellation)
+ {
+ // Case we're testing is where a Primary _and_ its Replica die, but only the Replica comes back.
+ //
+ // If configured correctly (for example, as a cache), we still want the Replica to come back up with some data - even if its
+ // acknowledged writes to the Primary are dropped.
+ //
+ // We also sometimes inject faults into the Primaries and Replicas before the proper fault, to simulate a cluster
+ // in an unstable environment.
+ //
+ // Additionally we simulate both when we detect failures and intervene in time to promote Replicas to Primaries,
+ // and when we don't intervene until after everything dies and the Replicas come back.
+
+#if !DEBUG
+ Assert.Ignore($"Depends on {nameof(ExceptionInjectionHelper)}, which is disabled in non-Debug builds");
+#endif
+
+ var replica_count = 1;// Per primary
+ var primary_count = 2;
+ var nodes_count = primary_count + primary_count * replica_count;
+ ClassicAssert.IsTrue(primary_count > 0);
+
+ // Config lifted from a deployed product, be wary of changing these without discussion
+ context.CreateInstances(
+ nodes_count,
+ tryRecover: true,
+ disablePubSub: false,
+ disableObjects: false,
+ enableAOF: true,
+ AofMemorySize: "128m",
+ CommitFrequencyMs: -1,
+ aofSizeLimit: "256m",
+ compactionFrequencySecs: 30,
+ compactionType: LogCompactionType.Scan,
+ latencyMonitory: true,
+ metricsSamplingFrequency: 1,
+ loggingFrequencySecs: 10,
+ checkpointThrottleFlushDelayMs: 0,
+ FastCommit: true,
+ FastAofTruncate: true,
+ OnDemandCheckpoint: true,
+ useTLS: true,
+ enableLua: true,
+ luaMemoryMode: LuaMemoryManagementMode.Tracked,
+ luaTransactionMode: true,
+ luaMemoryLimit: "2M",
+ clusterReplicationReestablishmentTimeout: 1,
+ clusterReplicaResumeWithData: true
+ );
+ context.CreateConnection(useTLS: true);
+ var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);
+
+ shards = context.clusterTestUtils.ClusterShards(0, context.logger);
+ ClassicAssert.AreEqual(2, shards.Count);
+
+ IPEndPoint primary1 = (IPEndPoint)context.endpoints[0];
+ IPEndPoint primary2 = (IPEndPoint)context.endpoints[1];
+ IPEndPoint replica1 = (IPEndPoint)context.endpoints[2];
+ IPEndPoint replica2 = (IPEndPoint)context.endpoints[3];
+
+ // Make sure role assignment is as expected
+ ClassicAssert.AreEqual("master", context.clusterTestUtils.RoleCommand(primary1).Value);
+ ClassicAssert.AreEqual("master", context.clusterTestUtils.RoleCommand(primary2).Value);
+ ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica1).Value);
+ ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica2).Value);
+
+ // Populate both shards
+ var writtenToPrimary1 = new ConcurrentDictionary();
+ var writtenToPrimary2 = new ConcurrentDictionary();
+ using (var writeTaskCancel = CancellationTokenSource.CreateLinkedTokenSource(cancellation))
+ {
+ var uniquePrefix = Guid.NewGuid();
+
+ var writeToPrimary1Task =
+ Task.Run(
+ async () =>
+ {
+ var ix = -1;
+
+ var p1 = shards.Single(s => s.nodes.Any(n => n.nodeIndex == context.endpoints.IndexOf(primary1)));
+
+ while (!writeTaskCancel.IsCancellationRequested)
+ {
+ ix++;
+
+ var key = $"pura-1-{uniquePrefix}-{ix}";
+
+ var slot = context.clusterTestUtils.HashSlot(key);
+ if (!p1.slotRanges.Any(x => slot >= x.Item1 && slot <= x.Item2))
+ {
+ continue;
+ }
+
+ var value = Guid.NewGuid().ToString();
+ try
+ {
+ var res = (string)context.clusterTestUtils.Execute(primary1, "SET", [key, value]);
+ if (res == "OK")
+ {
+ writtenToPrimary1[key] = value;
+ }
+
+ await Task.Delay(10, writeTaskCancel.Token).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignore, cancellation or throwing should both just be powered through
+ }
+ }
+ },
+ cancellation
+ );
+
+ var writeToPrimary2Task =
+ Task.Run(
+ async () =>
+ {
+ var ix = -1;
+
+ var p2 = shards.Single(s => s.nodes.Any(n => n.nodeIndex == context.endpoints.IndexOf(primary2)));
+
+ while (!writeTaskCancel.IsCancellationRequested)
+ {
+ ix++;
+
+ var key = $"pura-2-{uniquePrefix}-{ix}";
+
+ var slot = context.clusterTestUtils.HashSlot(key);
+ if (!p2.slotRanges.Any(x => slot >= x.Item1 && slot <= x.Item2))
+ {
+ continue;
+ }
+
+ var value = Guid.NewGuid().ToString();
+
+ try
+ {
+ var res = (string)context.clusterTestUtils.Execute(primary1, "SET", [key, value]);
+ if (res == "OK")
+ {
+ writtenToPrimary2[key] = value;
+ }
+
+ await Task.Delay(10, writeTaskCancel.Token).ConfigureAwait(false);
+ }
+ catch
+ {
+ // Ignore, cancellation or throwing should both just be powered through
+ }
+ }
+ },
+ cancellation
+ );
+
+ // Simulate out of band checkpointing
+ var checkpointTask =
+ Task.Run(
+ async () =>
+ {
+ while (!writeTaskCancel.IsCancellationRequested)
+ {
+ foreach (var node in context.nodes)
+ {
+ try
+ {
+ _ = await node.Store.CommitAOFAsync(writeTaskCancel.Token).ConfigureAwait(false);
+ }
+ catch (TaskCanceledException)
+ {
+ // Cancel is fine
+ break;
+ }
+ catch
+ {
+ // Ignore everything else
+ }
+ }
+
+ try
+ {
+ await Task.Delay(100, writeTaskCancel.Token).ConfigureAwait(false);
+ }
+ catch
+ {
+ continue;
+ }
+ }
+ },
+ cancellation
+ );
+
+ // Wait for a bit, optionally injecting a fault
+ if (faultType == ExceptionInjectionType.None)
+ {
+ await Task.Delay(10_000, cancellation).ConfigureAwait(false);
+ }
+ else
+ {
+ var timer = Stopwatch.StartNew();
+
+ // Things start fine
+ await Task.Delay(1_000, cancellation).ConfigureAwait(false);
+
+ // Wait for something to get replicated
+ var replica1Happened = false;
+ var replica2Happened = false;
+ do
+ {
+ if (!replica1Happened)
+ {
+ var readonlyRes = (string)context.clusterTestUtils.Execute(replica1, "READONLY", []);
+ ClassicAssert.AreEqual("OK", readonlyRes);
+
+ foreach (var kv in writtenToPrimary1)
+ {
+ var val = (string)context.clusterTestUtils.Execute(replica1, "GET", [kv.Key]);
+ if (val == kv.Value)
+ {
+ replica1Happened = true;
+ break;
+ }
+ }
+ }
+
+ if (!replica2Happened)
+ {
+ var readonlyRes = (string)context.clusterTestUtils.Execute(replica2, "READONLY", []);
+ ClassicAssert.AreEqual("OK", readonlyRes);
+
+ foreach (var kv in writtenToPrimary2)
+ {
+ var val = (string)context.clusterTestUtils.Execute(replica2, "GET", [kv.Key]);
+ if (val == kv.Value)
+ {
+ replica2Happened = true;
+ break;
+ }
+ }
+ }
+
+ await Task.Delay(100, cancellation).ConfigureAwait(false);
+ } while (!replica1Happened || !replica2Happened);
+
+ // Things fail for a bit
+ ExceptionInjectionHelper.EnableException(faultType);
+ await Task.Delay(2_000, cancellation).ConfigureAwait(false);
+
+ // Things recover
+ ExceptionInjectionHelper.DisableException(faultType);
+
+ timer.Stop();
+
+ // Wait out the rest of the duration
+ if (timer.ElapsedMilliseconds < 10_000)
+ {
+ await Task.Delay((int)(10_000 - timer.ElapsedMilliseconds), cancellation).ConfigureAwait(false);
+ }
+ }
+
+ // Stop writing
+ writeTaskCancel.Cancel();
+
+ // Wait for all our writes and checkpoints to spin down
+ await Task.WhenAll(writeToPrimary1Task, writeToPrimary2Task, checkpointTask).ConfigureAwait(false);
+ }
+
+ // Shutdown all primaries
+ context.ShutdownNode(primary1);
+ context.ShutdownNode(primary2);
+
+ // Sometimes we can intervene post-Primary crash but pre-Replica crash, simulate that
+ if (replicaFailoverBeforeShutdown)
+ {
+ await UpgradeReplicasAsync(context, replica1, replica2, cancellation).ConfigureAwait(false);
+ }
+
+ // Shutdown the (old) replicas
+ context.ShutdownNode(replica1);
+ context.ShutdownNode(replica2);
+
+ // Restart just the replicas
+ context.RestartNode(replica1);
+ context.RestartNode(replica2);
+
+ // If we didn't promote pre-crash, promote now that Replicas came back
+ if (!replicaFailoverBeforeShutdown)
+ {
+ await UpgradeReplicasAsync(context, replica1, replica2, cancellation).ConfigureAwait(false);
+ }
+
+ // Confirm that at least some of the data is available on each Replica
+ var onReplica1 = 0;
+ foreach (var (k, v) in writtenToPrimary1)
+ {
+ var res = (string)context.clusterTestUtils.Execute(replica1, "GET", [k]);
+ if (res is not null)
+ {
+ ClassicAssert.AreEqual(v, res);
+ onReplica1++;
+ }
+ }
+
+ var onReplica2 = 0;
+ foreach (var (k, v) in writtenToPrimary2)
+ {
+ var res = (string)context.clusterTestUtils.Execute(replica2, "GET", [k]);
+ if (res is not null)
+ {
+ ClassicAssert.AreEqual(v, res);
+ onReplica2++;
+ }
+ }
+
+ // Something, ANYTHING, made it
+ ClassicAssert.IsTrue(onReplica1 > 0, $"Nothing made it to replica 1, should have been up to {writtenToPrimary1.Count} values");
+ ClassicAssert.IsTrue(onReplica2 > 0, $"Nothing made it to replica 2, should have been up to {writtenToPrimary2.Count} values");
+
+ static async Task UpgradeReplicasAsync(ClusterTestContext context, IPEndPoint replica1, IPEndPoint replica2, CancellationToken cancellation)
+ {
+ // Promote the replicas, if no primary is coming back
+ var takeOverRes1 = (string)context.clusterTestUtils.Execute(replica1, "CLUSTER", ["FAILOVER", "FORCE"]);
+ var takeOverRes2 = (string)context.clusterTestUtils.Execute(replica2, "CLUSTER", ["FAILOVER", "FORCE"]);
+ ClassicAssert.AreEqual("OK", takeOverRes1);
+ ClassicAssert.AreEqual("OK", takeOverRes2);
+
+ // Wait for roles to update
+ while (true)
+ {
+ await Task.Delay(10, cancellation).ConfigureAwait(false);
+
+ if (context.clusterTestUtils.RoleCommand(replica1).Value != "master")
+ {
+ continue;
+ }
+
+ if (context.clusterTestUtils.RoleCommand(replica2).Value != "master")
+ {
+ continue;
+ }
+
+ break;
+ }
+ }
+ }
}
}
\ No newline at end of file
diff --git a/test/Garnet.test.cluster/ClusterNegativeTests.cs b/test/Garnet.test.cluster/ClusterNegativeTests.cs
index 645bdbfd0a6..88c740db833 100644
--- a/test/Garnet.test.cluster/ClusterNegativeTests.cs
+++ b/test/Garnet.test.cluster/ClusterNegativeTests.cs
@@ -291,7 +291,7 @@ public void ClusterCheckpointAcquireTest([Values] bool fastAofTruncate, [Values]
}
}
- [Test, Order(6), CancelAfter(testTimeout)]
+ [Test, Order(7), CancelAfter(testTimeout)]
public void ClusterReplicaAttachIntenseWrite(CancellationToken cancellationToken)
{
var primaryIndex = 0;
@@ -312,6 +312,8 @@ public void ClusterReplicaAttachIntenseWrite(CancellationToken cancellationToken
context.clusterTestUtils.SetConfigEpoch(primaryIndex, primaryIndex + 1, logger: context.logger);
context.clusterTestUtils.SetConfigEpoch(replicaIndex, replicaIndex + 1, logger: context.logger);
context.clusterTestUtils.Meet(primaryIndex, replicaIndex, logger: context.logger);
+ context.clusterTestUtils.WaitUntilNodeIsKnown(primaryIndex, replicaIndex, logger: context.logger);
+ context.clusterTestUtils.WaitUntilNodeIsKnown(replicaIndex, primaryIndex, logger: context.logger);
var keyLength = 32;
var kvpairCount = 32;
@@ -369,7 +371,7 @@ public void ClusterReplicaAttachIntenseWrite(CancellationToken cancellationToken
}
}
- [Test, Order(6), CancelAfter(testTimeout)]
+ [Test, Order(8), CancelAfter(testTimeout)]
public void ClusterFailedToAddAofSyncTask()
{
var primaryIndex = 0;
@@ -419,7 +421,7 @@ public void ClusterFailedToAddAofSyncTask()
context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, replicaIndex);
}
- [Test, Order(6), CancelAfter(testTimeout)]
+ [Test, Order(9), CancelAfter(testTimeout)]
public void ClusterReplicaSyncTimeoutTest()
{
var primaryIndex = 0;
@@ -468,7 +470,7 @@ public void ClusterReplicaSyncTimeoutTest()
}
#endif
- [Test, CancelAfter(60_000)]
+ [Test, Order(10), CancelAfter(60_000)]
public async Task ClusterParallelFailoverOnDistinctShards(CancellationToken cancellationToken)
{
var nodes_count = 4;
@@ -523,8 +525,7 @@ public async Task ClusterParallelFailoverOnDistinctShards(CancellationToken canc
}
}
-
- [Test, CancelAfter(60_000)]
+ [Test, Order(11), CancelAfter(60_000)]
public void ClusterMeetFromReplica(CancellationToken cancellationToken)
{
var nodes_count = 3;
@@ -559,5 +560,71 @@ public void ClusterMeetFromReplica(CancellationToken cancellationToken)
Assert.That(nodes_count, Is.EqualTo(context.clusterTestUtils.ClusterNodes(i).Nodes.Count));
}
}
+
+ [Test, Order(12)]
+ [Category("REPLICATION")]
+ public void ClusterDontKnowReplicaFailTest([Values] bool useReplicaOf)
+ {
+ var replica_count = 1;// Per primary
+ var primary_count = 1;
+ var nodes_count = primary_count + (primary_count * replica_count);
+ ClassicAssert.IsTrue(primary_count > 0);
+ context.CreateInstances(nodes_count, disableObjects: true, FastAofTruncate: false, OnDemandCheckpoint: false, CommitFrequencyMs: -1, enableAOF: true, useTLS: false, asyncReplay: false);
+ context.CreateConnection(useTLS: false);
+
+ var primaryNodeIndex = 0;
+ var replicaNodeIndex = 1;
+ ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, [(0, 16383)], true, context.logger));
+ context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, 1, logger: context.logger);
+ context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, 2, logger: context.logger);
+
+ var primaryId = context.clusterTestUtils.ClusterMyId(primaryNodeIndex, logger: context.logger);
+ string resp;
+ if (!useReplicaOf)
+ {
+ resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryId, failEx: false, logger: context.logger);
+ ClassicAssert.AreEqual($"ERR I don't know about node {primaryId}.", resp);
+ }
+ else
+ {
+ resp = context.clusterTestUtils.ReplicaOf(replicaNodeIndex, primaryNodeIndex, failEx: false, logger: context.logger);
+ ClassicAssert.AreEqual($"ERR I don't know about node {context.clusterTestUtils.GetEndPoint(primaryNodeIndex)}.", resp);
+ }
+ }
+
+ [Test, Order(13)]
+ [Category("REPLICATION")]
+ public void ClusterReplicateFails()
+ {
+ const string UserName = "temp-user";
+ const string Password = "temp-password";
+
+ const string ClusterUserName = "cluster-user";
+ const string ClusterPassword = "cluster-password";
+
+ // Setup a cluster (mimicking the style in which this bug was first found)
+ ServerCredential clusterCreds = new(ClusterUserName, ClusterPassword, IsAdmin: true, UsedForClusterAuth: true, IsClearText: true);
+ ServerCredential userCreds = new(UserName, Password, IsAdmin: true, UsedForClusterAuth: false, IsClearText: true);
+
+ context.GenerateCredentials([userCreds, clusterCreds]);
+ context.CreateInstances(2, disableObjects: true, disablePubSub: true, enableAOF: true, clusterCreds: clusterCreds, useAcl: true, FastAofTruncate: true, CommitFrequencyMs: -1, asyncReplay: false);
+ var primaryEndpoint = (IPEndPoint)context.endpoints.First();
+ var replicaEndpoint = (IPEndPoint)context.endpoints.Last();
+
+ ClassicAssert.AreNotEqual(primaryEndpoint, replicaEndpoint, "Should have different endpoints for nodes");
+
+ using var primaryConnection = ConnectionMultiplexer.Connect($"{primaryEndpoint.Address}:{primaryEndpoint.Port},user={UserName},password={Password}");
+ var primaryServer = primaryConnection.GetServer(primaryEndpoint);
+
+ ClassicAssert.AreEqual("OK", (string)primaryServer.Execute("CLUSTER", ["ADDSLOTSRANGE", "0", "16383"], flags: CommandFlags.NoRedirect));
+ ClassicAssert.AreEqual("OK", (string)primaryServer.Execute("CLUSTER", ["MEET", replicaEndpoint.Address.ToString(), replicaEndpoint.Port.ToString()], flags: CommandFlags.NoRedirect));
+
+ using var replicaConnection = ConnectionMultiplexer.Connect($"{replicaEndpoint.Address}:{replicaEndpoint.Port},user={UserName},password={Password}");
+ var replicaServer = replicaConnection.GetServer(replicaEndpoint);
+
+ // Try to replicate from a server that doesn't exist
+ var exc = Assert.Throws(() => replicaServer.Execute("CLUSTER", ["REPLICATE", Guid.NewGuid().ToString()], flags: CommandFlags.NoRedirect));
+ ClassicAssert.IsTrue(exc.Message.StartsWith("ERR I don't know about node "));
+ }
}
}
\ No newline at end of file
diff --git a/test/Garnet.test.cluster/ClusterTestContext.cs b/test/Garnet.test.cluster/ClusterTestContext.cs
index b35b15dbad8..449799d5444 100644
--- a/test/Garnet.test.cluster/ClusterTestContext.cs
+++ b/test/Garnet.test.cluster/ClusterTestContext.cs
@@ -14,6 +14,7 @@
using Garnet.server.Auth.Settings;
using Microsoft.Extensions.Logging;
using NUnit.Framework;
+using NUnit.Framework.Interfaces;
using NUnit.Framework.Legacy;
using StackExchange.Redis;
using Tsavorite.core;
@@ -117,42 +118,73 @@ public void RestartNode(int nodeIndex)
nodes[nodeIndex].Start();
}
+
public void TearDown()
{
+ // Capture test outcome before any teardown work to distinguish
+ // primary teardown failures from secondary ones caused by a hung/failed test.
+ var testOutcome = TestContext.CurrentContext.Result.Outcome;
+ var testAlreadyFailed = testOutcome.Status == TestStatus.Failed;
+
+ if (testAlreadyFailed)
+ {
+ logger?.LogError(
+ "TearDown: test already failed ({label}): {message}",
+ testOutcome.Label,
+ TestContext.CurrentContext.Result.Message);
+ }
+
cts.Cancel();
cts.Dispose();
- logger.LogDebug("0. Dispose <<<<<<<<<<<");
waiter?.Dispose();
clusterTestUtils?.Dispose();
- var timeoutSeconds = 5;
- var failMessage = "";
+ var timeoutSeconds = 60;
+ string failureReason = null;
- if (!Task.Run(() => DisposeCluster()).Wait(TimeSpan.FromSeconds(timeoutSeconds)))
+ // Phase 1: Dispose cluster nodes (may timeout if handlers are stuck)
+ try
{
- logger?.LogError("Timed out waiting for DisposeCluster");
- failMessage += "Timed out waiting for DisposeCluster; ";
+ if (!Task.Run(() => DisposeCluster()).Wait(TimeSpan.FromSeconds(timeoutSeconds)))
+ {
+ failureReason = "Timed out waiting for DisposeCluster";
+ logger?.LogError("Timed out waiting for DisposeCluster");
+ }
}
- // Dispose logger factory only after servers are disposed
- loggerFactory?.Dispose();
- if (!Task.Run(() => TestUtils.DeleteDirectory(TestFolder, true)).Wait(TimeSpan.FromSeconds(timeoutSeconds)))
+ catch (Exception ex)
{
- logger?.LogError("Timed out DeleteDirectory");
- failMessage += "Timed out DeleteDirectory; ";
+ failureReason = $"DisposeCluster threw: {ex.Message}";
+ logger?.LogError(ex, "DisposeCluster failed");
}
+ // Phase 2: Dispose logger factory (always, even after timeout)
+ loggerFactory?.Dispose();
+
+ // Phase 3: Delete test directory (may timeout if files locked from Phase 1 timeout)
try
{
- TestUtils.OnTearDown();
+ if (!Task.Run(() => TestUtils.DeleteDirectory(TestFolder, true)).Wait(TimeSpan.FromSeconds(timeoutSeconds)))
+ {
+ failureReason ??= "Timed out DeleteDirectory";
+ logger?.LogError("Timed out DeleteDirectory");
+ }
}
- catch (AssertionException e)
+ catch (Exception ex)
{
- failMessage += e.Message;
+ failureReason ??= $"DeleteDirectory threw: {ex.Message}";
+ logger?.LogError(ex, "DeleteDirectory failed");
}
- if (failMessage != "")
+ // Phase 4: Always runs — resets LightEpoch instances to prevent cross-test contamination
+ TestUtils.OnTearDown();
+
+ // Fail the test at the end, after all cleanup is done
+ if (failureReason != null)
{
- ClassicAssert.Fail(failMessage);
+ var context = testAlreadyFailed
+ ? $" (secondary failure — test already failed with '{testOutcome.Label}')"
+ : " (primary failure — test itself passed)";
+ Assert.Fail(failureReason + context);
}
}
@@ -516,6 +548,20 @@ public void PopulatePrimary(
}
}
+ public void SimplePrimaryReplicaSetup()
+ {
+ var primaryIndex = 0;
+ var replicaIndex = 1;
+ // Setup cluster
+ var resp = clusterTestUtils.AddDelSlotsRange(primaryIndex, [(0, 16383)], true, logger);
+ ClassicAssert.AreEqual("OK", resp);
+ clusterTestUtils.SetConfigEpoch(primaryIndex, primaryIndex + 1, logger);
+ clusterTestUtils.SetConfigEpoch(replicaIndex, replicaIndex + 1, logger);
+ clusterTestUtils.Meet(primaryIndex, replicaIndex, logger);
+ clusterTestUtils.WaitUntilNodeIsKnown(primaryIndex, replicaIndex);
+ clusterTestUtils.WaitUntilNodeIsKnown(replicaIndex, primaryIndex);
+ }
+
public void SimplePopulateDB(bool disableObjects, int keyLength, int kvpairCount, int primaryIndex, int addCount = 0, bool performRMW = false)
{
//Populate Primary
@@ -726,9 +772,6 @@ public void ClusterFailoverSpinWait(int replicaNodeIndex, ILogger logger)
public void AttachAndWaitForSync(int primary_count, int replica_count, bool disableObjects)
{
var primaryId = clusterTestUtils.GetNodeIdFromNode(0, logger);
- // Issue meet to replicas
- for (var i = primary_count; i < primary_count + replica_count; i++)
- clusterTestUtils.Meet(i, 0);
// Wait until primary node is known so as not to fail replicate
for (var i = primary_count; i < primary_count + replica_count; i++)
@@ -736,10 +779,7 @@ public void AttachAndWaitForSync(int primary_count, int replica_count, bool disa
// Issue cluster replicate and bump epoch manually to capture config.
for (var i = primary_count; i < primary_count + replica_count; i++)
- {
_ = clusterTestUtils.ClusterReplicate(i, primaryId, async: true, logger: logger);
- clusterTestUtils.BumpEpoch(i, logger: logger);
- }
if (!checkpointTask.Wait(TimeSpan.FromSeconds(100))) Assert.Fail("Checkpoint task timeout");
diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs
index e910f9f3776..cab5017862c 100644
--- a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs
+++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs
@@ -6,6 +6,7 @@
namespace Garnet.test.cluster
{
[NonParallelizable]
+ [Ignore("Skip to reduce CI duration.")]
public class ClusterReplicationAsyncReplay : ClusterReplicationBaseTests
{
[SetUp]
diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs
index 8c6a940c0be..e460b557e91 100644
--- a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs
+++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs
@@ -2,7 +2,6 @@
// Licensed under the MIT license.
using System;
-using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
@@ -734,24 +733,31 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo
[Test, Order(13)]
[Category("REPLICATION")]
+ //[Repeat(20)]
public void ClusterReplicationCheckpointCleanupTest([Values] bool performRMW, [Values] bool disableObjects, [Values] bool enableIncrementalSnapshots)
{
+ if (TestContext.CurrentContext.CurrentRepeatCount > 0)
+ Debug.WriteLine($"*** Current test iteration: {TestContext.CurrentContext.CurrentRepeatCount + 1}, name = {TestContext.CurrentContext.Test.Name} ***");
+
+ var primaryIndex = 0;
+ var replicaIndex = 1;
var replica_count = 1;//Per primary
var primary_count = 1;
var nodes_count = primary_count + (primary_count * replica_count);
ClassicAssert.IsTrue(primary_count > 0);
context.CreateInstances(nodes_count, tryRecover: true, disableObjects: disableObjects, lowMemory: true, segmentSize: "4k", EnableIncrementalSnapshots: enableIncrementalSnapshots, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay, deviceType: Tsavorite.core.DeviceType.Native);
context.CreateConnection(useTLS: useTLS);
- ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, [(0, 16383)], true, context.logger));
- context.clusterTestUtils.BumpEpoch(0, logger: context.logger);
- var cconfig = context.clusterTestUtils.ClusterNodes(0, context.logger);
+ // Setup cluster
+ context.SimplePrimaryReplicaSetup();
+
+ var cconfig = context.clusterTestUtils.ClusterNodes(primaryIndex, context.logger);
var myself = cconfig.Nodes.First();
var slotRangesStr = string.Join(",", myself.Slots.Select(x => $"({x.From}-{x.To})").ToList());
ClassicAssert.AreEqual(1, myself.Slots.Count, $"Setup failed slot ranges count greater than 1 {slotRangesStr}");
var shards = context.clusterTestUtils.ClusterShards(0, context.logger);
- ClassicAssert.AreEqual(1, shards.Count);
+ ClassicAssert.AreEqual(2, shards.Count);
ClassicAssert.AreEqual(1, shards[0].slotRanges.Count);
ClassicAssert.AreEqual(0, shards[0].slotRanges[0].Item1);
ClassicAssert.AreEqual(16383, shards[0].slotRanges[0].Item2);
@@ -767,7 +773,7 @@ public void ClusterReplicationCheckpointCleanupTest([Values] bool performRMW, [V
if (!attachReplicaTask.Wait(TimeSpan.FromSeconds(60)))
Assert.Fail("attachReplicaTask timeout");
- context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex: 0, secondaryIndex: 1, logger: context.logger);
+ context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex: primaryIndex, secondaryIndex: replicaIndex, logger: context.logger);
}
[Test, Order(14)]
@@ -814,59 +820,6 @@ public void ClusterMainMemoryReplicationAttachReplicas()
context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, 2);
}
- [Test, Order(15)]
- [Category("REPLICATION")]
- public void ClusterDontKnowReplicaFailTest([Values] bool performRMW, [Values] bool MainMemoryReplication, [Values] bool onDemandCheckpoint, [Values] bool useReplicaOf)
- {
- var replica_count = 1;// Per primary
- var primary_count = 1;
- var nodes_count = primary_count + (primary_count * replica_count);
- ClassicAssert.IsTrue(primary_count > 0);
- context.CreateInstances(nodes_count, disableObjects: true, FastAofTruncate: MainMemoryReplication, OnDemandCheckpoint: onDemandCheckpoint, CommitFrequencyMs: -1, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay);
- context.CreateConnection(useTLS: useTLS);
-
- var primaryNodeIndex = 0;
- var replicaNodeIndex = 1;
- ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, new List<(int, int)>() { (0, 16383) }, true, context.logger));
- context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, 1, logger: context.logger);
- context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, 2, logger: context.logger);
- context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger);
- context.clusterTestUtils.WaitUntilNodeIsKnown(primaryNodeIndex, replicaNodeIndex, logger: context.logger);
-
- var replicaId = context.clusterTestUtils.ClusterMyId(replicaNodeIndex, logger: context.logger);
- _ = context.clusterTestUtils.ClusterForget(primaryNodeIndex, replicaId, 5, logger: context.logger);
-
- var primaryId = context.clusterTestUtils.ClusterMyId(primaryNodeIndex, logger: context.logger);
- string resp;
- if (!useReplicaOf)
- resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryId, failEx: false, logger: context.logger);
- else
- resp = context.clusterTestUtils.ReplicaOf(replicaNodeIndex, primaryNodeIndex, failEx: false, logger: context.logger);
- ClassicAssert.IsTrue(resp.StartsWith("PRIMARY-ERR"));
-
- while (true)
- {
- context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger);
- context.clusterTestUtils.BumpEpoch(replicaNodeIndex, logger: context.logger);
- var config = context.clusterTestUtils.ClusterNodes(primaryNodeIndex, logger: context.logger);
- if (config.Nodes.Count == 2) break;
- ClusterTestUtils.BackOff(cancellationToken: context.cts.Token);
- }
-
- _ = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryId, logger: context.logger);
-
- context.kvPairs = [];
- var keyLength = 32;
- var kvpairCount = keyCount;
- var addCount = 5;
- if (!performRMW)
- context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, primaryNodeIndex);
- else
- context.PopulatePrimaryRMW(ref context.kvPairs, keyLength, kvpairCount, primaryNodeIndex, addCount);
-
- context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, replicaNodeIndex);
- }
-
[Test, Order(16)]
[Category("REPLICATION")]
public void ClusterDivergentReplicasTest([Values] bool performRMW, [Values] bool disableObjects, [Values] bool ckptBeforeDivergence)
@@ -1042,41 +995,6 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp
context.ValidateNodeObjects(ref context.kvPairsObj, replicaIndex: newPrimaryIndex, set: set);
}
- [Test, Order(21)]
- [Category("REPLICATION")]
- public void ClusterReplicateFails()
- {
- const string UserName = "temp-user";
- const string Password = "temp-password";
-
- const string ClusterUserName = "cluster-user";
- const string ClusterPassword = "cluster-password";
-
- // Setup a cluster (mimicking the style in which this bug was first found)
- ServerCredential clusterCreds = new(ClusterUserName, ClusterPassword, IsAdmin: true, UsedForClusterAuth: true, IsClearText: true);
- ServerCredential userCreds = new(UserName, Password, IsAdmin: true, UsedForClusterAuth: false, IsClearText: true);
-
- context.GenerateCredentials([userCreds, clusterCreds]);
- context.CreateInstances(2, disableObjects: true, disablePubSub: true, enableAOF: true, clusterCreds: clusterCreds, useAcl: true, FastAofTruncate: true, CommitFrequencyMs: -1, asyncReplay: asyncReplay);
- var primaryEndpoint = (IPEndPoint)context.endpoints.First();
- var replicaEndpoint = (IPEndPoint)context.endpoints.Last();
-
- ClassicAssert.AreNotEqual(primaryEndpoint, replicaEndpoint, "Should have different endpoints for nodes");
-
- using var primaryConnection = ConnectionMultiplexer.Connect($"{primaryEndpoint.Address}:{primaryEndpoint.Port},user={UserName},password={Password}");
- var primaryServer = primaryConnection.GetServer(primaryEndpoint);
-
- ClassicAssert.AreEqual("OK", (string)primaryServer.Execute("CLUSTER", ["ADDSLOTSRANGE", "0", "16383"], flags: CommandFlags.NoRedirect));
- ClassicAssert.AreEqual("OK", (string)primaryServer.Execute("CLUSTER", ["MEET", replicaEndpoint.Address.ToString(), replicaEndpoint.Port.ToString()], flags: CommandFlags.NoRedirect));
-
- using var replicaConnection = ConnectionMultiplexer.Connect($"{replicaEndpoint.Address}:{replicaEndpoint.Port},user={UserName},password={Password}");
- var replicaServer = replicaConnection.GetServer(replicaEndpoint);
-
- // Try to replicate from a server that doesn't exist
- var exc = Assert.Throws(() => replicaServer.Execute("CLUSTER", ["REPLICATE", Guid.NewGuid().ToString()], flags: CommandFlags.NoRedirect));
- ClassicAssert.IsTrue(exc.Message.StartsWith("ERR I don't know about node "));
- }
-
[Test, Order(22)]
[Category("REPLICATION")]
public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW)
@@ -1519,394 +1437,6 @@ void RestartRecover(int iteration)
}
}
- [Test, Order(27)]
- [Category("CLUSTER")]
- [CancelAfter(30_000)]
- public async Task ReplicasRestartAsReplicasAsync(CancellationToken cancellation)
- {
- var replica_count = 1;// Per primary
- var primary_count = 1;
- var nodes_count = primary_count + primary_count * replica_count;
- ClassicAssert.IsTrue(primary_count > 0);
-
- context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, useTLS: true, tryRecover: false, FastAofTruncate: true, CommitFrequencyMs: -1, clusterReplicationReestablishmentTimeout: 1);
- context.CreateConnection(useTLS: true);
- var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);
-
- IPEndPoint primary = (IPEndPoint)context.endpoints[0];
- IPEndPoint replica = (IPEndPoint)context.endpoints[1];
-
- // Make sure role assignment is as expected
- ClassicAssert.AreEqual("master", context.clusterTestUtils.RoleCommand(primary).Value);
- ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica).Value);
-
- context.ShutdownNode(primary);
- context.ShutdownNode(replica);
-
- // Intentionally leaving primary offline
- context.RestartNode(replica);
-
- // Delay a bit for replication init tasks to fire off
- await Task.Delay(100, cancellation).ConfigureAwait(false);
-
- // Make sure replica did not promote to Primary
- ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica).Value);
- }
-
- [Test, Order(28)]
- [Category("CLUSTER")]
- [CancelAfter(30_000)]
- [TestCase(ExceptionInjectionType.None, true)]
- [TestCase(ExceptionInjectionType.None, false)]
- [TestCase(ExceptionInjectionType.Divergent_AOF_Stream, true)]
- [TestCase(ExceptionInjectionType.Divergent_AOF_Stream, false)]
- [TestCase(ExceptionInjectionType.Aof_Sync_Task_Consume, true)]
- [TestCase(ExceptionInjectionType.Aof_Sync_Task_Consume, false)]
- public async Task PrimaryUnavailableRecoveryAsync(ExceptionInjectionType faultType, bool replicaFailoverBeforeShutdown, CancellationToken cancellation)
- {
- // Case we're testing is where a Primary _and_ it's Replica die, but only the Replica comes back.
- //
- // If configured correctly (for example, as a cache), we still want the Replica to come back up with some data - even if it's
- // acknowledges writes to the Primary are dropped.
- //
- // We also sometimes inject faults into the Primaries and Replicas before the proper fault, to simulate a cluster
- // in an unstable environment.
- //
- // Additionally we simulate both when we detect failures and intervene in time to promote Replicas to Primaries,
- // and when we don't intervene until after everything dies and the Replicas come back.
-
-#if !DEBUG
- Assert.Ignore($"Depends on {nameof(ExceptionInjectionHelper)}, which is disabled in non-Debug builds");
-#endif
-
- var replica_count = 1;// Per primary
- var primary_count = 2;
- var nodes_count = primary_count + primary_count * replica_count;
- ClassicAssert.IsTrue(primary_count > 0);
-
- // Config lifted from a deployed product, be wary of changing these without discussion
- context.CreateInstances(
- nodes_count,
- tryRecover: true,
- disablePubSub: false,
- disableObjects: false,
- enableAOF: true,
- AofMemorySize: "128m",
- CommitFrequencyMs: -1,
- aofSizeLimit: "256m",
- compactionFrequencySecs: 30,
- compactionType: LogCompactionType.Scan,
- latencyMonitory: true,
- metricsSamplingFrequency: 1,
- loggingFrequencySecs: 10,
- checkpointThrottleFlushDelayMs: 0,
- FastCommit: true,
- FastAofTruncate: true,
- OnDemandCheckpoint: true,
- useTLS: true,
- enableLua: true,
- luaMemoryMode: LuaMemoryManagementMode.Tracked,
- luaTransactionMode: true,
- luaMemoryLimit: "2M",
- clusterReplicationReestablishmentTimeout: 1,
- clusterReplicaResumeWithData: true
- );
- context.CreateConnection(useTLS: true);
- var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger);
-
- shards = context.clusterTestUtils.ClusterShards(0, context.logger);
- ClassicAssert.AreEqual(2, shards.Count);
-
- IPEndPoint primary1 = (IPEndPoint)context.endpoints[0];
- IPEndPoint primary2 = (IPEndPoint)context.endpoints[1];
- IPEndPoint replica1 = (IPEndPoint)context.endpoints[2];
- IPEndPoint replica2 = (IPEndPoint)context.endpoints[3];
-
- // Make sure role assignment is as expected
- ClassicAssert.AreEqual("master", context.clusterTestUtils.RoleCommand(primary1).Value);
- ClassicAssert.AreEqual("master", context.clusterTestUtils.RoleCommand(primary2).Value);
- ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica1).Value);
- ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica2).Value);
-
- // Populate both shards
- var writtenToPrimary1 = new ConcurrentDictionary();
- var writtenToPrimary2 = new ConcurrentDictionary();
- using (var writeTaskCancel = CancellationTokenSource.CreateLinkedTokenSource(cancellation))
- {
- var uniquePrefix = Guid.NewGuid();
-
- var writeToPrimary1Task =
- Task.Run(
- async () =>
- {
- var ix = -1;
-
- var p1 = shards.Single(s => s.nodes.Any(n => n.nodeIndex == context.endpoints.IndexOf(primary1)));
-
- while (!writeTaskCancel.IsCancellationRequested)
- {
- ix++;
-
- var key = $"pura-1-{uniquePrefix}-{ix}";
-
- var slot = context.clusterTestUtils.HashSlot(key);
- if (!p1.slotRanges.Any(x => slot >= x.Item1 && slot <= x.Item2))
- {
- continue;
- }
-
- var value = Guid.NewGuid().ToString();
- try
- {
- var res = (string)context.clusterTestUtils.Execute(primary1, "SET", [key, value]);
- if (res == "OK")
- {
- writtenToPrimary1[key] = value;
- }
-
- await Task.Delay(10, writeTaskCancel.Token).ConfigureAwait(false);
- }
- catch
- {
- // Ignore, cancellation or throwing should both just be powered through
- }
- }
- },
- cancellation
- );
-
- var writeToPrimary2Task =
- Task.Run(
- async () =>
- {
- var ix = -1;
-
- var p2 = shards.Single(s => s.nodes.Any(n => n.nodeIndex == context.endpoints.IndexOf(primary2)));
-
- while (!writeTaskCancel.IsCancellationRequested)
- {
- ix++;
-
- var key = $"pura-2-{uniquePrefix}-{ix}";
-
- var slot = context.clusterTestUtils.HashSlot(key);
- if (!p2.slotRanges.Any(x => slot >= x.Item1 && slot <= x.Item2))
- {
- continue;
- }
-
- var value = Guid.NewGuid().ToString();
-
- try
- {
- var res = (string)context.clusterTestUtils.Execute(primary1, "SET", [key, value]);
- if (res == "OK")
- {
- writtenToPrimary2[key] = value;
- }
-
- await Task.Delay(10, writeTaskCancel.Token).ConfigureAwait(false);
- }
- catch
- {
- // Ignore, cancellation or throwing should both just be powered through
- }
- }
- },
- cancellation
- );
-
- // Simulate out of band checkpointing
- var checkpointTask =
- Task.Run(
- async () =>
- {
- while (!writeTaskCancel.IsCancellationRequested)
- {
- foreach (var node in context.nodes)
- {
- try
- {
- _ = await node.Store.CommitAOFAsync(writeTaskCancel.Token).ConfigureAwait(false);
- }
- catch (TaskCanceledException)
- {
- // Cancel is fine
- break;
- }
- catch
- {
- // Ignore everything else
- }
- }
-
- try
- {
- await Task.Delay(100, writeTaskCancel.Token).ConfigureAwait(false);
- }
- catch
- {
- continue;
- }
- }
- },
- cancellation
- );
-
- // Wait for a bit, optionally injecting a fault
- if (faultType == ExceptionInjectionType.None)
- {
- await Task.Delay(10_000, cancellation).ConfigureAwait(false);
- }
- else
- {
- var timer = Stopwatch.StartNew();
-
- // Things start fine
- await Task.Delay(1_000, cancellation).ConfigureAwait(false);
-
- // Wait for something to get replicated
- var replica1Happened = false;
- var replica2Happened = false;
- do
- {
- if (!replica1Happened)
- {
- var readonlyRes = (string)context.clusterTestUtils.Execute(replica1, "READONLY", []);
- ClassicAssert.AreEqual("OK", readonlyRes);
-
- foreach (var kv in writtenToPrimary1)
- {
- var val = (string)context.clusterTestUtils.Execute(replica1, "GET", [kv.Key]);
- if (val == kv.Value)
- {
- replica1Happened = true;
- break;
- }
- }
- }
-
- if (!replica2Happened)
- {
- var readonlyRes = (string)context.clusterTestUtils.Execute(replica2, "READONLY", []);
- ClassicAssert.AreEqual("OK", readonlyRes);
-
- foreach (var kv in writtenToPrimary2)
- {
- var val = (string)context.clusterTestUtils.Execute(replica2, "GET", [kv.Key]);
- if (val == kv.Value)
- {
- replica2Happened = true;
- break;
- }
- }
- }
-
- await Task.Delay(100, cancellation).ConfigureAwait(false);
- } while (!replica1Happened || !replica2Happened);
-
- // Things fail for a bit
- ExceptionInjectionHelper.EnableException(faultType);
- await Task.Delay(2_000, cancellation).ConfigureAwait(false);
-
- // Things recover
- ExceptionInjectionHelper.DisableException(faultType);
-
- timer.Stop();
-
- // Wait out the rest of the duration
- if (timer.ElapsedMilliseconds < 10_000)
- {
- await Task.Delay((int)(10_000 - timer.ElapsedMilliseconds), cancellation).ConfigureAwait(false);
- }
- }
-
- // Stop writing
- writeTaskCancel.Cancel();
-
- // Wait for all our writes and checkpoints to spin down
- await Task.WhenAll(writeToPrimary1Task, writeToPrimary2Task, checkpointTask).ConfigureAwait(false);
- }
-
- // Shutdown all primaries
- context.ShutdownNode(primary1);
- context.ShutdownNode(primary2);
-
- // Sometimes we can intervene post-Primary crash but pre-Replica crash, simulate that
- if (replicaFailoverBeforeShutdown)
- {
- await UpgradeReplicasAsync(context, replica1, replica2, cancellation).ConfigureAwait(false);
- }
-
- // Shutdown the (old) replicas
- context.ShutdownNode(replica1);
- context.ShutdownNode(replica2);
-
- // Restart just the replicas
- context.RestartNode(replica1);
- context.RestartNode(replica2);
-
- // If we didn't promte pre-crash, promote now that Replicas came back
- if (!replicaFailoverBeforeShutdown)
- {
- await UpgradeReplicasAsync(context, replica1, replica2, cancellation).ConfigureAwait(false);
- }
-
- // Confirm that at least some of the data is available on each Replica
- var onReplica1 = 0;
- foreach (var (k, v) in writtenToPrimary1)
- {
- var res = (string)context.clusterTestUtils.Execute(replica1, "GET", [k]);
- if (res is not null)
- {
- ClassicAssert.AreEqual(v, res);
- onReplica1++;
- }
- }
-
- var onReplica2 = 0;
- foreach (var (k, v) in writtenToPrimary2)
- {
- var res = (string)context.clusterTestUtils.Execute(replica2, "GET", [k]);
- if (res is not null)
- {
- ClassicAssert.AreEqual(v, res);
- onReplica2++;
- }
- }
-
- // Something, ANYTHING, made it
- ClassicAssert.IsTrue(onReplica1 > 0, $"Nothing made it to replica 1, should have been up to {writtenToPrimary1.Count} values");
- ClassicAssert.IsTrue(onReplica2 > 0, $"Nothing made it to replica 2, should have been up to {writtenToPrimary2.Count} values");
-
- static async Task UpgradeReplicasAsync(ClusterTestContext context, IPEndPoint replica1, IPEndPoint replica2, CancellationToken cancellation)
- {
- // Promote the replicas, if no primary is coming back
- var takeOverRes1 = (string)context.clusterTestUtils.Execute(replica1, "CLUSTER", ["FAILOVER", "FORCE"]);
- var takeOverRes2 = (string)context.clusterTestUtils.Execute(replica2, "CLUSTER", ["FAILOVER", "FORCE"]);
- ClassicAssert.AreEqual("OK", takeOverRes1);
- ClassicAssert.AreEqual("OK", takeOverRes2);
-
- // Wait for roles to update
- while (true)
- {
- await Task.Delay(10, cancellation).ConfigureAwait(false);
-
- if (context.clusterTestUtils.RoleCommand(replica1).Value != "master")
- {
- continue;
- }
-
- if (context.clusterTestUtils.RoleCommand(replica2).Value != "master")
- {
- continue;
- }
-
- break;
- }
- }
- }
-
[Test, Order(27)]
[Category("CLUSTER")]
[Category("REPLICATION")]
@@ -2075,101 +1605,5 @@ public void ClusterReplicationSimpleTransactionTest([Values] bool storedProcedur
var replicaPInfo = context.clusterTestUtils.GetPersistenceInfo(replicaNodeIndex, context.logger);
ClassicAssert.AreEqual(primaryPInfo.TailAddress, replicaPInfo.TailAddress);
}
-
- [Test, Order(29)]
- [Category("REPLICATION")]
- [CancelAfter(30_000)]
- [TestCase(false)]
- [TestCase(true)]
- public async Task ClusterReplicationObjectCollectTest(bool useManualCollect, CancellationToken cancellationToken)
- {
- var replica_count = 1;// Per primary
- var primary_count = 1;
- var nodes_count = primary_count + (primary_count * replica_count);
- var primaryNodeIndex = 0;
- var replicaNodeIndex = 1;
-
- context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay, expiredObjectCollectionFrequencySecs: !useManualCollect ? 100 : 0);
- context.CreateConnection(useTLS: useTLS);
-
- var primaryServer = context.clusterTestUtils.GetServer(primaryNodeIndex);
- var replicaServer = context.clusterTestUtils.GetServer(replicaNodeIndex);
-
- // Setup cluster
- context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger);
- context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, primaryNodeIndex + 1, logger: context.logger);
- context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, replicaNodeIndex + 1, logger: context.logger);
- context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger);
- context.clusterTestUtils.WaitUntilNodeIsKnown(primaryNodeIndex, replicaNodeIndex, logger: context.logger);
- context.clusterTestUtils.WaitUntilNodeIsKnown(replicaNodeIndex, primaryNodeIndex, logger: context.logger);
-
- var db = context.clusterTestUtils.GetDatabase();
- HashEntry[] elements = [new HashEntry("field1", "hello"), new HashEntry("field2", "world"), new HashEntry("field3", "value3"), new HashEntry("field4", "value4"), new HashEntry("field5", "value5"), new HashEntry("field6", "value6")];
- // Attach replica
- var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger);
- ClassicAssert.AreEqual("OK", resp);
-
- // Execute first hash set workload
- var hashSetKey = "myhash";
- ExecuteHashSet(hashSetKey, elements);
- await Task.Delay(3000).ConfigureAwait(false);
- ManualCollect();
- ValidateHashSet(hashSetKey);
-
- // Execute second hash set workload
- hashSetKey = "myhash2";
- ExecuteHashSet(hashSetKey, elements);
- await Task.Delay(3000).ConfigureAwait(false);
- ManualCollect();
- ValidateHashSet(hashSetKey);
-
- void ExecuteHashSet(string key, HashEntry[] elements)
- {
- db.HashSet(key, elements);
-
- var result = primaryServer.Execute("HPEXPIRE", key, "500", "FIELDS", "2", "field1", "field2");
- var results = (RedisResult[])result;
- ClassicAssert.AreEqual(2, results.Length);
- ClassicAssert.AreEqual(1, (long)results[0]);
- ClassicAssert.AreEqual(1, (long)results[1]);
-
- result = primaryServer.Execute("HPEXPIRE", key, "500", "FIELDS", "2", "field3", "field4");
- results = (RedisResult[])result;
- ClassicAssert.AreEqual(2, results.Length);
- ClassicAssert.AreEqual(1, (long)results[0]);
- ClassicAssert.AreEqual(1, (long)results[1]);
- }
-
- void ValidateHashSet(string key)
- {
- // Check expected result at primary
- var expectedFieldsAndValues = elements.AsSpan().Slice(4).ToArray()
- .SelectMany(e => new[] { e.Name.ToString(), e.Value.ToString() })
- .ToArray();
- var fields = (string[])primaryServer.Execute("HGETALL", [key]);
- ClassicAssert.AreEqual(4, fields.Length);
- ClassicAssert.AreEqual(expectedFieldsAndValues, fields);
-
- // Wait to ensure sync
- context.clusterTestUtils.WaitForReplicaAofSync(primaryNodeIndex, replicaNodeIndex, context.logger, cancellationToken);
-
- // Check if replica is caught up
- fields = (string[])replicaServer.Execute("HGETALL", [key]);
- ClassicAssert.AreEqual(4, fields.Length);
- ClassicAssert.AreEqual(expectedFieldsAndValues, fields);
- }
-
- void ManualCollect()
- {
- if (useManualCollect)
- {
- Assert.Throws(() => replicaServer.Execute("HCOLLECT", ["*"], CommandFlags.NoRedirect),
- $"Expected exception was not thrown");
-
- resp = (string)primaryServer.Execute("HCOLLECT", ["*"]);
- ClassicAssert.AreEqual("OK", resp);
- }
- }
- }
}
}
\ No newline at end of file
diff --git a/test/Garnet.test/RespInfoTests.cs b/test/Garnet.test/RespInfoTests.cs
index a61de748976..d6c4cb96c0d 100644
--- a/test/Garnet.test/RespInfoTests.cs
+++ b/test/Garnet.test/RespInfoTests.cs
@@ -75,6 +75,86 @@ public void ResetStatsTest()
ClassicAssert.AreEqual("total_found:1", totalFound, "Expected total_found to be one after sending one successful request");
}
+ [Test]
+ [TestCase("ALL")]
+ [TestCase("DEFAULT")]
+ [TestCase("EVERYTHING")]
+ public void InfoSectionOptionsTest(string option)
+ {
+ using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+ var db = redis.GetDatabase(0);
+
+ var infoResult = db.Execute("INFO", option).ToString();
+ ClassicAssert.IsNotNull(infoResult);
+ ClassicAssert.IsNotEmpty(infoResult);
+
+ // All options should include these core sections
+ ClassicAssert.IsTrue(infoResult.Contains("# Server"), $"INFO {option} should contain Server section");
+ ClassicAssert.IsTrue(infoResult.Contains("# Memory"), $"INFO {option} should contain Memory section");
+ ClassicAssert.IsTrue(infoResult.Contains("# Stats"), $"INFO {option} should contain Stats section");
+ ClassicAssert.IsTrue(infoResult.Contains("# Clients"), $"INFO {option} should contain Clients section");
+ ClassicAssert.IsTrue(infoResult.Contains("# Keyspace"), $"INFO {option} should contain Keyspace section");
+
+ // ALL excludes Modules section; DEFAULT and EVERYTHING include it
+ if (option == "ALL")
+ {
+ ClassicAssert.IsFalse(infoResult.Contains("# Modules"), "INFO ALL should not contain Modules section");
+ }
+ else
+ {
+ ClassicAssert.IsTrue(infoResult.Contains("# Modules"), $"INFO {option} should contain Modules section");
+ }
+
+ // All three options are based on DefaultInfo which excludes expensive sections
+ ClassicAssert.IsFalse(infoResult.Contains("MainStoreHashTableDistribution"), $"INFO {option} should not contain StoreHashTable section");
+ ClassicAssert.IsFalse(infoResult.Contains("MainStoreDeletedRecordRevivification"), $"INFO {option} should not contain StoreReviv section");
+ }
+
+ [Test]
+ public void InfoDefaultMatchesNoArgsTest()
+ {
+ using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+ var db = redis.GetDatabase(0);
+
+ var infoNoArgs = db.Execute("INFO").ToString();
+ var infoDefault = db.Execute("INFO", "DEFAULT").ToString();
+
+ // Both should return the same set of section headers
+ var noArgsSections = GetSectionHeaders(infoNoArgs);
+ var defaultSections = GetSectionHeaders(infoDefault);
+
+ CollectionAssert.AreEquivalent(noArgsSections, defaultSections,
+ "INFO (no args) and INFO DEFAULT should return the same sections");
+ }
+
+ [Test]
+ public void InfoAllWithModulesEqualsEverythingTest()
+ {
+ using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+ var db = redis.GetDatabase(0);
+
+ var infoEverything = db.Execute("INFO", "EVERYTHING").ToString();
+ var infoAllModules = db.Execute("INFO", "ALL", "MODULES").ToString();
+
+ var everythingSections = GetSectionHeaders(infoEverything);
+ var allModulesSections = GetSectionHeaders(infoAllModules);
+
+ CollectionAssert.AreEquivalent(everythingSections, allModulesSections,
+ "INFO EVERYTHING and INFO ALL MODULES should return the same sections");
+ }
+
+ private static List<string> GetSectionHeaders(string infoOutput)
+ {
+ ClassicAssert.IsNotNull(infoOutput, "INFO output should not be null");
+ ClassicAssert.IsNotEmpty(infoOutput, "INFO output should not be empty");
+
+ return infoOutput.Split("\r\n")
+ .Where(line => line.StartsWith("# "))
+ .Select(line => line.TrimStart('#', ' '))
+ .OrderBy(s => s)
+ .ToList();
+ }
+
[Test]
public async Task InfoHlogScanTest()
{