From 82501b8f88c3f065928532fdc4e8d2db46912524 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 24 Mar 2026 13:44:08 -0700 Subject: [PATCH 1/8] Fix INFO all/default/everything returning empty response Implement support for INFO all, default, and everything options: - all: returns all DefaultInfo sections excluding module-generated ones - default: returns the default set of sections (same as no-arg INFO) - everything: returns all DefaultInfo sections including modules Added pre-declared HashSet collections (AllInfoSet, EverythingInfoSet) in GarnetInfoMetrics.cs derived from DefaultInfo to support these options. Updated InfoCommand.cs to use UnionWith with the new HashSets instead of silently skipping the ALL keyword. Added tests for all three options, verifying correct section inclusion/exclusion. Fixes #1643 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- libs/server/Metrics/Info/GarnetInfoMetrics.cs | 10 +++ libs/server/Metrics/Info/InfoCommand.cs | 23 +++--- libs/server/Metrics/Info/InfoHelp.cs | 6 +- test/Garnet.test/RespInfoTests.cs | 77 +++++++++++++++++++ 4 files changed, 105 insertions(+), 11 deletions(-) diff --git a/libs/server/Metrics/Info/GarnetInfoMetrics.cs b/libs/server/Metrics/Info/GarnetInfoMetrics.cs index 8d407d400ae..8806467762b 100644 --- a/libs/server/Metrics/Info/GarnetInfoMetrics.cs +++ b/libs/server/Metrics/Info/GarnetInfoMetrics.cs @@ -25,6 +25,16 @@ class GarnetInfoMetrics _ => true })]; + /// + /// All info sections excluding module-generated ones. + /// + public static readonly HashSet AllInfoSet = [.. DefaultInfo.Where(e => e != InfoMetricsType.MODULES)]; + + /// + /// All info sections including module-generated ones. + /// + public static readonly HashSet EverythingInfoSet = [.. 
DefaultInfo]; + MetricsItem[] serverInfo = null; MetricsItem[] memoryInfo = null; MetricsItem[] clusterInfo = null; diff --git a/libs/server/Metrics/Info/InfoCommand.cs b/libs/server/Metrics/Info/InfoCommand.cs index e48c6b6b4d8..7c7c4e71837 100644 --- a/libs/server/Metrics/Info/InfoCommand.cs +++ b/libs/server/Metrics/Info/InfoCommand.cs @@ -28,17 +28,20 @@ private bool NetworkINFO() reset = true; else if (sbSection.EqualsUpperCaseSpanIgnoringCase("HELP"u8)) help = true; - else if (!sbSection.EqualsUpperCaseSpanIgnoringCase("ALL"u8)) + else if (sbSection.EqualsUpperCaseSpanIgnoringCase("ALL"u8)) + sections.UnionWith(GarnetInfoMetrics.AllInfoSet); + else if (sbSection.EqualsUpperCaseSpanIgnoringCase("DEFAULT"u8)) + sections.UnionWith(GarnetInfoMetrics.DefaultInfo); + else if (sbSection.EqualsUpperCaseSpanIgnoringCase("EVERYTHING"u8)) + sections.UnionWith(GarnetInfoMetrics.EverythingInfoSet); + else if (parseState.TryGetInfoMetricsType(i, out var sectionType)) { - if (parseState.TryGetInfoMetricsType(i, out var sectionType)) - { - sections.Add(sectionType); - } - else - { - invalid = true; - invalidSection = parseState.GetString(i); - } + sections.Add(sectionType); + } + else + { + invalid = true; + invalidSection = parseState.GetString(i); } } } diff --git a/libs/server/Metrics/Info/InfoHelp.cs b/libs/server/Metrics/Info/InfoHelp.cs index 47cc24a3460..f2de3e79bed 100644 --- a/libs/server/Metrics/Info/InfoHelp.cs +++ b/libs/server/Metrics/Info/InfoHelp.cs @@ -10,6 +10,8 @@ class InfoHelp { internal const string HELP = "HELP"; internal const string ALL = "ALL"; + internal const string DEFAULT = "DEFAULT"; + internal const string EVERYTHING = "EVERYTHING"; internal const string RESET = "RESET"; public static List GetInfoTypeHelpMessage() @@ -33,7 +35,9 @@ public static List GetInfoTypeHelpMessage() $"{nameof(InfoMetricsType.KEYSPACE)}: Database related statistics.", $"{nameof(InfoMetricsType.MODULES)}: Information related to loaded modules.", 
$"{nameof(InfoMetricsType.HLOGSCAN)}: Distribution of records in main store's hybrid log in-memory portion.", - $"{nameof(ALL)}: Return all informational sections.", + $"{nameof(ALL)}: Return all informational sections (excluding module generated ones).", + $"{nameof(DEFAULT)}: Return the default set of informational sections.", + $"{nameof(EVERYTHING)}: Return all informational sections including module generated ones.", $"{nameof(HELP)}: Print this help message.", $"{nameof(RESET)}: Reset stats.", "\r\n", diff --git a/test/Garnet.test/RespInfoTests.cs b/test/Garnet.test/RespInfoTests.cs index a61de748976..a50c90924e8 100644 --- a/test/Garnet.test/RespInfoTests.cs +++ b/test/Garnet.test/RespInfoTests.cs @@ -75,6 +75,83 @@ public void ResetStatsTest() ClassicAssert.AreEqual("total_found:1", totalFound, "Expected total_found to be one after sending one successful request"); } + [Test] + [TestCase("ALL")] + [TestCase("DEFAULT")] + [TestCase("EVERYTHING")] + public void InfoSectionOptionsTest(string option) + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + + var infoResult = db.Execute("INFO", option).ToString(); + ClassicAssert.IsNotNull(infoResult); + ClassicAssert.IsNotEmpty(infoResult); + + // All options should include these core sections + ClassicAssert.IsTrue(infoResult.Contains("# Server"), $"INFO {option} should contain Server section"); + ClassicAssert.IsTrue(infoResult.Contains("# Memory"), $"INFO {option} should contain Memory section"); + ClassicAssert.IsTrue(infoResult.Contains("# Stats"), $"INFO {option} should contain Stats section"); + ClassicAssert.IsTrue(infoResult.Contains("# Clients"), $"INFO {option} should contain Clients section"); + ClassicAssert.IsTrue(infoResult.Contains("# Keyspace"), $"INFO {option} should contain Keyspace section"); + + // ALL excludes Modules section; DEFAULT and EVERYTHING include it + if (option == "ALL") + { + ClassicAssert.IsFalse(infoResult.Contains("# 
Modules"), "INFO ALL should not contain Modules section"); + } + else + { + ClassicAssert.IsTrue(infoResult.Contains("# Modules"), $"INFO {option} should contain Modules section"); + } + + // All three options are based on DefaultInfo which excludes expensive sections + ClassicAssert.IsFalse(infoResult.Contains("MainStoreHashTableDistribution"), $"INFO {option} should not contain StoreHashTable section"); + ClassicAssert.IsFalse(infoResult.Contains("MainStoreDeletedRecordRevivification"), $"INFO {option} should not contain StoreReviv section"); + } + + [Test] + public void InfoDefaultMatchesNoArgsTest() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + + var infoNoArgs = db.Execute("INFO").ToString(); + var infoDefault = db.Execute("INFO", "DEFAULT").ToString(); + + // Both should return the same set of section headers + var noArgsSections = GetSectionHeaders(infoNoArgs); + var defaultSections = GetSectionHeaders(infoDefault); + + CollectionAssert.AreEquivalent(noArgsSections, defaultSections, + "INFO (no args) and INFO DEFAULT should return the same sections"); + } + + [Test] + public void InfoAllWithModulesEqualsEverythingTest() + { + using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig()); + var db = redis.GetDatabase(0); + + var infoEverything = db.Execute("INFO", "EVERYTHING").ToString(); + var infoAllModules = db.Execute("INFO", "ALL", "MODULES").ToString(); + + var everythingSections = GetSectionHeaders(infoEverything); + var allModulesSections = GetSectionHeaders(infoAllModules); + + CollectionAssert.AreEquivalent(everythingSections, allModulesSections, + "INFO EVERYTHING and INFO ALL MODULES should return the same sections"); + } + + private static List GetSectionHeaders(string infoOutput) + { + return infoOutput.Split("\r\n") + .Where(line => line.StartsWith("# ")) + .Select(line => line.TrimStart('#', ' ')) + .OrderBy(s => s) + .ToList(); + } + [Test] public async Task 
InfoHlogScanTest() { From c834733114494ea2f61d85c65ce54cf2ad440e78 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 24 Mar 2026 13:57:33 -0700 Subject: [PATCH 2/8] make everything option use DefaultInfo --- libs/server/Metrics/Info/GarnetInfoMetrics.cs | 5 ----- libs/server/Metrics/Info/InfoCommand.cs | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/libs/server/Metrics/Info/GarnetInfoMetrics.cs b/libs/server/Metrics/Info/GarnetInfoMetrics.cs index 8806467762b..75dfc1962fc 100644 --- a/libs/server/Metrics/Info/GarnetInfoMetrics.cs +++ b/libs/server/Metrics/Info/GarnetInfoMetrics.cs @@ -30,11 +30,6 @@ class GarnetInfoMetrics /// public static readonly HashSet AllInfoSet = [.. DefaultInfo.Where(e => e != InfoMetricsType.MODULES)]; - /// - /// All info sections including module-generated ones. - /// - public static readonly HashSet EverythingInfoSet = [.. DefaultInfo]; - MetricsItem[] serverInfo = null; MetricsItem[] memoryInfo = null; MetricsItem[] clusterInfo = null; diff --git a/libs/server/Metrics/Info/InfoCommand.cs b/libs/server/Metrics/Info/InfoCommand.cs index 7c7c4e71837..0722fdd3b9e 100644 --- a/libs/server/Metrics/Info/InfoCommand.cs +++ b/libs/server/Metrics/Info/InfoCommand.cs @@ -33,7 +33,7 @@ private bool NetworkINFO() else if (sbSection.EqualsUpperCaseSpanIgnoringCase("DEFAULT"u8)) sections.UnionWith(GarnetInfoMetrics.DefaultInfo); else if (sbSection.EqualsUpperCaseSpanIgnoringCase("EVERYTHING"u8)) - sections.UnionWith(GarnetInfoMetrics.EverythingInfoSet); + sections.UnionWith(GarnetInfoMetrics.DefaultInfo); else if (parseState.TryGetInfoMetricsType(i, out var sectionType)) { sections.Add(sectionType); From 3ee5d52a27b97c0120192f87f834e2f021ca0f78 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Tue, 24 Mar 2026 14:02:16 -0700 Subject: [PATCH 3/8] Add null/empty guard in GetSectionHeaders test helper Adds explicit asserts before splitting INFO output so test failures surface a clear message instead of a 
NullReferenceException. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- test/Garnet.test/RespInfoTests.cs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/Garnet.test/RespInfoTests.cs b/test/Garnet.test/RespInfoTests.cs index a50c90924e8..d6c4cb96c0d 100644 --- a/test/Garnet.test/RespInfoTests.cs +++ b/test/Garnet.test/RespInfoTests.cs @@ -145,6 +145,9 @@ public void InfoAllWithModulesEqualsEverythingTest() private static List GetSectionHeaders(string infoOutput) { + ClassicAssert.IsNotNull(infoOutput, "INFO output should not be null"); + ClassicAssert.IsNotEmpty(infoOutput, "INFO output should not be empty"); + return infoOutput.Split("\r\n") .Where(line => line.StartsWith("# ")) .Select(line => line.TrimStart('#', ' ')) From c75d911638669f97dd33a9a7eb611550d7bb9922 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Fri, 20 Mar 2026 12:15:14 -0700 Subject: [PATCH 4/8] wip; restructuring cluster tests to reduce CI duration --- .../ClusterManagementTests.cs | 486 +++++++++++++++ .../ClusterNegativeTests.cs | 77 ++- .../ClusterReplicationAsyncReplay.cs | 1 + .../ClusterReplicationBaseTests.cs | 574 ------------------ 4 files changed, 558 insertions(+), 580 deletions(-) diff --git a/test/Garnet.test.cluster/ClusterManagementTests.cs b/test/Garnet.test.cluster/ClusterManagementTests.cs index eb742cdaf10..aba00602566 100644 --- a/test/Garnet.test.cluster/ClusterManagementTests.cs +++ b/test/Garnet.test.cluster/ClusterManagementTests.cs @@ -2,13 +2,16 @@ // Licensed under the MIT license. 
using System; +using System.Collections.Concurrent; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Net; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; using Allure.NUnit; +using Garnet.common; using Garnet.server; using Microsoft.Extensions.Logging; using NUnit.Framework; @@ -930,5 +933,488 @@ public void ClusterCheckpointUpgradeFrom([Values] InstanceType instanceType) ClassicAssert.AreEqual(value, dbValue); } } + + [Test, Order(16)] + [CancelAfter(30_000)] + [TestCase(false)] + [TestCase(true)] + public async Task ClusterReplicationObjectCollectTest(bool useManualCollect, CancellationToken cancellationToken) + { + var replica_count = 1;// Per primary + var primary_count = 1; + var nodes_count = primary_count + (primary_count * replica_count); + var primaryNodeIndex = 0; + var replicaNodeIndex = 1; + + context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, useTLS: false, asyncReplay: false, expiredObjectCollectionFrequencySecs: !useManualCollect ? 
100 : 0); + context.CreateConnection(useTLS: false); + + var primaryServer = context.clusterTestUtils.GetServer(primaryNodeIndex); + var replicaServer = context.clusterTestUtils.GetServer(replicaNodeIndex); + + // Setup cluster + context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger); + context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, primaryNodeIndex + 1, logger: context.logger); + context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, replicaNodeIndex + 1, logger: context.logger); + context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger); + context.clusterTestUtils.WaitUntilNodeIsKnown(primaryNodeIndex, replicaNodeIndex, logger: context.logger); + context.clusterTestUtils.WaitUntilNodeIsKnown(replicaNodeIndex, primaryNodeIndex, logger: context.logger); + + var db = context.clusterTestUtils.GetDatabase(); + HashEntry[] elements = [new HashEntry("field1", "hello"), new HashEntry("field2", "world"), new HashEntry("field3", "value3"), new HashEntry("field4", "value4"), new HashEntry("field5", "value5"), new HashEntry("field6", "value6")]; + // Attach replica + var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger); + ClassicAssert.AreEqual("OK", resp); + + // Execute first hash set workload + var hashSetKey = "myhash"; + ExecuteHashSet(hashSetKey, elements); + await Task.Delay(3000).ConfigureAwait(false); + ManualCollect(); + ValidateHashSet(hashSetKey); + + // Execute second hash set workload + hashSetKey = "myhash2"; + ExecuteHashSet(hashSetKey, elements); + await Task.Delay(3000).ConfigureAwait(false); + ManualCollect(); + ValidateHashSet(hashSetKey); + + void ExecuteHashSet(string key, HashEntry[] elements) + { + db.HashSet(key, elements); + + var result = primaryServer.Execute("HPEXPIRE", key, "500", "FIELDS", "2", "field1", "field2"); + var results = (RedisResult[])result; + ClassicAssert.AreEqual(2, 
results.Length); + ClassicAssert.AreEqual(1, (long)results[0]); + ClassicAssert.AreEqual(1, (long)results[1]); + + result = primaryServer.Execute("HPEXPIRE", key, "500", "FIELDS", "2", "field3", "field4"); + results = (RedisResult[])result; + ClassicAssert.AreEqual(2, results.Length); + ClassicAssert.AreEqual(1, (long)results[0]); + ClassicAssert.AreEqual(1, (long)results[1]); + } + + void ValidateHashSet(string key) + { + // Check expected result at primary + var expectedFieldsAndValues = elements.AsSpan().Slice(4).ToArray() + .SelectMany(e => new[] { e.Name.ToString(), e.Value.ToString() }) + .ToArray(); + var fields = (string[])primaryServer.Execute("HGETALL", [key]); + ClassicAssert.AreEqual(4, fields.Length); + ClassicAssert.AreEqual(expectedFieldsAndValues, fields); + + // Wait to ensure sync + context.clusterTestUtils.WaitForReplicaAofSync(primaryNodeIndex, replicaNodeIndex, context.logger, cancellationToken); + + // Check if replica is caught up + fields = (string[])replicaServer.Execute("HGETALL", [key]); + ClassicAssert.AreEqual(4, fields.Length); + ClassicAssert.AreEqual(expectedFieldsAndValues, fields); + } + + void ManualCollect() + { + if (useManualCollect) + { + Assert.Throws(() => replicaServer.Execute("HCOLLECT", ["*"], CommandFlags.NoRedirect), + $"Expected exception was not thrown"); + + resp = (string)primaryServer.Execute("HCOLLECT", ["*"]); + ClassicAssert.AreEqual("OK", resp); + } + } + } + + [Test, Order(17)] + [Category("CLUSTER")] + [CancelAfter(30_000)] + public async Task ReplicasRestartAsReplicasAsync(CancellationToken cancellation) + { + var replica_count = 1;// Per primary + var primary_count = 1; + var nodes_count = primary_count + primary_count * replica_count; + ClassicAssert.IsTrue(primary_count > 0); + + context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, useTLS: true, tryRecover: false, FastAofTruncate: true, CommitFrequencyMs: -1, clusterReplicationReestablishmentTimeout: 1); + 
context.CreateConnection(useTLS: true); + var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); + + var primary = (IPEndPoint)context.endpoints[0]; + var replica = (IPEndPoint)context.endpoints[1]; + + // Make sure role assignment is as expected + ClassicAssert.AreEqual("master", context.clusterTestUtils.RoleCommand(primary).Value); + ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica).Value); + + context.ShutdownNode(primary); + context.ShutdownNode(replica); + + // Intentionally leaving primary offline + context.RestartNode(replica); + + // Delay a bit for replication init tasks to fire off + await Task.Delay(100, cancellation).ConfigureAwait(false); + + // Make sure replica did not promote to Primary + ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica).Value); + } + + [Test, Order(18)] + [Category("CLUSTER")] + [CancelAfter(30_000)] + [TestCase(ExceptionInjectionType.None, true)] + [TestCase(ExceptionInjectionType.None, false)] + [TestCase(ExceptionInjectionType.Divergent_AOF_Stream, true)] + [TestCase(ExceptionInjectionType.Divergent_AOF_Stream, false)] + [TestCase(ExceptionInjectionType.Aof_Sync_Task_Consume, true)] + [TestCase(ExceptionInjectionType.Aof_Sync_Task_Consume, false)] + public async Task PrimaryUnavailableRecoveryAsync(ExceptionInjectionType faultType, bool replicaFailoverBeforeShutdown, CancellationToken cancellation) + { + // Case we're testing is where a Primary _and_ it's Replica die, but only the Replica comes back. + // + // If configured correctly (for example, as a cache), we still want the Replica to come back up with some data - even if it's + // acknowledges writes to the Primary are dropped. + // + // We also sometimes inject faults into the Primaries and Replicas before the proper fault, to simulate a cluster + // in an unstable environment. 
+ // + // Additionally we simulate both when we detect failures and intervene in time to promote Replicas to Primaries, + // and when we don't intervene until after everything dies and the Replicas come back. + +#if !DEBUG + Assert.Ignore($"Depends on {nameof(ExceptionInjectionHelper)}, which is disabled in non-Debug builds"); +#endif + + var replica_count = 1;// Per primary + var primary_count = 2; + var nodes_count = primary_count + primary_count * replica_count; + ClassicAssert.IsTrue(primary_count > 0); + + // Config lifted from a deployed product, be wary of changing these without discussion + context.CreateInstances( + nodes_count, + tryRecover: true, + disablePubSub: false, + disableObjects: false, + enableAOF: true, + AofMemorySize: "128m", + CommitFrequencyMs: -1, + aofSizeLimit: "256m", + compactionFrequencySecs: 30, + compactionType: LogCompactionType.Scan, + latencyMonitory: true, + metricsSamplingFrequency: 1, + loggingFrequencySecs: 10, + checkpointThrottleFlushDelayMs: 0, + FastCommit: true, + FastAofTruncate: true, + OnDemandCheckpoint: true, + useTLS: true, + enableLua: true, + luaMemoryMode: LuaMemoryManagementMode.Tracked, + luaTransactionMode: true, + luaMemoryLimit: "2M", + clusterReplicationReestablishmentTimeout: 1, + clusterReplicaResumeWithData: true + ); + context.CreateConnection(useTLS: true); + var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); + + shards = context.clusterTestUtils.ClusterShards(0, context.logger); + ClassicAssert.AreEqual(2, shards.Count); + + IPEndPoint primary1 = (IPEndPoint)context.endpoints[0]; + IPEndPoint primary2 = (IPEndPoint)context.endpoints[1]; + IPEndPoint replica1 = (IPEndPoint)context.endpoints[2]; + IPEndPoint replica2 = (IPEndPoint)context.endpoints[3]; + + // Make sure role assignment is as expected + ClassicAssert.AreEqual("master", context.clusterTestUtils.RoleCommand(primary1).Value); + ClassicAssert.AreEqual("master", 
context.clusterTestUtils.RoleCommand(primary2).Value); + ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica1).Value); + ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica2).Value); + + // Populate both shards + var writtenToPrimary1 = new ConcurrentDictionary(); + var writtenToPrimary2 = new ConcurrentDictionary(); + using (var writeTaskCancel = CancellationTokenSource.CreateLinkedTokenSource(cancellation)) + { + var uniquePrefix = Guid.NewGuid(); + + var writeToPrimary1Task = + Task.Run( + async () => + { + var ix = -1; + + var p1 = shards.Single(s => s.nodes.Any(n => n.nodeIndex == context.endpoints.IndexOf(primary1))); + + while (!writeTaskCancel.IsCancellationRequested) + { + ix++; + + var key = $"pura-1-{uniquePrefix}-{ix}"; + + var slot = context.clusterTestUtils.HashSlot(key); + if (!p1.slotRanges.Any(x => slot >= x.Item1 && slot <= x.Item2)) + { + continue; + } + + var value = Guid.NewGuid().ToString(); + try + { + var res = (string)context.clusterTestUtils.Execute(primary1, "SET", [key, value]); + if (res == "OK") + { + writtenToPrimary1[key] = value; + } + + await Task.Delay(10, writeTaskCancel.Token).ConfigureAwait(false); + } + catch + { + // Ignore, cancellation or throwing should both just be powered through + } + } + }, + cancellation + ); + + var writeToPrimary2Task = + Task.Run( + async () => + { + var ix = -1; + + var p2 = shards.Single(s => s.nodes.Any(n => n.nodeIndex == context.endpoints.IndexOf(primary2))); + + while (!writeTaskCancel.IsCancellationRequested) + { + ix++; + + var key = $"pura-2-{uniquePrefix}-{ix}"; + + var slot = context.clusterTestUtils.HashSlot(key); + if (!p2.slotRanges.Any(x => slot >= x.Item1 && slot <= x.Item2)) + { + continue; + } + + var value = Guid.NewGuid().ToString(); + + try + { + var res = (string)context.clusterTestUtils.Execute(primary1, "SET", [key, value]); + if (res == "OK") + { + writtenToPrimary2[key] = value; + } + + await Task.Delay(10, 
writeTaskCancel.Token).ConfigureAwait(false); + } + catch + { + // Ignore, cancellation or throwing should both just be powered through + } + } + }, + cancellation + ); + + // Simulate out of band checkpointing + var checkpointTask = + Task.Run( + async () => + { + while (!writeTaskCancel.IsCancellationRequested) + { + foreach (var node in context.nodes) + { + try + { + _ = await node.Store.CommitAOFAsync(writeTaskCancel.Token).ConfigureAwait(false); + } + catch (TaskCanceledException) + { + // Cancel is fine + break; + } + catch + { + // Ignore everything else + } + } + + try + { + await Task.Delay(100, writeTaskCancel.Token).ConfigureAwait(false); + } + catch + { + continue; + } + } + }, + cancellation + ); + + // Wait for a bit, optionally injecting a fault + if (faultType == ExceptionInjectionType.None) + { + await Task.Delay(10_000, cancellation).ConfigureAwait(false); + } + else + { + var timer = Stopwatch.StartNew(); + + // Things start fine + await Task.Delay(1_000, cancellation).ConfigureAwait(false); + + // Wait for something to get replicated + var replica1Happened = false; + var replica2Happened = false; + do + { + if (!replica1Happened) + { + var readonlyRes = (string)context.clusterTestUtils.Execute(replica1, "READONLY", []); + ClassicAssert.AreEqual("OK", readonlyRes); + + foreach (var kv in writtenToPrimary1) + { + var val = (string)context.clusterTestUtils.Execute(replica1, "GET", [kv.Key]); + if (val == kv.Value) + { + replica1Happened = true; + break; + } + } + } + + if (!replica2Happened) + { + var readonlyRes = (string)context.clusterTestUtils.Execute(replica2, "READONLY", []); + ClassicAssert.AreEqual("OK", readonlyRes); + + foreach (var kv in writtenToPrimary2) + { + var val = (string)context.clusterTestUtils.Execute(replica2, "GET", [kv.Key]); + if (val == kv.Value) + { + replica2Happened = true; + break; + } + } + } + + await Task.Delay(100, cancellation).ConfigureAwait(false); + } while (!replica1Happened || !replica2Happened); + + // 
Things fail for a bit + ExceptionInjectionHelper.EnableException(faultType); + await Task.Delay(2_000, cancellation).ConfigureAwait(false); + + // Things recover + ExceptionInjectionHelper.DisableException(faultType); + + timer.Stop(); + + // Wait out the rest of the duration + if (timer.ElapsedMilliseconds < 10_000) + { + await Task.Delay((int)(10_000 - timer.ElapsedMilliseconds), cancellation).ConfigureAwait(false); + } + } + + // Stop writing + writeTaskCancel.Cancel(); + + // Wait for all our writes and checkpoints to spin down + await Task.WhenAll(writeToPrimary1Task, writeToPrimary2Task, checkpointTask).ConfigureAwait(false); + } + + // Shutdown all primaries + context.ShutdownNode(primary1); + context.ShutdownNode(primary2); + + // Sometimes we can intervene post-Primary crash but pre-Replica crash, simulate that + if (replicaFailoverBeforeShutdown) + { + await UpgradeReplicasAsync(context, replica1, replica2, cancellation).ConfigureAwait(false); + } + + // Shutdown the (old) replicas + context.ShutdownNode(replica1); + context.ShutdownNode(replica2); + + // Restart just the replicas + context.RestartNode(replica1); + context.RestartNode(replica2); + + // If we didn't promte pre-crash, promote now that Replicas came back + if (!replicaFailoverBeforeShutdown) + { + await UpgradeReplicasAsync(context, replica1, replica2, cancellation).ConfigureAwait(false); + } + + // Confirm that at least some of the data is available on each Replica + var onReplica1 = 0; + foreach (var (k, v) in writtenToPrimary1) + { + var res = (string)context.clusterTestUtils.Execute(replica1, "GET", [k]); + if (res is not null) + { + ClassicAssert.AreEqual(v, res); + onReplica1++; + } + } + + var onReplica2 = 0; + foreach (var (k, v) in writtenToPrimary2) + { + var res = (string)context.clusterTestUtils.Execute(replica2, "GET", [k]); + if (res is not null) + { + ClassicAssert.AreEqual(v, res); + onReplica2++; + } + } + + // Something, ANYTHING, made it + ClassicAssert.IsTrue(onReplica1 > 
0, $"Nothing made it to replica 1, should have been up to {writtenToPrimary1.Count} values"); + ClassicAssert.IsTrue(onReplica2 > 0, $"Nothing made it to replica 2, should have been up to {writtenToPrimary2.Count} values"); + + static async Task UpgradeReplicasAsync(ClusterTestContext context, IPEndPoint replica1, IPEndPoint replica2, CancellationToken cancellation) + { + // Promote the replicas, if no primary is coming back + var takeOverRes1 = (string)context.clusterTestUtils.Execute(replica1, "CLUSTER", ["FAILOVER", "FORCE"]); + var takeOverRes2 = (string)context.clusterTestUtils.Execute(replica2, "CLUSTER", ["FAILOVER", "FORCE"]); + ClassicAssert.AreEqual("OK", takeOverRes1); + ClassicAssert.AreEqual("OK", takeOverRes2); + + // Wait for roles to update + while (true) + { + await Task.Delay(10, cancellation).ConfigureAwait(false); + + if (context.clusterTestUtils.RoleCommand(replica1).Value != "master") + { + continue; + } + + if (context.clusterTestUtils.RoleCommand(replica2).Value != "master") + { + continue; + } + + break; + } + } + } } } \ No newline at end of file diff --git a/test/Garnet.test.cluster/ClusterNegativeTests.cs b/test/Garnet.test.cluster/ClusterNegativeTests.cs index 645bdbfd0a6..30d5b8541ff 100644 --- a/test/Garnet.test.cluster/ClusterNegativeTests.cs +++ b/test/Garnet.test.cluster/ClusterNegativeTests.cs @@ -291,7 +291,7 @@ public void ClusterCheckpointAcquireTest([Values] bool fastAofTruncate, [Values] } } - [Test, Order(6), CancelAfter(testTimeout)] + [Test, Order(7), CancelAfter(testTimeout)] public void ClusterReplicaAttachIntenseWrite(CancellationToken cancellationToken) { var primaryIndex = 0; @@ -369,7 +369,7 @@ public void ClusterReplicaAttachIntenseWrite(CancellationToken cancellationToken } } - [Test, Order(6), CancelAfter(testTimeout)] + [Test, Order(8), CancelAfter(testTimeout)] public void ClusterFailedToAddAofSyncTask() { var primaryIndex = 0; @@ -419,7 +419,7 @@ public void ClusterFailedToAddAofSyncTask() 
context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, replicaIndex); } - [Test, Order(6), CancelAfter(testTimeout)] + [Test, Order(9), CancelAfter(testTimeout)] public void ClusterReplicaSyncTimeoutTest() { var primaryIndex = 0; @@ -468,7 +468,7 @@ public void ClusterReplicaSyncTimeoutTest() } #endif - [Test, CancelAfter(60_000)] + [Test, Order(10), CancelAfter(60_000)] public async Task ClusterParallelFailoverOnDistinctShards(CancellationToken cancellationToken) { var nodes_count = 4; @@ -523,8 +523,7 @@ public async Task ClusterParallelFailoverOnDistinctShards(CancellationToken canc } } - - [Test, CancelAfter(60_000)] + [Test, Order(11), CancelAfter(60_000)] public void ClusterMeetFromReplica(CancellationToken cancellationToken) { var nodes_count = 3; @@ -559,5 +558,71 @@ public void ClusterMeetFromReplica(CancellationToken cancellationToken) Assert.That(nodes_count, Is.EqualTo(context.clusterTestUtils.ClusterNodes(i).Nodes.Count)); } } + + [Test, Order(12)] + [Category("REPLICATION")] + public void ClusterDontKnowReplicaFailTest([Values] bool useReplicaOf) + { + var replica_count = 1;// Per primary + var primary_count = 1; + var nodes_count = primary_count + (primary_count * replica_count); + ClassicAssert.IsTrue(primary_count > 0); + context.CreateInstances(nodes_count, disableObjects: true, FastAofTruncate: false, OnDemandCheckpoint: false, CommitFrequencyMs: -1, enableAOF: true, useTLS: false, asyncReplay: false); + context.CreateConnection(useTLS: false); + + var primaryNodeIndex = 0; + var replicaNodeIndex = 1; + ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, [(0, 16383)], true, context.logger)); + context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, 1, logger: context.logger); + context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, 2, logger: context.logger); + + var primaryId = context.clusterTestUtils.ClusterMyId(primaryNodeIndex, logger: context.logger); + string resp; + if (!useReplicaOf) + { + resp = 
context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryId, failEx: false, logger: context.logger); + ClassicAssert.AreEqual($"ERR I don't know about node {primaryId}.", resp); + } + else + { + resp = context.clusterTestUtils.ReplicaOf(replicaNodeIndex, primaryNodeIndex, failEx: false, logger: context.logger); + ClassicAssert.AreEqual($"ERR I don't know about node {context.clusterTestUtils.GetEndPoint(primaryNodeIndex)}.", resp); + } + } + + [Test, Order(13)] + [Category("REPLICATION")] + public void ClusterReplicateFails() + { + const string UserName = "temp-user"; + const string Password = "temp-password"; + + const string ClusterUserName = "cluster-user"; + const string ClusterPassword = "cluster-password"; + + // Setup a cluster (mimicking the style in which this bug was first found) + ServerCredential clusterCreds = new(ClusterUserName, ClusterPassword, IsAdmin: true, UsedForClusterAuth: true, IsClearText: true); + ServerCredential userCreds = new(UserName, Password, IsAdmin: true, UsedForClusterAuth: false, IsClearText: true); + + context.GenerateCredentials([userCreds, clusterCreds]); + context.CreateInstances(2, disableObjects: true, disablePubSub: true, enableAOF: true, clusterCreds: clusterCreds, useAcl: true, FastAofTruncate: true, CommitFrequencyMs: -1, asyncReplay: false); + var primaryEndpoint = (IPEndPoint)context.endpoints.First(); + var replicaEndpoint = (IPEndPoint)context.endpoints.Last(); + + ClassicAssert.AreNotEqual(primaryEndpoint, replicaEndpoint, "Should have different endpoints for nodes"); + + using var primaryConnection = ConnectionMultiplexer.Connect($"{primaryEndpoint.Address}:{primaryEndpoint.Port},user={UserName},password={Password}"); + var primaryServer = primaryConnection.GetServer(primaryEndpoint); + + ClassicAssert.AreEqual("OK", (string)primaryServer.Execute("CLUSTER", ["ADDSLOTSRANGE", "0", "16383"], flags: CommandFlags.NoRedirect)); + ClassicAssert.AreEqual("OK", (string)primaryServer.Execute("CLUSTER", ["MEET", 
replicaEndpoint.Address.ToString(), replicaEndpoint.Port.ToString()], flags: CommandFlags.NoRedirect)); + + using var replicaConnection = ConnectionMultiplexer.Connect($"{replicaEndpoint.Address}:{replicaEndpoint.Port},user={UserName},password={Password}"); + var replicaServer = replicaConnection.GetServer(replicaEndpoint); + + // Try to replicate from a server that doesn't exist + var exc = Assert.Throws(() => replicaServer.Execute("CLUSTER", ["REPLICATE", Guid.NewGuid().ToString()], flags: CommandFlags.NoRedirect)); + ClassicAssert.IsTrue(exc.Message.StartsWith("ERR I don't know about node ")); + } } } \ No newline at end of file diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs index e910f9f3776..cab5017862c 100644 --- a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs +++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationAsyncReplay.cs @@ -6,6 +6,7 @@ namespace Garnet.test.cluster { [NonParallelizable] + [Ignore("Skip to reduce CI duration.")] public class ClusterReplicationAsyncReplay : ClusterReplicationBaseTests { [SetUp] diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs index 8c6a940c0be..ed283d142ec 100644 --- a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs +++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs @@ -2,9 +2,7 @@ // Licensed under the MIT license. 
using System; -using System.Collections.Concurrent; using System.Collections.Generic; -using System.Diagnostics; using System.IO; using System.Linq; using System.Net; @@ -814,59 +812,6 @@ public void ClusterMainMemoryReplicationAttachReplicas() context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, 2); } - [Test, Order(15)] - [Category("REPLICATION")] - public void ClusterDontKnowReplicaFailTest([Values] bool performRMW, [Values] bool MainMemoryReplication, [Values] bool onDemandCheckpoint, [Values] bool useReplicaOf) - { - var replica_count = 1;// Per primary - var primary_count = 1; - var nodes_count = primary_count + (primary_count * replica_count); - ClassicAssert.IsTrue(primary_count > 0); - context.CreateInstances(nodes_count, disableObjects: true, FastAofTruncate: MainMemoryReplication, OnDemandCheckpoint: onDemandCheckpoint, CommitFrequencyMs: -1, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay); - context.CreateConnection(useTLS: useTLS); - - var primaryNodeIndex = 0; - var replicaNodeIndex = 1; - ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, new List<(int, int)>() { (0, 16383) }, true, context.logger)); - context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, 1, logger: context.logger); - context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, 2, logger: context.logger); - context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger); - context.clusterTestUtils.WaitUntilNodeIsKnown(primaryNodeIndex, replicaNodeIndex, logger: context.logger); - - var replicaId = context.clusterTestUtils.ClusterMyId(replicaNodeIndex, logger: context.logger); - _ = context.clusterTestUtils.ClusterForget(primaryNodeIndex, replicaId, 5, logger: context.logger); - - var primaryId = context.clusterTestUtils.ClusterMyId(primaryNodeIndex, logger: context.logger); - string resp; - if (!useReplicaOf) - resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryId, failEx: false, logger: 
context.logger); - else - resp = context.clusterTestUtils.ReplicaOf(replicaNodeIndex, primaryNodeIndex, failEx: false, logger: context.logger); - ClassicAssert.IsTrue(resp.StartsWith("PRIMARY-ERR")); - - while (true) - { - context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger); - context.clusterTestUtils.BumpEpoch(replicaNodeIndex, logger: context.logger); - var config = context.clusterTestUtils.ClusterNodes(primaryNodeIndex, logger: context.logger); - if (config.Nodes.Count == 2) break; - ClusterTestUtils.BackOff(cancellationToken: context.cts.Token); - } - - _ = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryId, logger: context.logger); - - context.kvPairs = []; - var keyLength = 32; - var kvpairCount = keyCount; - var addCount = 5; - if (!performRMW) - context.PopulatePrimary(ref context.kvPairs, keyLength, kvpairCount, primaryNodeIndex); - else - context.PopulatePrimaryRMW(ref context.kvPairs, keyLength, kvpairCount, primaryNodeIndex, addCount); - - context.ValidateKVCollectionAgainstReplica(ref context.kvPairs, replicaNodeIndex); - } - [Test, Order(16)] [Category("REPLICATION")] public void ClusterDivergentReplicasTest([Values] bool performRMW, [Values] bool disableObjects, [Values] bool ckptBeforeDivergence) @@ -1042,41 +987,6 @@ void ClusterDivergentReplicasTest(bool performRMW, bool disableObjects, bool ckp context.ValidateNodeObjects(ref context.kvPairsObj, replicaIndex: newPrimaryIndex, set: set); } - [Test, Order(21)] - [Category("REPLICATION")] - public void ClusterReplicateFails() - { - const string UserName = "temp-user"; - const string Password = "temp-password"; - - const string ClusterUserName = "cluster-user"; - const string ClusterPassword = "cluster-password"; - - // Setup a cluster (mimicking the style in which this bug was first found) - ServerCredential clusterCreds = new(ClusterUserName, ClusterPassword, IsAdmin: true, UsedForClusterAuth: true, IsClearText: true); - ServerCredential 
userCreds = new(UserName, Password, IsAdmin: true, UsedForClusterAuth: false, IsClearText: true); - - context.GenerateCredentials([userCreds, clusterCreds]); - context.CreateInstances(2, disableObjects: true, disablePubSub: true, enableAOF: true, clusterCreds: clusterCreds, useAcl: true, FastAofTruncate: true, CommitFrequencyMs: -1, asyncReplay: asyncReplay); - var primaryEndpoint = (IPEndPoint)context.endpoints.First(); - var replicaEndpoint = (IPEndPoint)context.endpoints.Last(); - - ClassicAssert.AreNotEqual(primaryEndpoint, replicaEndpoint, "Should have different endpoints for nodes"); - - using var primaryConnection = ConnectionMultiplexer.Connect($"{primaryEndpoint.Address}:{primaryEndpoint.Port},user={UserName},password={Password}"); - var primaryServer = primaryConnection.GetServer(primaryEndpoint); - - ClassicAssert.AreEqual("OK", (string)primaryServer.Execute("CLUSTER", ["ADDSLOTSRANGE", "0", "16383"], flags: CommandFlags.NoRedirect)); - ClassicAssert.AreEqual("OK", (string)primaryServer.Execute("CLUSTER", ["MEET", replicaEndpoint.Address.ToString(), replicaEndpoint.Port.ToString()], flags: CommandFlags.NoRedirect)); - - using var replicaConnection = ConnectionMultiplexer.Connect($"{replicaEndpoint.Address}:{replicaEndpoint.Port},user={UserName},password={Password}"); - var replicaServer = replicaConnection.GetServer(replicaEndpoint); - - // Try to replicate from a server that doesn't exist - var exc = Assert.Throws(() => replicaServer.Execute("CLUSTER", ["REPLICATE", Guid.NewGuid().ToString()], flags: CommandFlags.NoRedirect)); - ClassicAssert.IsTrue(exc.Message.StartsWith("ERR I don't know about node ")); - } - [Test, Order(22)] [Category("REPLICATION")] public void ClusterReplicationCheckpointAlignmentTest([Values] bool performRMW) @@ -1519,394 +1429,6 @@ void RestartRecover(int iteration) } } - [Test, Order(27)] - [Category("CLUSTER")] - [CancelAfter(30_000)] - public async Task ReplicasRestartAsReplicasAsync(CancellationToken cancellation) - { - var 
replica_count = 1;// Per primary - var primary_count = 1; - var nodes_count = primary_count + primary_count * replica_count; - ClassicAssert.IsTrue(primary_count > 0); - - context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, useTLS: true, tryRecover: false, FastAofTruncate: true, CommitFrequencyMs: -1, clusterReplicationReestablishmentTimeout: 1); - context.CreateConnection(useTLS: true); - var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); - - IPEndPoint primary = (IPEndPoint)context.endpoints[0]; - IPEndPoint replica = (IPEndPoint)context.endpoints[1]; - - // Make sure role assignment is as expected - ClassicAssert.AreEqual("master", context.clusterTestUtils.RoleCommand(primary).Value); - ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica).Value); - - context.ShutdownNode(primary); - context.ShutdownNode(replica); - - // Intentionally leaving primary offline - context.RestartNode(replica); - - // Delay a bit for replication init tasks to fire off - await Task.Delay(100, cancellation).ConfigureAwait(false); - - // Make sure replica did not promote to Primary - ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica).Value); - } - - [Test, Order(28)] - [Category("CLUSTER")] - [CancelAfter(30_000)] - [TestCase(ExceptionInjectionType.None, true)] - [TestCase(ExceptionInjectionType.None, false)] - [TestCase(ExceptionInjectionType.Divergent_AOF_Stream, true)] - [TestCase(ExceptionInjectionType.Divergent_AOF_Stream, false)] - [TestCase(ExceptionInjectionType.Aof_Sync_Task_Consume, true)] - [TestCase(ExceptionInjectionType.Aof_Sync_Task_Consume, false)] - public async Task PrimaryUnavailableRecoveryAsync(ExceptionInjectionType faultType, bool replicaFailoverBeforeShutdown, CancellationToken cancellation) - { - // Case we're testing is where a Primary _and_ it's Replica die, but only the Replica comes back. 
- // - // If configured correctly (for example, as a cache), we still want the Replica to come back up with some data - even if it's - // acknowledges writes to the Primary are dropped. - // - // We also sometimes inject faults into the Primaries and Replicas before the proper fault, to simulate a cluster - // in an unstable environment. - // - // Additionally we simulate both when we detect failures and intervene in time to promote Replicas to Primaries, - // and when we don't intervene until after everything dies and the Replicas come back. - -#if !DEBUG - Assert.Ignore($"Depends on {nameof(ExceptionInjectionHelper)}, which is disabled in non-Debug builds"); -#endif - - var replica_count = 1;// Per primary - var primary_count = 2; - var nodes_count = primary_count + primary_count * replica_count; - ClassicAssert.IsTrue(primary_count > 0); - - // Config lifted from a deployed product, be wary of changing these without discussion - context.CreateInstances( - nodes_count, - tryRecover: true, - disablePubSub: false, - disableObjects: false, - enableAOF: true, - AofMemorySize: "128m", - CommitFrequencyMs: -1, - aofSizeLimit: "256m", - compactionFrequencySecs: 30, - compactionType: LogCompactionType.Scan, - latencyMonitory: true, - metricsSamplingFrequency: 1, - loggingFrequencySecs: 10, - checkpointThrottleFlushDelayMs: 0, - FastCommit: true, - FastAofTruncate: true, - OnDemandCheckpoint: true, - useTLS: true, - enableLua: true, - luaMemoryMode: LuaMemoryManagementMode.Tracked, - luaTransactionMode: true, - luaMemoryLimit: "2M", - clusterReplicationReestablishmentTimeout: 1, - clusterReplicaResumeWithData: true - ); - context.CreateConnection(useTLS: true); - var (shards, _) = context.clusterTestUtils.SimpleSetupCluster(primary_count, replica_count, logger: context.logger); - - shards = context.clusterTestUtils.ClusterShards(0, context.logger); - ClassicAssert.AreEqual(2, shards.Count); - - IPEndPoint primary1 = (IPEndPoint)context.endpoints[0]; - IPEndPoint primary2 
= (IPEndPoint)context.endpoints[1]; - IPEndPoint replica1 = (IPEndPoint)context.endpoints[2]; - IPEndPoint replica2 = (IPEndPoint)context.endpoints[3]; - - // Make sure role assignment is as expected - ClassicAssert.AreEqual("master", context.clusterTestUtils.RoleCommand(primary1).Value); - ClassicAssert.AreEqual("master", context.clusterTestUtils.RoleCommand(primary2).Value); - ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica1).Value); - ClassicAssert.AreEqual("slave", context.clusterTestUtils.RoleCommand(replica2).Value); - - // Populate both shards - var writtenToPrimary1 = new ConcurrentDictionary(); - var writtenToPrimary2 = new ConcurrentDictionary(); - using (var writeTaskCancel = CancellationTokenSource.CreateLinkedTokenSource(cancellation)) - { - var uniquePrefix = Guid.NewGuid(); - - var writeToPrimary1Task = - Task.Run( - async () => - { - var ix = -1; - - var p1 = shards.Single(s => s.nodes.Any(n => n.nodeIndex == context.endpoints.IndexOf(primary1))); - - while (!writeTaskCancel.IsCancellationRequested) - { - ix++; - - var key = $"pura-1-{uniquePrefix}-{ix}"; - - var slot = context.clusterTestUtils.HashSlot(key); - if (!p1.slotRanges.Any(x => slot >= x.Item1 && slot <= x.Item2)) - { - continue; - } - - var value = Guid.NewGuid().ToString(); - try - { - var res = (string)context.clusterTestUtils.Execute(primary1, "SET", [key, value]); - if (res == "OK") - { - writtenToPrimary1[key] = value; - } - - await Task.Delay(10, writeTaskCancel.Token).ConfigureAwait(false); - } - catch - { - // Ignore, cancellation or throwing should both just be powered through - } - } - }, - cancellation - ); - - var writeToPrimary2Task = - Task.Run( - async () => - { - var ix = -1; - - var p2 = shards.Single(s => s.nodes.Any(n => n.nodeIndex == context.endpoints.IndexOf(primary2))); - - while (!writeTaskCancel.IsCancellationRequested) - { - ix++; - - var key = $"pura-2-{uniquePrefix}-{ix}"; - - var slot = context.clusterTestUtils.HashSlot(key); - if 
(!p2.slotRanges.Any(x => slot >= x.Item1 && slot <= x.Item2)) - { - continue; - } - - var value = Guid.NewGuid().ToString(); - - try - { - var res = (string)context.clusterTestUtils.Execute(primary1, "SET", [key, value]); - if (res == "OK") - { - writtenToPrimary2[key] = value; - } - - await Task.Delay(10, writeTaskCancel.Token).ConfigureAwait(false); - } - catch - { - // Ignore, cancellation or throwing should both just be powered through - } - } - }, - cancellation - ); - - // Simulate out of band checkpointing - var checkpointTask = - Task.Run( - async () => - { - while (!writeTaskCancel.IsCancellationRequested) - { - foreach (var node in context.nodes) - { - try - { - _ = await node.Store.CommitAOFAsync(writeTaskCancel.Token).ConfigureAwait(false); - } - catch (TaskCanceledException) - { - // Cancel is fine - break; - } - catch - { - // Ignore everything else - } - } - - try - { - await Task.Delay(100, writeTaskCancel.Token).ConfigureAwait(false); - } - catch - { - continue; - } - } - }, - cancellation - ); - - // Wait for a bit, optionally injecting a fault - if (faultType == ExceptionInjectionType.None) - { - await Task.Delay(10_000, cancellation).ConfigureAwait(false); - } - else - { - var timer = Stopwatch.StartNew(); - - // Things start fine - await Task.Delay(1_000, cancellation).ConfigureAwait(false); - - // Wait for something to get replicated - var replica1Happened = false; - var replica2Happened = false; - do - { - if (!replica1Happened) - { - var readonlyRes = (string)context.clusterTestUtils.Execute(replica1, "READONLY", []); - ClassicAssert.AreEqual("OK", readonlyRes); - - foreach (var kv in writtenToPrimary1) - { - var val = (string)context.clusterTestUtils.Execute(replica1, "GET", [kv.Key]); - if (val == kv.Value) - { - replica1Happened = true; - break; - } - } - } - - if (!replica2Happened) - { - var readonlyRes = (string)context.clusterTestUtils.Execute(replica2, "READONLY", []); - ClassicAssert.AreEqual("OK", readonlyRes); - - foreach (var kv 
in writtenToPrimary2) - { - var val = (string)context.clusterTestUtils.Execute(replica2, "GET", [kv.Key]); - if (val == kv.Value) - { - replica2Happened = true; - break; - } - } - } - - await Task.Delay(100, cancellation).ConfigureAwait(false); - } while (!replica1Happened || !replica2Happened); - - // Things fail for a bit - ExceptionInjectionHelper.EnableException(faultType); - await Task.Delay(2_000, cancellation).ConfigureAwait(false); - - // Things recover - ExceptionInjectionHelper.DisableException(faultType); - - timer.Stop(); - - // Wait out the rest of the duration - if (timer.ElapsedMilliseconds < 10_000) - { - await Task.Delay((int)(10_000 - timer.ElapsedMilliseconds), cancellation).ConfigureAwait(false); - } - } - - // Stop writing - writeTaskCancel.Cancel(); - - // Wait for all our writes and checkpoints to spin down - await Task.WhenAll(writeToPrimary1Task, writeToPrimary2Task, checkpointTask).ConfigureAwait(false); - } - - // Shutdown all primaries - context.ShutdownNode(primary1); - context.ShutdownNode(primary2); - - // Sometimes we can intervene post-Primary crash but pre-Replica crash, simulate that - if (replicaFailoverBeforeShutdown) - { - await UpgradeReplicasAsync(context, replica1, replica2, cancellation).ConfigureAwait(false); - } - - // Shutdown the (old) replicas - context.ShutdownNode(replica1); - context.ShutdownNode(replica2); - - // Restart just the replicas - context.RestartNode(replica1); - context.RestartNode(replica2); - - // If we didn't promte pre-crash, promote now that Replicas came back - if (!replicaFailoverBeforeShutdown) - { - await UpgradeReplicasAsync(context, replica1, replica2, cancellation).ConfigureAwait(false); - } - - // Confirm that at least some of the data is available on each Replica - var onReplica1 = 0; - foreach (var (k, v) in writtenToPrimary1) - { - var res = (string)context.clusterTestUtils.Execute(replica1, "GET", [k]); - if (res is not null) - { - ClassicAssert.AreEqual(v, res); - onReplica1++; - } - } 
- - var onReplica2 = 0; - foreach (var (k, v) in writtenToPrimary2) - { - var res = (string)context.clusterTestUtils.Execute(replica2, "GET", [k]); - if (res is not null) - { - ClassicAssert.AreEqual(v, res); - onReplica2++; - } - } - - // Something, ANYTHING, made it - ClassicAssert.IsTrue(onReplica1 > 0, $"Nothing made it to replica 1, should have been up to {writtenToPrimary1.Count} values"); - ClassicAssert.IsTrue(onReplica2 > 0, $"Nothing made it to replica 2, should have been up to {writtenToPrimary2.Count} values"); - - static async Task UpgradeReplicasAsync(ClusterTestContext context, IPEndPoint replica1, IPEndPoint replica2, CancellationToken cancellation) - { - // Promote the replicas, if no primary is coming back - var takeOverRes1 = (string)context.clusterTestUtils.Execute(replica1, "CLUSTER", ["FAILOVER", "FORCE"]); - var takeOverRes2 = (string)context.clusterTestUtils.Execute(replica2, "CLUSTER", ["FAILOVER", "FORCE"]); - ClassicAssert.AreEqual("OK", takeOverRes1); - ClassicAssert.AreEqual("OK", takeOverRes2); - - // Wait for roles to update - while (true) - { - await Task.Delay(10, cancellation).ConfigureAwait(false); - - if (context.clusterTestUtils.RoleCommand(replica1).Value != "master") - { - continue; - } - - if (context.clusterTestUtils.RoleCommand(replica2).Value != "master") - { - continue; - } - - break; - } - } - } - [Test, Order(27)] [Category("CLUSTER")] [Category("REPLICATION")] @@ -2075,101 +1597,5 @@ public void ClusterReplicationSimpleTransactionTest([Values] bool storedProcedur var replicaPInfo = context.clusterTestUtils.GetPersistenceInfo(replicaNodeIndex, context.logger); ClassicAssert.AreEqual(primaryPInfo.TailAddress, replicaPInfo.TailAddress); } - - [Test, Order(29)] - [Category("REPLICATION")] - [CancelAfter(30_000)] - [TestCase(false)] - [TestCase(true)] - public async Task ClusterReplicationObjectCollectTest(bool useManualCollect, CancellationToken cancellationToken) - { - var replica_count = 1;// Per primary - var 
primary_count = 1; - var nodes_count = primary_count + (primary_count * replica_count); - var primaryNodeIndex = 0; - var replicaNodeIndex = 1; - - context.CreateInstances(nodes_count, disableObjects: false, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay, expiredObjectCollectionFrequencySecs: !useManualCollect ? 100 : 0); - context.CreateConnection(useTLS: useTLS); - - var primaryServer = context.clusterTestUtils.GetServer(primaryNodeIndex); - var replicaServer = context.clusterTestUtils.GetServer(replicaNodeIndex); - - // Setup cluster - context.clusterTestUtils.AddDelSlotsRange(primaryNodeIndex, [(0, 16383)], addslot: true, logger: context.logger); - context.clusterTestUtils.SetConfigEpoch(primaryNodeIndex, primaryNodeIndex + 1, logger: context.logger); - context.clusterTestUtils.SetConfigEpoch(replicaNodeIndex, replicaNodeIndex + 1, logger: context.logger); - context.clusterTestUtils.Meet(primaryNodeIndex, replicaNodeIndex, logger: context.logger); - context.clusterTestUtils.WaitUntilNodeIsKnown(primaryNodeIndex, replicaNodeIndex, logger: context.logger); - context.clusterTestUtils.WaitUntilNodeIsKnown(replicaNodeIndex, primaryNodeIndex, logger: context.logger); - - var db = context.clusterTestUtils.GetDatabase(); - HashEntry[] elements = [new HashEntry("field1", "hello"), new HashEntry("field2", "world"), new HashEntry("field3", "value3"), new HashEntry("field4", "value4"), new HashEntry("field5", "value5"), new HashEntry("field6", "value6")]; - // Attach replica - var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex, primaryNodeIndex, logger: context.logger); - ClassicAssert.AreEqual("OK", resp); - - // Execute first hash set workload - var hashSetKey = "myhash"; - ExecuteHashSet(hashSetKey, elements); - await Task.Delay(3000).ConfigureAwait(false); - ManualCollect(); - ValidateHashSet(hashSetKey); - - // Execute second hash set workload - hashSetKey = "myhash2"; - ExecuteHashSet(hashSetKey, elements); - await 
Task.Delay(3000).ConfigureAwait(false); - ManualCollect(); - ValidateHashSet(hashSetKey); - - void ExecuteHashSet(string key, HashEntry[] elements) - { - db.HashSet(key, elements); - - var result = primaryServer.Execute("HPEXPIRE", key, "500", "FIELDS", "2", "field1", "field2"); - var results = (RedisResult[])result; - ClassicAssert.AreEqual(2, results.Length); - ClassicAssert.AreEqual(1, (long)results[0]); - ClassicAssert.AreEqual(1, (long)results[1]); - - result = primaryServer.Execute("HPEXPIRE", key, "500", "FIELDS", "2", "field3", "field4"); - results = (RedisResult[])result; - ClassicAssert.AreEqual(2, results.Length); - ClassicAssert.AreEqual(1, (long)results[0]); - ClassicAssert.AreEqual(1, (long)results[1]); - } - - void ValidateHashSet(string key) - { - // Check expected result at primary - var expectedFieldsAndValues = elements.AsSpan().Slice(4).ToArray() - .SelectMany(e => new[] { e.Name.ToString(), e.Value.ToString() }) - .ToArray(); - var fields = (string[])primaryServer.Execute("HGETALL", [key]); - ClassicAssert.AreEqual(4, fields.Length); - ClassicAssert.AreEqual(expectedFieldsAndValues, fields); - - // Wait to ensure sync - context.clusterTestUtils.WaitForReplicaAofSync(primaryNodeIndex, replicaNodeIndex, context.logger, cancellationToken); - - // Check if replica is caught up - fields = (string[])replicaServer.Execute("HGETALL", [key]); - ClassicAssert.AreEqual(4, fields.Length); - ClassicAssert.AreEqual(expectedFieldsAndValues, fields); - } - - void ManualCollect() - { - if (useManualCollect) - { - Assert.Throws(() => replicaServer.Execute("HCOLLECT", ["*"], CommandFlags.NoRedirect), - $"Expected exception was not thrown"); - - resp = (string)primaryServer.Execute("HCOLLECT", ["*"]); - ClassicAssert.AreEqual("OK", resp); - } - } - } } } \ No newline at end of file From 774992523b7ca65957798f81b65b010cc231dd85 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Thu, 19 Mar 2026 19:56:59 -0700 Subject: [PATCH 5/8] separate dispose from close and 
configure socket to allow rapid connect --- .../Embedded/GarnetServerEmbedded.cs | 5 +++ libs/host/GarnetServer.cs | 7 ++++ libs/server/Servers/GarnetServerBase.cs | 3 ++ libs/server/Servers/GarnetServerTcp.cs | 33 ++++++++++++++++--- libs/server/Servers/IGarnetServer.cs | 6 ++++ 5 files changed, 49 insertions(+), 5 deletions(-) diff --git a/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs b/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs index 63e8ce8e50d..f509cdf5bc3 100644 --- a/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs +++ b/benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs @@ -110,6 +110,11 @@ public override void Start() { } + /// + public override void Close() + { + } + public bool TryCreateMessageConsumer(Span bytes, INetworkSender networkSender, out IMessageConsumer session) { session = null; diff --git a/libs/host/GarnetServer.cs b/libs/host/GarnetServer.cs index 6dbbad160b0..e35e97fae42 100644 --- a/libs/host/GarnetServer.cs +++ b/libs/host/GarnetServer.cs @@ -478,7 +478,14 @@ public void Dispose(bool deleteDir = true) private void InternalDispose() { + // Phase 1: Stop listening on all servers to free ports immediately. + for (var i = 0; i < servers.Length; i++) + servers[i]?.Close(); + + // Phase 2: Dispose the provider (storage engine shutdown — may take time). Provider?.Dispose(); + + // Phase 3: Drain active handlers and clean up remaining resources. 
for (var i = 0; i < servers.Length; i++) servers[i]?.Dispose(); subscribeBroker?.Dispose(); diff --git a/libs/server/Servers/GarnetServerBase.cs b/libs/server/Servers/GarnetServerBase.cs index 5bfd1ff62ff..b326c9f3d30 100644 --- a/libs/server/Servers/GarnetServerBase.cs +++ b/libs/server/Servers/GarnetServerBase.cs @@ -154,6 +154,9 @@ public bool AddSession(WireFormat protocol, ref ISessionProvider provider, INetw /// public abstract void Start(); + /// + public abstract void Close(); + /// public virtual void Dispose() { diff --git a/libs/server/Servers/GarnetServerTcp.cs b/libs/server/Servers/GarnetServerTcp.cs index c681e09befa..48bbe61cfa7 100644 --- a/libs/server/Servers/GarnetServerTcp.cs +++ b/libs/server/Servers/GarnetServerTcp.cs @@ -82,24 +82,47 @@ public GarnetServerTcp( this.unixSocketPath = unixSocketPath; this.unixSocketPermission = unixSocketPermission; - listenSocket = endpoint switch + if (endpoint is UnixDomainSocketEndPoint unix) { - UnixDomainSocketEndPoint unix => new Socket(unix.AddressFamily, SocketType.Stream, ProtocolType.Unspecified), + // UDS Initialization & Cleanup + listenSocket = new Socket(unix.AddressFamily, SocketType.Stream, ProtocolType.Unspecified); + var socketPath = unix.ToString(); + if (File.Exists(socketPath)) + { + File.Delete(socketPath); + } + } + else + { + // TCP Initialization & Port Reuse + listenSocket = new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - _ => new Socket(endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp) - }; + // Set reuse BEFORE Bind to handle TIME_WAIT states + listenSocket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true); + } acceptEventArg = new SocketAsyncEventArgs(); acceptEventArg.Completed += AcceptEventArg_Completed; } + /// + /// Stop listening for new connections. Frees the listening port + /// without waiting for active connections to drain. 
+ /// + public override void Close() + { + listenSocket.Close(); + } + /// /// Dispose /// public override void Dispose() { - base.Dispose(); + // Close listening socket to free the port and stop accepting new connections. + // This also prevents new connections from arriving while DisposeActiveHandlers drains existing ones. listenSocket.Dispose(); + base.Dispose(); acceptEventArg.UserToken = null; acceptEventArg.Dispose(); networkPool?.Dispose(); diff --git a/libs/server/Servers/IGarnetServer.cs b/libs/server/Servers/IGarnetServer.cs index 9e197451627..2b33e77648d 100644 --- a/libs/server/Servers/IGarnetServer.cs +++ b/libs/server/Servers/IGarnetServer.cs @@ -46,5 +46,11 @@ public interface IGarnetServer : IDisposable /// Start server /// public void Start(); + + /// + /// Stop listening for new connections. Frees the listening port + /// without waiting for active connections to drain. + /// + public void Close(); } } \ No newline at end of file From 7220dd341b80ed9fd11a291e0abd1d41dc89d967 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Wed, 25 Mar 2026 12:27:27 -0700 Subject: [PATCH 6/8] ensure socket is disposed successfully --- .../Garnet.test.cluster/ClusterTestContext.cs | 70 +++++++++++++------ 1 file changed, 48 insertions(+), 22 deletions(-) diff --git a/test/Garnet.test.cluster/ClusterTestContext.cs b/test/Garnet.test.cluster/ClusterTestContext.cs index b35b15dbad8..38b1dee6385 100644 --- a/test/Garnet.test.cluster/ClusterTestContext.cs +++ b/test/Garnet.test.cluster/ClusterTestContext.cs @@ -14,6 +14,7 @@ using Garnet.server.Auth.Settings; using Microsoft.Extensions.Logging; using NUnit.Framework; +using NUnit.Framework.Interfaces; using NUnit.Framework.Legacy; using StackExchange.Redis; using Tsavorite.core; @@ -117,42 +118,73 @@ public void RestartNode(int nodeIndex) nodes[nodeIndex].Start(); } + public void TearDown() { + // Capture test outcome before any teardown work to distinguish + // primary teardown failures from secondary ones caused by a 
hung/failed test. + var testOutcome = TestContext.CurrentContext.Result.Outcome; + var testAlreadyFailed = testOutcome.Status == TestStatus.Failed; + + if (testAlreadyFailed) + { + logger?.LogError( + "TearDown: test already failed ({label}): {message}", + testOutcome.Label, + TestContext.CurrentContext.Result.Message); + } + cts.Cancel(); cts.Dispose(); - logger.LogDebug("0. Dispose <<<<<<<<<<<"); waiter?.Dispose(); clusterTestUtils?.Dispose(); - var timeoutSeconds = 5; - var failMessage = ""; + var timeoutSeconds = 60; + string failureReason = null; - if (!Task.Run(() => DisposeCluster()).Wait(TimeSpan.FromSeconds(timeoutSeconds))) + // Phase 1: Dispose cluster nodes (may timeout if handlers are stuck) + try { - logger?.LogError("Timed out waiting for DisposeCluster"); - failMessage += "Timed out waiting for DisposeCluster; "; + if (!Task.Run(() => DisposeCluster()).Wait(TimeSpan.FromSeconds(timeoutSeconds))) + { + failureReason = "Timed out waiting for DisposeCluster"; + logger?.LogError("Timed out waiting for DisposeCluster"); + } } - // Dispose logger factory only after servers are disposed - loggerFactory?.Dispose(); - if (!Task.Run(() => TestUtils.DeleteDirectory(TestFolder, true)).Wait(TimeSpan.FromSeconds(timeoutSeconds))) + catch (Exception ex) { - logger?.LogError("Timed out DeleteDirectory"); - failMessage += "Timed out DeleteDirectory; "; + failureReason = $"DisposeCluster threw: {ex.Message}"; + logger?.LogError(ex, "DisposeCluster failed"); } + // Phase 2: Dispose logger factory (always, even after timeout) + loggerFactory?.Dispose(); + + // Phase 3: Delete test directory (may timeout if files locked from Phase 1 timeout) try { - TestUtils.OnTearDown(); + if (!Task.Run(() => TestUtils.DeleteDirectory(TestFolder, true)).Wait(TimeSpan.FromSeconds(timeoutSeconds))) + { + failureReason ??= "Timed out DeleteDirectory"; + logger?.LogError("Timed out DeleteDirectory"); + } } - catch (AssertionException e) + catch (Exception ex) { - failMessage += e.Message; 
+ failureReason ??= $"DeleteDirectory threw: {ex.Message}"; + logger?.LogError(ex, "DeleteDirectory failed"); } - if (failMessage != "") + // Phase 4: Always runs — resets LightEpoch instances to prevent cross-test contamination + TestUtils.OnTearDown(); + + // Fail the test at the end, after all cleanup is done + if (failureReason != null) { - ClassicAssert.Fail(failMessage); + var context = testAlreadyFailed + ? $" (secondary failure — test already failed with '{testOutcome.Label}')" + : " (primary failure — test itself passed)"; + Assert.Fail(failureReason + context); } } @@ -726,9 +758,6 @@ public void ClusterFailoverSpinWait(int replicaNodeIndex, ILogger logger) public void AttachAndWaitForSync(int primary_count, int replica_count, bool disableObjects) { var primaryId = clusterTestUtils.GetNodeIdFromNode(0, logger); - // Issue meet to replicas - for (var i = primary_count; i < primary_count + replica_count; i++) - clusterTestUtils.Meet(i, 0); // Wait until primary node is known so as not to fail replicate for (var i = primary_count; i < primary_count + replica_count; i++) @@ -736,10 +765,7 @@ public void AttachAndWaitForSync(int primary_count, int replica_count, bool disa // Issue cluster replicate and bump epoch manually to capture config. 
for (var i = primary_count; i < primary_count + replica_count; i++) - { _ = clusterTestUtils.ClusterReplicate(i, primaryId, async: true, logger: logger); - clusterTestUtils.BumpEpoch(i, logger: logger); - } if (!checkpointTask.Wait(TimeSpan.FromSeconds(100))) Assert.Fail("Checkpoint task timeout"); From 35891c1d13e48affe9a8727f090d3e7cbdf8f845 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Wed, 25 Mar 2026 15:01:46 -0700 Subject: [PATCH 7/8] fix failing test --- .../ClusterNegativeTests.cs | 2 ++ test/Garnet.test.cluster/ClusterTestContext.cs | 14 ++++++++++++++ .../ClusterReplicationBaseTests.cs | 18 +++++++++++++----- 3 files changed, 29 insertions(+), 5 deletions(-) diff --git a/test/Garnet.test.cluster/ClusterNegativeTests.cs b/test/Garnet.test.cluster/ClusterNegativeTests.cs index 30d5b8541ff..88c740db833 100644 --- a/test/Garnet.test.cluster/ClusterNegativeTests.cs +++ b/test/Garnet.test.cluster/ClusterNegativeTests.cs @@ -312,6 +312,8 @@ public void ClusterReplicaAttachIntenseWrite(CancellationToken cancellationToken context.clusterTestUtils.SetConfigEpoch(primaryIndex, primaryIndex + 1, logger: context.logger); context.clusterTestUtils.SetConfigEpoch(replicaIndex, replicaIndex + 1, logger: context.logger); context.clusterTestUtils.Meet(primaryIndex, replicaIndex, logger: context.logger); + context.clusterTestUtils.WaitUntilNodeIsKnown(primaryIndex, replicaIndex, logger: context.logger); + context.clusterTestUtils.WaitUntilNodeIsKnown(replicaIndex, primaryIndex, logger: context.logger); var keyLength = 32; var kvpairCount = 32; diff --git a/test/Garnet.test.cluster/ClusterTestContext.cs b/test/Garnet.test.cluster/ClusterTestContext.cs index 38b1dee6385..449799d5444 100644 --- a/test/Garnet.test.cluster/ClusterTestContext.cs +++ b/test/Garnet.test.cluster/ClusterTestContext.cs @@ -548,6 +548,20 @@ public void PopulatePrimary( } } + public void SimplePrimaryReplicaSetup() + { + var primaryIndex = 0; + var replicaIndex = 1; + // Setup cluster + var resp = 
clusterTestUtils.AddDelSlotsRange(primaryIndex, [(0, 16383)], true, logger); + ClassicAssert.AreEqual("OK", resp); + clusterTestUtils.SetConfigEpoch(primaryIndex, primaryIndex + 1, logger); + clusterTestUtils.SetConfigEpoch(replicaIndex, replicaIndex + 1, logger); + clusterTestUtils.Meet(primaryIndex, replicaIndex, logger); + clusterTestUtils.WaitUntilNodeIsKnown(primaryIndex, replicaIndex); + clusterTestUtils.WaitUntilNodeIsKnown(replicaIndex, primaryIndex); + } + public void SimplePopulateDB(bool disableObjects, int keyLength, int kvpairCount, int primaryIndex, int addCount = 0, bool performRMW = false) { //Populate Primary diff --git a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs index ed283d142ec..e460b557e91 100644 --- a/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs +++ b/test/Garnet.test.cluster/ReplicationTests/ClusterReplicationBaseTests.cs @@ -3,6 +3,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.IO; using System.Linq; using System.Net; @@ -732,24 +733,31 @@ public void ClusterFailoverAttachReplicas([Values] bool performRMW, [Values] boo [Test, Order(13)] [Category("REPLICATION")] + //[Repeat(20)] public void ClusterReplicationCheckpointCleanupTest([Values] bool performRMW, [Values] bool disableObjects, [Values] bool enableIncrementalSnapshots) { + if (TestContext.CurrentContext.CurrentRepeatCount > 0) + Debug.WriteLine($"*** Current test iteration: {TestContext.CurrentContext.CurrentRepeatCount + 1}, name = {TestContext.CurrentContext.Test.Name} ***"); + + var primaryIndex = 0; + var replicaIndex = 1; var replica_count = 1;//Per primary var primary_count = 1; var nodes_count = primary_count + (primary_count * replica_count); ClassicAssert.IsTrue(primary_count > 0); context.CreateInstances(nodes_count, tryRecover: true, disableObjects: disableObjects, lowMemory: true, segmentSize: 
"4k", EnableIncrementalSnapshots: enableIncrementalSnapshots, enableAOF: true, useTLS: useTLS, asyncReplay: asyncReplay, deviceType: Tsavorite.core.DeviceType.Native); context.CreateConnection(useTLS: useTLS); - ClassicAssert.AreEqual("OK", context.clusterTestUtils.AddDelSlotsRange(0, [(0, 16383)], true, context.logger)); - context.clusterTestUtils.BumpEpoch(0, logger: context.logger); - var cconfig = context.clusterTestUtils.ClusterNodes(0, context.logger); + // Setup cluster + context.SimplePrimaryReplicaSetup(); + + var cconfig = context.clusterTestUtils.ClusterNodes(primaryIndex, context.logger); var myself = cconfig.Nodes.First(); var slotRangesStr = string.Join(",", myself.Slots.Select(x => $"({x.From}-{x.To})").ToList()); ClassicAssert.AreEqual(1, myself.Slots.Count, $"Setup failed slot ranges count greater than 1 {slotRangesStr}"); var shards = context.clusterTestUtils.ClusterShards(0, context.logger); - ClassicAssert.AreEqual(1, shards.Count); + ClassicAssert.AreEqual(2, shards.Count); ClassicAssert.AreEqual(1, shards[0].slotRanges.Count); ClassicAssert.AreEqual(0, shards[0].slotRanges[0].Item1); ClassicAssert.AreEqual(16383, shards[0].slotRanges[0].Item2); @@ -765,7 +773,7 @@ public void ClusterReplicationCheckpointCleanupTest([Values] bool performRMW, [V if (!attachReplicaTask.Wait(TimeSpan.FromSeconds(60))) Assert.Fail("attachReplicaTask timeout"); - context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex: 0, secondaryIndex: 1, logger: context.logger); + context.clusterTestUtils.WaitForReplicaAofSync(primaryIndex: primaryIndex, secondaryIndex: replicaIndex, logger: context.logger); } [Test, Order(14)] From 9e555e1bb47f831a14838361fbf9ef388f566698 Mon Sep 17 00:00:00 2001 From: Vasileios Zois Date: Thu, 26 Mar 2026 11:30:18 -0700 Subject: [PATCH 8/8] update global.json --- global.json | 2 +- libs/server/Lua/LuaTimeoutManager.cs | 2 +- .../Tsavorite/cs/src/core/Allocator/AllocatorScan.cs | 7 ++++--- .../Tsavorite/Implementation/ContinuePending.cs | 2 
+- playground/Bitmap/BitOp.cs | 12 ++++++------ 5 files changed, 13 insertions(+), 12 deletions(-) diff --git a/global.json b/global.json index 3462d015963..fa8e68b5fd5 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "10.0.103", + "version": "10.0.201", "rollForward": "latestMajor", "allowPrerelease": false } diff --git a/libs/server/Lua/LuaTimeoutManager.cs b/libs/server/Lua/LuaTimeoutManager.cs index d7f8542498d..bccb2983fba 100644 --- a/libs/server/Lua/LuaTimeoutManager.cs +++ b/libs/server/Lua/LuaTimeoutManager.cs @@ -263,7 +263,7 @@ internal Registration RegisterForTimeout(SessionScriptCache cache) goto tryAgain; } - // Other threads might update registrations, so check that before returning + // Other threads might update registrations, so check that before returning checkUnmodified: if ((updatedRegistrations = Interlocked.CompareExchange(ref registrations, curRegistrations, curRegistrations)) != curRegistrations) { diff --git a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs index 9358a6c5921..d3df96a17b2 100644 --- a/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs +++ b/libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorScan.cs @@ -247,10 +247,11 @@ private protected bool ScanLookup 0) - bContext.CompletePending(wait: true); + _ = bContext.CompletePending(wait: true); - IterationComplete: - if (resetCursor) cursor = 0; + IterationComplete: + if (resetCursor) + cursor = 0; scanFunctions.OnStop(false, scanCursorState.acceptedCount); return false; } diff --git a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs index 5258b1dc07b..7889e61efe9 100644 --- a/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs +++ 
b/libs/storage/Tsavorite/cs/src/core/Index/Tsavorite/Implementation/ContinuePending.cs @@ -246,7 +246,7 @@ internal OperationStatus ContinuePendingRMW(sessionFunctions, ref key, ref stackCtx); } - // Must do this *after* Unlocking. + // Must do this *after* Unlocking. CheckRetry: if (!HandleImmediateRetryStatus(status, sessionFunctions, ref pendingContext)) return status; diff --git a/playground/Bitmap/BitOp.cs b/playground/Bitmap/BitOp.cs index 0d8878f0849..893e619e778 100644 --- a/playground/Bitmap/BitOp.cs +++ b/playground/Bitmap/BitOp.cs @@ -360,7 +360,7 @@ private static void __bitop_simdX128_and(byte* dstBitmap, long dstLen, byte* src #endregion #region fillDstTail - fillTail: + fillTail: if (dstLen > srcLen) { batchSize = 8 * 4; @@ -530,7 +530,7 @@ private static void __bitop_simdX128_and_long(byte* dstBitmap, long dstLen, byte #endregion #region fillDstTail - fillTail: + fillTail: if (dstLen > srcLen) { batchSize = 8 * 4; @@ -729,7 +729,7 @@ private static void __bitop_simdX256_and(byte* dstBitmap, long dstLen, byte* src #endregion #region fillDstTail - fillTail: + fillTail: if (dstLen > srcLen) { batchSize = 8 * 4; @@ -828,7 +828,7 @@ private static void __bitop_multikey_scalar_and(byte* dstPtr, int dstLen, byte** *(long*)dstCurr = d00; dstCurr += batchSize; } - #endregion + #endregion fillTail: #region scalar_1x1 @@ -1046,7 +1046,7 @@ private static void __bitop_multikey_simdX128_and(byte* dstPtr, int dstLen, byte *(long*)dstCurr = d00; dstCurr += batchSize; } - #endregion + #endregion fillTail: #region scalar_1x1 @@ -1266,7 +1266,7 @@ private static void __bitop_multikey_simdX256_and(byte* dstPtr, int dstLen, byte *(long*)dstCurr = d00; dstCurr += batchSize; } - #endregion + #endregion fillTail: #region scalar_1x1